更新时间:2024-12-11 GMT+08:00

FlinkSQL OVER窗口支持超期退窗

本章节适用于MRS 3.5.0及以后版本。

FlinkSQL OVER窗口新增数据超期退窗功能,当已有数据过期且没有新数据到来时,OVER聚合结果刷新并向下游算子发送最新的计算结果,可通过over.window.interval配置该功能,配置如下:

表1 FlinkSQL OVER窗口数据超期退窗功能

参数名称

默认值

说明

over.window.interval

-1

相邻2条数据的时间间隔,超过该时间间隔,OVER注册超期触发器,单位毫秒。

  • -1:不开启该功能。
  • 0:每条数据都会注册超期触发器,但会加大对作业的压力,根据作业的具体情况可以适当调大over.window.interval间隔。
  • 正数:相邻2条数据的时间间隔,超过该时间间隔,OVER注册超期触发器。

over.window.interval.last.rowdata

false

是否只向下游算子发送最后一条数据。

  • false(默认值):发送所有未过期的数据。
  • true:只向下游算子发送最后一条数据。

使用方法

配置Flink作业时,可通过在FlinkServer Web UI的作业开发界面添加自定义参数“over.window.interval”,且值配置为大于或等于“0”时开启窗口支持数据超期功能,创建作业可参考如何创建FlinkServer作业。该设置会对作业中的所有over窗口生效,建议对单over窗口的作业使用此功能。

  • SQL示例:
    CREATE TABLE OverSource (
      `rowtime` TIMESTAMP_LTZ(3),
      `groupId` INT,
      `value` INT,
      `name` STRING,
      `additional_field` STRING,
      `proctime` as PROCTIME(),
      WATERMARK FOR rowtime AS rowtime
    ) WITH (
     'connector' = 'kafka',
     'topic' = 'test_source',
     'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',  
     'properties.group.id' = 'testGroup', 
     'scan.startup.mode' = 'latest-offset', 
     'format' = 'csv',  
     'properties.sasl.kerberos.service.name' = 'kafka',   --FlinkServer所在集群为非安全模式去掉此参数
     'properties.security.protocol' = 'SASL_PLAINTEXT',  --FlinkServer所在集群为非安全模式去掉此参数
     'properties.kerberos.domain.name' = 'hadoop.系统域名' --FlinkServer所在集群为非安全模式去掉此参数
    );
    
    CREATE TABLE LD_SINK(
     `name` STRING, `groupId` INT, `rowtime` TIMESTAMP_LTZ(3),`count_zw` BIGINT,`sum_zw` BIGINT
    ) WITH ( 
     'connector' = 'print'
    );
    
    SELECT
      `name`,
      `groupId`,
      COUNT(`value`) OVER (
        PARTITION BY groupId
        ORDER BY
          proctime RANGE BETWEEN INTERVAL '10' second PRECEDING
          AND CURRENT ROW
      ) as count_zw,
      SUM(`value`) OVER (
        PARTITION BY groupId
        ORDER BY
          proctime RANGE BETWEEN INTERVAL '10' second PRECEDING
          AND CURRENT ROW
      ) as sum_zw
    FROM
      OverSource
  • SQL输出结果:

    以proctime为“10:00:00”,groupid为“1”,value为“1~5”为例,连续五条数据进入Flink:

    ["zw",1,1,1]
    ["zw",1,2,3]
    ["zw",1,3,6]
    ["zw",1,4,10]
    ["zw",1,5,15]

    “10:00:10”后没有数据再进入,开始退窗:

    ["zw",1,4,14]
    ["zw",1,3,12]
    ["zw",1,2,9]
    ["zw",1,1,5]
    ["zw",1,0,0]