更新时间:2022-02-22 GMT+08:00

配置时间模型

Flink中主要提供两种时间模型:Processing Time和Event Time。

DLI允许在创建Source Stream和Temp Stream的时候指定时间模型以便在后续计算中使用。

配置Processing Time

Processing Time是指系统时间,与数据本身的时间戳无关,即在Flink算子内计算完成的时间。

语法格式

1
2
3
4
CREATE SOURCE STREAM stream_name(...) WITH (...)
TIMESTAMP BY proctime.proctime;
CREATE TEMP STREAM stream_name(...)
TIMESTAMP BY proctime.proctime;

语法说明

设置Processing Time只需在timestamp by后配置proctime.proctime即可,后续可以直接使用proctime字段。

注意事项

无。

示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
CREATE SOURCE STREAM student_scores (
  student_number STRING, /* 学号 */
  student_name STRING, /* 姓名 */
  subject STRING, /* 学科 */
  score INT /* 成绩 */
)
WITH (
  type = "dis",
  region = "",
  channel = "dliinput",
  partition_count = "1",
  encode = "csv",
  field_delimiter=","
)TIMESTAMP BY proctime.proctime;

INSERT INTO score_greate_90
SELECT student_name, sum(score) over (order by proctime RANGE UNBOUNDED PRECEDING) 
FROM student_scores;

配置Event Time

Event Time是指事件产生的时间,即数据产生时自带时间戳。

语法格式

1
2
3
CREATE SOURCE STREAM stream_name(...) WITH (...)
TIMESTAMP BY {attr_name}.rowtime
SET WATERMARK (RANGE {time_interval} | ROWS {literal}, {time_interval});

语法说明

设置Event Time需要选定流中的某一个属性来作为时间戳,同时需要设置Watermark策略。

由于网络等原因,有时会导致乱序的产生;对于迟来的数据,需要Watermark来保证一个特定的时间后去触发Window进行计算。Watermark主要是用来处理乱序数据,流处理从事件产生,到发送到DLI服务,中间有一个过程。

Watermark有两种设置策略:

  • 按时间周期
    1
    SET WATERMARK(range interval {time_unit}, interval {time_unit})
    
  • 按事件个数
    1
    SET WATERMARK(rows literal, interval {time_unit})
    

一个逗号表示一个参数,第一个参数表示Watermark发送周期,第二个参数表示允许最大延迟时间。

注意事项

无。

示例

  • 每10s发送一次watermark,事件最大允许延迟时间为20s。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    CREATE SOURCE STREAM student_scores (
      student_number STRING, /* 学号 */
      student_name STRING, /* 姓名 */
      subject STRING, /* 学科 */
      score INT, /* 成绩 */
      time2 TIMESTAMP
    )
    WITH (
      type = "dis",
      region = "",
      channel = "dliinput",
      partition_count = "1",
      encode = "csv",
      field_delimiter=","
    )
    TIMESTAMP BY time2.rowtime
    SET WATERMARK (RANGE interval 10 second, interval 20 second);
    
    INSERT INTO score_greate_90
    SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING) 
    FROM student_scores;
    
  • 每收到10个数据发送一次watermark,事件最大允许延迟时间为20s。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    CREATE SOURCE STREAM student_scores (
      student_number STRING, /* 学号 */
      student_name STRING, /* 姓名 */
      subject STRING, /* 学科 */
      score INT, /* 成绩 */
      time2 TIMESTAMP
    )
    WITH (
      type = "dis",
      region = "",
      channel = "dliinput",
      partition_count = "1",
      encode = "csv",
      field_delimiter=","
    )
    TIMESTAMP BY time2.rowtime
    SET WATERMARK (ROWS 10, interval 20 second);
    
    INSERT INTO score_greate_90
    SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING) 
    FROM student_scores;