更新时间:2025-08-01 GMT+08:00
FlinkSQL中Changelog事件乱序问题
问题现象
给定以下SQL示例:
-- CDC source tables: s1 & s2 CREATE TEMPORARY TABLE s1 ( id BIGINT, level BIGINT, PRIMARY KEY(id) NOT ENFORCED )WITH (...); CREATE TEMPORARY TABLE s2 ( id BIGINT, attr VARCHAR, PRIMARY KEY(id) NOT ENFORCED )WITH (...); -- sink table: t1 CREATE TEMPORARY TABLE t1 ( id BIGINT, level BIGINT, attr VARCHAR, PRIMARY KEY(id) NOT ENFORCED )WITH (...); -- join s1 and s2 and insert the result into t1 INSERT INTO t1 SELECT s1.*, s2.attr FROM s1 JOIN s2 ON s1.level = s2.id;
其中,源表s1的主键是id,但Join操作需要按level列进行shuffle,当Join算子并发度不为1时,那么对于s1的更新事件可能下发到Join的不同subtask中,如下图所示:
最终会导致Join输出的Changelog事件顺序不确定,从而可能导致不正确的结果输出。
问题原因
该问题的主要原因是Join操作后破坏了唯一键(本示例中(s1.id)、(s1.id,s1.level)和(s1.id,s2.id)这三组都是唯一键)的排序。
出现该问题的其他场景还包括:
- 结果表定义主键,而写入该结果表的数据丢失了唯一性。通常包括但不限于以下操作:
- 源表缺少主键,而结果表却设置了主键。
- 向结果表插入数据时,忽略了主键列的选择,或错误地使用了源表的非主键数据填充结果表的主键。
- 源表的主键数据在转换或经过分组聚合后出现精度损失。例如,将BIGINT类型降为INT类型。
- 对源表的主键列或经过分组聚合之后的唯一键进行了运算,如数据拼接或将多个主键合并为单一字段。
- 结果表的确立依赖于主键的设定,然而在数据输入过程中,其原有的顺序性却遭到破坏。
解决方法
在Sink前引入SinkUpsertMaterializer算子。其工作原理是在状态中维护了一个RowData列表,在处理输入行时,它根据推断的upsert键或整个行(如果upsert键为空)检查状态列表中是否存在相同的行。在ADD的情况下添加或更新状态中的行,在RETRACT的情况下从状态中删除行。最后,它根据结果表的主键生成Changelog事件。
父主题: Flink常见问题