更新时间:2024-11-29 GMT+08:00

FlinkSQL窗口函数支持迟到数据

FlinkSQL新增窗口函数支持迟到数据特性,解决迟到数据需要处理的场景。目前支持TUMBLE、HOP、OVER、CUMULATE窗口函数的迟到数据,示例如下:

CREATE TABLE T1 (
 `int` INT,
 `double` DOUBLE,
 `float` FLOAT,
 `bigdec` DECIMAL(10, 2),
 `string` STRING,
 `name` STRING,
 `rowtime` TIMESTAMP(3),
 WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND
) WITH ( 
 'connector' = 'values',
);

-- 该Sink的字段必须和窗口的输入数据保持一致,但顺序不要求一致
CREATE TABLE LD_SINK(
 `float` FLOAT, `string` STRING, `name` STRING,  `rowtime` TIMESTAMP(3)
) WITH ( 
 'connector' = 'print',
);

SELECT  /*+ LATE_DATA_SINK('sink.name'='LD_SINK') */
  `name`,
  MIN(`float`),
  COUNT(DISTINCT `string`)
FROM TABLE(
  TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))
GROUP BY `name`, window_start, window_end

该特性还支持窗口接收到迟到数据时输出当前窗口的开始时间和结束时间,可通过添加在Hint中'window.start.field'和'window.end.field'使用,字段类型必须是timestamp,示例如下:

CREATE TABLE LD_SINK(
 `float` FLOAT, `string` STRING, `name` STRING,  `rowtime` TIMESTAMP(3), `windowStart` TIMESTAMP(3), `windowEnd` TIMESTAMP(3)
) WITH ( 
 'connector' = 'print',
);

SELECT  /*+ LATE_DATA_SINK('sink.name'='LD_SINK', 'window.start.field'='windowStart', 'window.end.field'='windowEnd') */
  `name`,
  MIN(`float`),
  COUNT(DISTINCT `string`)
FROM TABLE(
  TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))
GROUP BY `name`, window_start, window_end