配置时间模型
Flink中主要提供两种时间模型:Processing Time和Event Time。
DLI允许在创建Source Stream和Temp Stream的时候指定时间模型以便在后续计算中使用。
配置Processing Time
Processing Time是指系统时间,与数据本身的时间戳无关,即在Flink算子内计算完成的时间。
语法格式
1 2 3 4 |
CREATE SOURCE STREAM stream_name(...) WITH (...) TIMESTAMP BY proctime.proctime; CREATE TEMP STREAM stream_name(...) TIMESTAMP BY proctime.proctime; |
语法说明
设置Processing Time只需在timestamp by后配置proctime.proctime即可,后续可以直接使用proctime字段。
注意事项
无。
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
CREATE SOURCE STREAM student_scores ( student_number STRING, /* 学号 */ student_name STRING, /* 姓名 */ subject STRING, /* 学科 */ score INT /* 成绩 */ ) WITH ( type = "dis", region = "", channel = "dliinput", partition_count = "1", encode = "csv", field_delimiter="," )TIMESTAMP BY proctime.proctime; INSERT INTO score_greate_90 SELECT student_name, sum(score) over (order by proctime RANGE UNBOUNDED PRECEDING) FROM student_scores; |
配置Event Time
Event Time是指事件产生的时间,即数据产生时自带时间戳。
语法格式
1 2 3 |
CREATE SOURCE STREAM stream_name(...) WITH (...) TIMESTAMP BY {attr_name}.rowtime SET WATERMARK (RANGE {time_interval} | ROWS {literal}, {time_interval}); |
语法说明
设置Event Time需要选定流中的某一个属性来作为时间戳,同时需要设置Watermark策略。
由于网络等原因,有时会导致乱序的产生;对于迟来的数据,需要Watermark来保证一个特定的时间后去触发Window进行计算。Watermark主要是用来处理乱序数据,流处理从事件产生,到发送到DLI服务,中间有一个过程。
Watermark有两种设置策略:
- 按时间周期
1
SET WATERMARK(range interval {time_unit}, interval {time_unit})
- 按事件个数
1
SET WATERMARK(rows literal, interval {time_unit})
一个逗号表示一个参数,第一个参数表示Watermark发送周期,第二个参数表示允许最大延迟时间。
注意事项
无。
示例
- time2事件产生时间开始,每10s发送一次watermark,事件最大允许延迟时间为20s。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
CREATE SOURCE STREAM student_scores ( student_number STRING, /* 学号 */ student_name STRING, /* 姓名 */ subject STRING, /* 学科 */ score INT, /* 成绩 */ time2 TIMESTAMP ) WITH ( type = "dis", region = "", channel = "dliinput", partition_count = "1", encode = "csv", field_delimiter="," ) TIMESTAMP BY time2.rowtime SET WATERMARK (RANGE interval 10 second, interval 20 second); INSERT INTO score_greate_90 SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING) FROM student_scores;
- 每收到10个数据发送一次watermark,事件最大允许延迟时间为20s。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
CREATE SOURCE STREAM student_scores ( student_number STRING, /* 学号 */ student_name STRING, /* 姓名 */ subject STRING, /* 学科 */ score INT, /* 成绩 */ time2 TIMESTAMP ) WITH ( type = "dis", region = "", channel = "dliinput", partition_count = "1", encode = "csv", field_delimiter="," ) TIMESTAMP BY time2.rowtime SET WATERMARK (ROWS 10, interval 20 second); INSERT INTO score_greate_90 SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING) FROM student_scores;