文档首页 > > SQL语法参考> 配置时间模型

配置时间模型

分享
更新时间: 2020/01/14 GMT+08:00

Flink中主要提供两种时间模型:Processing Time和Event Time。

CS允许在创建Source Stream和Temp Stream的时候指定时间模型以便在后续计算中使用。

配置Processing Time

Processing Time是指系统时间,与数据本身的时间戳无关,即在Flink算子内计算完成的时间。

语法格式

CREATE SOURCE STREAM stream_name(...) WITH (...)

TIMESTAMP BY proctime.proctime;

CREATE TEMP STREAM stream_name(...)

TIMESTAMP BY proctime.proctime;

语法说明

设置Processing Time只需在timestamp by后配置proctime.proctime即可,后续可以直接使用proctime字段。

注意事项

无。

示例

CREATE SOURCE STREAM student_scores (
  student_number STRING, /* 学号 */
  student_name STRING, /* 姓名 */
  subject STRING, /* 学科 */
  score INT /* 成绩 */
)
WITH (
  type = "dis",
  region = "cn-north-1",
  channel = "csinput",
  partition_count = "1",
  encode = "csv",
  field_delimiter=","
)TIMESTAMP BY proctime.proctime;

INSERT INTO score_greate_90
SELECT student_name, sum(score) over (order by proctime RANGE UNBOUNDED PRECEDING) 
FROM student_scores;

配置Event Time

Event Time是指事件产生的时间,即数据产生时自带时间戳。

语法格式

CREATE SOURCE STREAM stream_name(...) WITH (...)

TIMESTAMP BY {attr_name}.rowtime

SET WATERMARK (RANGE {time_interval} | ROWS {literal}, {time_interval});

语法说明

设置Event Time需要选定流中的某一个属性来作为时间戳,同时需要设置Watermark策略。

由于网络等原因,有时会导致乱序的产生;对于迟来的数据,需要Watermark来保证一个特定的时间后去触发Window进行计算。Watermark主要是用来处理乱序数据,流处理从事件产生,到发送到CS服务,中间有一个过程。

Watermark有两种设置策略:

  • 按时间周期

    SET WATERMARK(range interval {time_unit}, interval {time_unit})

  • 按事件个数

    SET WATERMARK(rows literal, interval {time_unit})

一个逗号表示一个参数,第一个参数表示Watermark发送周期,第二个参数表示允许最大延迟时间。

注意事项

无。

示例

  • 每10s发送一次watermark,事件最大允许延迟时间为20s。
    CREATE SOURCE STREAM student_scores (
      student_number STRING, /* 学号 */
      student_name STRING, /* 姓名 */
      subject STRING, /* 学科 */
      score INT, /* 成绩 */
      time2 BIGINT
    )
    WITH (
      type = "dis",
      region = "cn-north-1",
      channel = "csinput",
      partition_count = "1",
      encode = "csv",
      field_delimiter=","
    )
    TIMESTAMP BY time2.rowtime
    SET WATERMARK (RANGE interval 10 second, interval 20 second);
    
    INSERT INTO score_greate_90
    SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING) 
    FROM student_scores;
  • 每收到10个数据发送一次watermark,事件最大允许延迟时间为20s。
    CREATE SOURCE STREAM student_scores (
      student_number STRING, /* 学号 */
      student_name STRING, /* 姓名 */
      subject STRING, /* 学科 */
      score INT, /* 成绩 */
      time2 BIGINT
    )
    WITH (
      type = "dis",
      region = "cn-north-1",
      channel = "csinput",
      partition_count = "1",
      encode = "csv",
      field_delimiter=","
    )
    TIMESTAMP BY time2.rowtime
    SET WATERMARK (ROWS 10, interval 20 second);
    
    INSERT INTO score_greate_90
    SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING) 
    FROM student_scores;
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

跳转到云社区