窗口聚合
窗口表值函数(TVF)聚合
窗口聚合是通过GROUP BY子句定义的,其特征是包含窗口表值函数产生的 “window_start” 和 “window_end” 列。和普通的 GROUP BY 子句一样,窗口聚合对于每个组会计算出一行数据。和其他连续表上的聚合不同,窗口聚合不产生中间结果,只在窗口结束产生一个总的聚合结果,另外,窗口聚合会清除不需要的中间状态。
更多介绍和使用请参考开源社区文档:窗口聚合。
分组窗口的开始和结束时间戳可以通过 window_start 和 window_end 来选定。
- 窗口表值函数
Flink 支持在 TUMBLE, HOP 和 CUMULATE 上进行窗口聚合。
- 在流模式下,窗口表值函数的时间属性字段必须是事件时间或处理时间。关于窗口函数更多信息,参见 窗口表值函数(Windowing TVFs)。
- 在批模式下,窗口表值函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 类型的。
-- tables must have time attribute, e.g. `bidtime` in this table Flink SQL> desc Bid; +-------------+------------------------+------+-----+--------+---------------------------------+ | name | type | null | key | extras | watermark | +-------------+------------------------+------+-----+--------+---------------------------------+ | bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND | | price | DECIMAL(10, 2) | true | | | | | item | STRING | true | | | | | supplier_id | STRING | true | | | | +-------------+------------------------+------+-----+--------+---------------------------------+ Flink SQL> SELECT * FROM Bid; +------------------+-------+------+-------------+ | bidtime | price | item | supplier_id | +------------------+-------+------+-------------+ | 2020-04-15 08:05 | 4.00 | C | supplier1 | | 2020-04-15 08:07 | 2.00 | A | supplier1 | | 2020-04-15 08:09 | 5.00 | D | supplier2 | | 2020-04-15 08:11 | 3.00 | B | supplier2 | | 2020-04-15 08:13 | 1.00 | E | supplier1 | | 2020-04-15 08:17 | 6.00 | F | supplier2 | +------------------+-------+------+-------------+ -- tumbling window aggregation Flink SQL> SELECT window_start, window_end, SUM(price) FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end; +------------------+------------------+-------+ | window_start | window_end | price | +------------------+------------------+-------+ | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | +------------------+------------------+-------+ -- hopping window aggregation Flink SQL> SELECT window_start, window_end, SUM(price) FROM TABLE( HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)) GROUP BY window_start, window_end; +------------------+------------------+-------+ | window_start | window_end | price | +------------------+------------------+-------+ | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | | 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | | 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 | +------------------+------------------+-------+ -- cumulative window aggregation Flink SQL> SELECT window_start, window_end, SUM(price) FROM TABLE( CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)) GROUP BY window_start, window_end; +------------------+------------------+-------+ | window_start | window_end | price | +------------------+------------------+-------+ | 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 | | 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 | | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | | 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 | | 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 | | 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 | | 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
- GROUPING SETS
窗口聚合也支持 GROUPING SETS 语法。Grouping Sets 可以通过一个标准的 GROUP BY 语句来描述更复杂的分组操作。数据按每个指定的 Grouping Sets 分别分组,并像简单的 GROUP BY 子句一样为每个组进行聚合。
GROUPING SETS 窗口聚合中 GROUP BY 子句必须包含 window_start 和 window_end 列,但 GROUPING SETS 子句中不能包含这两个字段。
Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) as price FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ()); +------------------+------------------+-------------+-------+ | window_start | window_end | supplier_id | price | +------------------+------------------+-------------+-------+ | 2020-04-15 08:00 | 2020-04-15 08:10 | (NULL) | 11.00 | | 2020-04-15 08:00 | 2020-04-15 08:10 | supplier2 | 5.00 | | 2020-04-15 08:00 | 2020-04-15 08:10 | supplier1 | 6.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | (NULL) | 10.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | supplier2 | 9.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | supplier1 | 1.00 | +------------------+------------------+-------------+-------+
GROUPING SETS 的每个子列表可以是空的,多列或表达式,它们的解释方式和直接使用 GROUP BY 子句是一样的。一个空的 Grouping Sets 表示所有行都聚合在一个分组下,即使没有数据,也会输出结果。
对于 Grouping Sets 中的空子列表,结果数据中的分组或表达式列会用NULL代替。例如,上例中的 GROUPING SETS ((supplier_id), ()) 里的 () 就是空子列表,与其对应的结果数据中的 supplier_id 列使用 NULL 填充。
- ROLLUP
ROLLUP 是一种特定通用类型 Grouping Sets 的简写。代表着指定表达式和所有前缀的列表,包括空列表。
例如:ROLLUP (one,two) 等效于 GROUPING SET((one,two),(one),()).
ROLLUP 窗口聚合中 GROUP BY 子句必须包含 window_start 和 window_end 列,但 ROLLUP 子句中不能包含这两个字段。
例如:下面这个查询和上个例子中的效果是一样的。
SELECT window_start, window_end, supplier_id, SUM(price) as price FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end, ROLLUP (supplier_id);
- CUBE
CUBE 是一种特定通用类型 Grouping Sets 的简写。代表着指定列表以及所有可能的子集和幂集。
CUBE 窗口聚合中 GROUP BY 子句必须包含 window_start 和 window_end 列,但 CUBE 子句中不能包含这两个字段。
例如:下面两个查询是等效的。
SELECT window_start, window_end, item, supplier_id, SUM(price) as price FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end, CUBE (supplier_id, item); SELECT window_start, window_end, item, supplier_id, SUM(price) as price FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end, GROUPING SETS ( (supplier_id, item), (supplier_id ), ( item), ( ) )
- 多级窗口聚合
window_start 和 window_end 列是普通的时间戳字段,并不是时间属性。因此它们不能在后续的操作中当做时间属性进行基于时间的操作。
为了传递时间属性,需要在 GROUP BY 子句中添加 window_time 列。window_time 是窗口表值函数(Windowing TVFs)产生的三列之一,它是窗口的时间属性。 window_time 添加到 GROUP BY 子句后就能被选定了。下面的查询可以把它用于后续基于时间的操作,比如:多级窗口聚合和Window TopN。
下面展示了一个多级窗口聚合:第一个窗口聚合后把时间属性传递给第二个窗口聚合。
-- tumbling 5 minutes for each supplier_id CREATE VIEW window1 AS -- Note: The window start and window end fields of inner Window TVF are optional in the select clause. However, if they appear in the clause, they need to be aliased to prevent name conflicting with the window start and window end of the outer Window TVF. SELECT window_start as window_5mintumble_start, window_end as window_5mintumble_end, window_time as rowtime, SUM(price) as partial_price FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY supplier_id, window_start, window_end, window_time; -- tumbling 10 minutes on the first window SELECT window_start, window_end, SUM(partial_price) as total_price FROM TABLE( TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;