更新时间:2025-08-01 GMT+08:00

FlinkSQL中Changelog事件乱序问题

问题现象

给定以下SQL示例:

-- CDC source tables: s1 & s2
CREATE TEMPORARY TABLE s1 (
   id BIGINT,
   level BIGINT,
   PRIMARY KEY(id) NOT ENFORCED
)WITH (...);
CREATE TEMPORARY TABLE s2 (
   id BIGINT,
   attr VARCHAR,
   PRIMARY KEY(id) NOT ENFORCED
)WITH (...);
-- sink table: t1
CREATE TEMPORARY TABLE t1 (
   id BIGINT,
   level BIGINT,
   attr VARCHAR,
   PRIMARY KEY(id) NOT ENFORCED
)WITH (...);
-- join s1 and s2 and insert the result into t1
INSERT INTO t1
SELECT
   s1.*, s2.attr
FROM s1 JOIN s2
ON s1.level = s2.id;

其中,源表s1的主键是id,但Join操作需要按level列进行shuffle,当Join算子并发度不为1时,那么对于s1的更新事件可能下发到Join的不同subtask中,如下图所示:

最终会导致Join输出的Changelog事件顺序不确定,从而可能导致不正确的结果输出。

问题原因

该问题的主要原因是Join操作后破坏了唯一键(本示例中(s1.id)、(s1.id,s1.level)和(s1.id,s2.id)这三组都是唯一键)的排序。

出现该问题的其他场景还包括:

  1. 结果表定义主键,而写入该结果表的数据丢失了唯一性。通常包括但不限于以下操作:
    • 源表缺少主键,而结果表却设置了主键。
    • 向结果表插入数据时,忽略了主键列的选择,或错误地使用了源表的非主键数据填充结果表的主键。
    • 源表的主键数据在转换或经过分组聚合后出现精度损失。例如,将BIGINT类型降为INT类型。
    • 对源表的主键列或经过分组聚合之后的唯一键进行了运算,如数据拼接或将多个主键合并为单一字段。
  2. 结果表的确立依赖于主键的设定,然而在数据输入过程中,其原有的顺序性却遭到破坏。

解决方法

在Sink前引入SinkUpsertMaterializer算子。其工作原理是在状态中维护了一个RowData列表,在处理输入行时,它根据推断的upsert键或整个行(如果upsert键为空)检查状态列表中是否存在相同的行。在ADD的情况下添加或更新状态中的行,在RETRACT的情况下从状态中删除行。最后,它根据结果表的主键生成Changelog事件。