更新时间:2022-02-22 GMT+08:00
配置时间模型
Flink中主要提供两种时间模型:Processing Time和Event Time。
DLI允许在创建Source Stream和Temp Stream的时候指定时间模型以便在后续计算中使用。
配置Processing Time
Processing Time是指系统时间,与数据本身的时间戳无关,即在Flink算子内计算完成的时间。
语法格式
1 2 3 4 |
CREATE SOURCE STREAM stream_name(...) WITH (...)
TIMESTAMP BY proctime.proctime;
CREATE TEMP STREAM stream_name(...)
TIMESTAMP BY proctime.proctime;
|
语法说明
设置Processing Time只需在timestamp by后配置proctime.proctime即可,后续可以直接使用proctime字段。
注意事项
无。
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
CREATE SOURCE STREAM student_scores (
student_number STRING, /* 学号 */
student_name STRING, /* 姓名 */
subject STRING, /* 学科 */
score INT /* 成绩 */
)
WITH (
type = "dis",
region = "",
channel = "dliinput",
partition_count = "1",
encode = "csv",
field_delimiter=","
)TIMESTAMP BY proctime.proctime;
INSERT INTO score_greate_90
SELECT student_name, sum(score) over (order by proctime RANGE UNBOUNDED PRECEDING)
FROM student_scores;
|
配置Event Time
Event Time是指事件产生的时间,即数据产生时自带时间戳。
语法格式
1 2 3 |
CREATE SOURCE STREAM stream_name(...) WITH (...)
TIMESTAMP BY {attr_name}.rowtime
SET WATERMARK (RANGE {time_interval} | ROWS {literal}, {time_interval});
|
语法说明
设置Event Time需要选定流中的某一个属性来作为时间戳,同时需要设置Watermark策略。
由于网络等原因,有时会导致乱序的产生;对于迟来的数据,需要Watermark来保证一个特定的时间后去触发Window进行计算。Watermark主要是用来处理乱序数据,流处理从事件产生,到发送到DLI服务,中间有一个过程。
Watermark有两种设置策略:
- 按时间周期
1
SET WATERMARK(range interval {time_unit}, interval {time_unit})
- 按事件个数
1
SET WATERMARK(rows literal, interval {time_unit})
一个逗号表示一个参数,第一个参数表示Watermark发送周期,第二个参数表示允许最大延迟时间。
注意事项
无。
示例
- 每10s发送一次watermark,事件最大允许延迟时间为20s。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
CREATE SOURCE STREAM student_scores ( student_number STRING, /* 学号 */ student_name STRING, /* 姓名 */ subject STRING, /* 学科 */ score INT, /* 成绩 */ time2 TIMESTAMP ) WITH ( type = "dis", region = "", channel = "dliinput", partition_count = "1", encode = "csv", field_delimiter="," ) TIMESTAMP BY time2.rowtime SET WATERMARK (RANGE interval 10 second, interval 20 second); INSERT INTO score_greate_90 SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING) FROM student_scores;
- 每收到10个数据发送一次watermark,事件最大允许延迟时间为20s。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
CREATE SOURCE STREAM student_scores ( student_number STRING, /* 学号 */ student_name STRING, /* 姓名 */ subject STRING, /* 学科 */ score INT, /* 成绩 */ time2 TIMESTAMP ) WITH ( type = "dis", region = "", channel = "dliinput", partition_count = "1", encode = "csv", field_delimiter="," ) TIMESTAMP BY time2.rowtime SET WATERMARK (ROWS 10, interval 20 second); INSERT INTO score_greate_90 SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING) FROM student_scores;
父主题: 流作业SQL语法