FlinkSQL OVER窗口支持超期退窗
本章节适用于MRS 3.5.0及以后版本。
FlinkSQL OVER窗口新增数据超期退窗功能,当已有数据过期且没有新数据到来时,OVER聚合结果刷新并向下游算子发送最新的计算结果,可通过over.window.interval配置该功能,配置如下:
参数名称 |
默认值 |
说明 |
---|---|---|
over.window.interval |
-1 |
相邻2条数据的时间间隔,超过该时间间隔,OVER注册超期触发器,单位毫秒。
|
over.window.interval.last.rowdata |
false |
是否只向下游算子发送最后一条数据。
|
使用方法
配置Flink作业时,可通过在FlinkServer Web UI的作业开发界面添加自定义参数“over.window.interval”,且值配置为大于或等于“0”时开启窗口支持数据超期功能,创建作业可参考如何创建FlinkServer作业。该设置会对作业中的所有over窗口生效,建议对单over窗口的作业使用此功能。
- SQL示例:
CREATE TABLE OverSource ( `rowtime` TIMESTAMP_LTZ(3), `groupId` INT, `value` INT, `name` STRING, `additional_field` STRING, `proctime` as PROCTIME(), WATERMARK FOR rowtime AS rowtime ) WITH ( 'connector' = 'kafka', 'topic' = 'test_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', --FlinkServer所在集群为非安全模式去掉此参数 'properties.security.protocol' = 'SASL_PLAINTEXT', --FlinkServer所在集群为非安全模式去掉此参数 'properties.kerberos.domain.name' = 'hadoop.系统域名' --FlinkServer所在集群为非安全模式去掉此参数 ); CREATE TABLE LD_SINK( `name` STRING, `groupId` INT, `rowtime` TIMESTAMP_LTZ(3),`count_zw` BIGINT,`sum_zw` BIGINT ) WITH ( 'connector' = 'print' ); SELECT `name`, `groupId`, COUNT(`value`) OVER ( PARTITION BY groupId ORDER BY proctime RANGE BETWEEN INTERVAL '10' second PRECEDING AND CURRENT ROW ) as count_zw, SUM(`value`) OVER ( PARTITION BY groupId ORDER BY proctime RANGE BETWEEN INTERVAL '10' second PRECEDING AND CURRENT ROW ) as sum_zw FROM OverSource