窗口去重
功能描述
窗口去重是一种特殊的去重,它根据指定的多个列来删除重复的行,保留每个窗口和分区键的第一个或最后一个数据。
对于流式查询,与普通去重不同,窗口去重只在窗口的最后返回结果数据,不会产生中间结果。它会清除不需要的中间状态。 因此,窗口去重查询在用户不需要更新结果时,性能较好。通常,窗口去重直接用于窗口表值函数上。另外,它可以用于基于窗口表值函数的操作。比如窗口聚合,窗口TopN和窗口关联。
窗口Top-N的语法和普通的Top-N相同。 除此之外,窗口去重需要 PARTITION BY 子句包含表的 window_start 和 window_end 列。 否则优化器无法翻译。
Flink 使用 ROW_NUMBER() 移除重复数据,就像窗口TopN一样。理论上,窗口是一种特殊的窗口 Top-N:N是1并且是根据处理时间或事件时间排序的。
更多介绍和使用请参考开源社区文档:窗口去重。
语法格式
SELECT [column_list] FROM ( SELECT [column_list], ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...] ORDER BY time_attr [asc|desc]) AS rownum FROM table_name) -- relation applied windowing TVF WHERE (rownum = 1 | rownum <=1 | rownum < 2) [AND conditions]
参数说明:
- ROW_NUMBER():为每一行分配一个唯一且连续的序号,从1开始。
- PARTITION BY window_start, window_end [, col_key1...]: 指定分区字段,需要包含window_start, window_end以及其他分区键。
- ORDER BY time_attr [asc|desc]: 指定排序列,必须是时间属性。目前 Flink 支持处理时间属性和事件时间属性。 Order by ASC 表示保留第一行,Order by DESC 表示保留最后一行。
- WHERE (rownum = 1 | rownum <=1 | rownum < 2): 优化器通过 rownum = 1 | rownum <=1 | rownum < 2 来识别查询能否被翻译成窗口去重。
注意事项
- Flink 只支持在滚动窗口、滑动窗口和累积窗口的窗口表值函数后进行窗口去重
- 窗口去重只支持根据事件时间属性进行排序
示例
本示例展示了在10分钟的滚动窗口上保持最后一条记录。
-- tables must have time attribute, e.g. `bidtime` in this table Flink SQL> DESC Bid; +-------------+------------------------+------+-----+--------+---------------------------------+ | name | type | null | key | extras | watermark | +-------------+------------------------+------+-----+--------+---------------------------------+ | bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND | | price | DECIMAL(10, 2) | true | | | | | item | STRING | true | | | | +-------------+------------------------+------+-----+--------+---------------------------------+ Flink SQL> SELECT * FROM Bid; +------------------+-------+------+ | bidtime | price | item | +------------------+-------+------+ | 2020-04-15 08:05 | 4.00 | C | | 2020-04-15 08:07 | 2.00 | A | | 2020-04-15 08:09 | 5.00 | D | | 2020-04-15 08:11 | 3.00 | B | | 2020-04-15 08:13 | 1.00 | E | | 2020-04-15 08:17 | 6.00 | F | +------------------+-------+------+ Flink SQL> SELECT * FROM ( SELECT bidtime, price, item, supplier_id, window_start, window_end, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY bidtime DESC) AS rownum FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) ) WHERE rownum <= 1; +------------------+-------+------+-------------+------------------+------------------+--------+ | bidtime | price | item | supplier_id | window_start | window_end | rownum | +------------------+-------+------+-------------+------------------+------------------+--------+ | 2020-04-15 08:09 | 5.00 | D | supplier4 | 2020-04-15 08:00 | 2020-04-15 08:10 | 1 | | 2020-04-15 08:17 | 6.00 | F | supplier5 | 2020-04-15 08:10 | 2020-04-15 08:20 | 1 | +------------------+-------+------+-------------+------------------+------------------+--------+