TUMBLE WINDOW扩展
功能概述
注意事项
- 如果使用insert语句将结果写入sink中,则sink需要支持upsert模式,所以结果表需要支持upsert操作,且定义主键。
- 延迟时间设置仅用于事件时间,在处理时间中不生效。
- 辅助函数必须使用与 GROUP BY 子句中的分组窗口函数完全相同的参数来调用。
- 如果使用事件时间,则需要使用watermark标识,代码如下(其中order_time被标识为事件时间列,watermark时间设置为3秒):
CREATE TABLE orders ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, watermark for order_time as order_time - INTERVAL '3' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' );
- 如果使用处理时间,则需要使用计算列设置,其代码如下(其中proc即为处理时间列):
CREATE TABLE orders ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proc as proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' );
语法格式
TUMBLE(time_attr, window_interval, period_interval, lateness_interval)
语法示例
TUMBLE(testtime, INTERVAL '10' SECOND, INTERVAL '10' SECOND, INTERVAL '10' SECOND)
参数说明
|
参数 |
说明 |
参数格式 |
|---|---|---|
|
time_attr |
表示相应的事件时间或者处理时间属性列。
|
- |
|
window_interval |
表示窗口的持续时长。 |
|
|
period_interval |
表示在窗口范围内周期性触发的频率,即在窗口结束前,从窗口开启开始,每隔period_interval时长更新一次输出结果。如果没有设置,则默认没有使用周期触发策略。 |
|
|
lateness_interval |
表示窗口结束后延迟lateness_interval时长,继续统计在窗口结束后延迟时间内到达的属于该窗口的数据,而且在延迟时间内到达的每个数据都会更新输出结果。
说明:
当时间窗口为处理时间时,无论lateness_interval为何值,都不会有效果。 |
- 当period_interval为0时,表示没有使用窗口的周期触发策略;
- 当lateness_interval为0时,表示没有使用窗口结束后的延迟策略;
- 当二者都没有填写时,默认两种策略都没有配置,仅使用普通的TUMBLE窗口。
- 如果仅需使用延迟时间策略,则需要将上述period_interval格式中的'10'设置为 '0'。
辅助函数
|
辅助函数 |
说明 |
|---|---|
|
TUMBLE_START(time_attr, window_interval, period_interval, lateness_interval) |
返回相对应的滚动窗口范围内的下界时间戳。 |
|
TUMBLE_END(time_attr, window_interval, period_interval, lateness_interval) |
返回相对应的滚动窗口范围以外的上界时间戳。 |
示例
1. 根据订单信息使用kafka作为数据源表,JDBC作为数据结果表统计用户在30秒内的订单数量,并根据窗口的订单id和窗口开启时间作为主键,将结果实时统计到JDBC中:
- 根据MySQL和kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组,入向规则,使其对当前将要使用的队列放开,并根据MySQL和kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功;否则表示未成功。
- 在MySQL的flink数据库下创建表order_count,创建语句如下:
CREATE TABLE `flink`.`order_count` ( `user_id` VARCHAR(32) NOT NULL, `window_start` TIMESTAMP NOT NULL, `window_end` TIMESTAMP NULL, `total_num` BIGINT UNSIGNED NULL, PRIMARY KEY (`user_id`, `window_start`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci;
- 创建flink opensource sql作业,并提交运行作业(这里设置窗口的大小为30秒,触发周期为10秒,延迟时间设置为5秒,即窗口结束前如果结果有更新,则每隔十秒输出一次中间结果。在watermark到达使得窗口结束后,事件时间在watermark5秒内的数据仍然会被处理,并统计到当前所属窗口;如果在5秒以外,则该数据会被丢弃):
CREATE TABLE orders ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, watermark for order_time as order_time - INTERVAL '3' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE jdbcSink ( user_id string, window_start timestamp(3), window_end timestamp(3), total_num BIGINT, primary key (user_id, window_start) not enforced ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://<yourMySQL>:3306/flink', 'table-name' = 'order_count', 'username' = '<yourUserName>', 'password' = '<yourPassword>', 'sink.buffer-flush.max-rows' = '1' ); insert into jdbcSink select order_id, TUMBLE_START(order_time, INTERVAL '30' SECOND, INTERVAL '10' SECOND, INTERVAL '5' SECOND), TUMBLE_END(order_time, INTERVAL '30' SECOND, INTERVAL '10' SECOND, INTERVAL '5' SECOND), COUNT(*) from orders GROUP BY user_id, TUMBLE(order_time, INTERVAL '30' SECOND, INTERVAL '10' SECOND, INTERVAL '5' SECOND); - 向kafka中插入数据(这里假设同一个用户在不同时间下的订单,且因为某种原因导致10:00:13的订单数据较晚到达):
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241000000002", "order_channel":"webShop", "order_time":"2021-03-24 10:00:20", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241000000003", "order_channel":"webShop", "order_time":"2021-03-24 10:00:33", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241000000004", "order_channel":"webShop", "order_time":"2021-03-24 10:00:13", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} - 在MySQL中使用下述语句查看输出结果,,输出结果如下(因无法展示周期性输出结果,所以这里展示的是最终结果):
select * from order_count
user_id window_start window_end total_num 0001 2021-03-24 10:00:00 2021-03-24 10:00:30 3 0001 2021-03-24 10:00:30 2021-03-24 10:01:00 1