更新时间:2024-10-21 GMT+08:00
分享

Flink任务开发规则

对有更新操作的数据流进行聚合计算时要注意数据准确性问题

在针对更新数据进行聚合需要选择合适的解决方案,否则聚合结果会是错误的。

例如:
Create table t1(
    id  int,
    partid int,
    value int
);
select
   partid,sum(value) 
 from t1 
 group by partid;
  • 第一批数据:[1,1,10],[2,1,11],[3,2,8]

    聚合结果:[1,21],[2,8]

  • 第二批数据:[2,1,12] //对ID=2的记录进行更新。

    错误结果:[1,33],[2,8] //若是无法识别是对ID=2的数据进行了更新。

    聚合结果:[1,22],[2,8] //识别为更新操作可以得到正确结果。

对于如何识别是更新数据有三种方式:

  • 通过状态后端解决

    通过状态后端存储所有原始数据,新来的数据根据状态来判断是否是更新操作,进而通过Flink聚合回撤机制实现聚合结果数据的更新。

    优点:可以解决聚合准确性问题,而且对用户友好,对数据没有要求。

    缺点:大数据量情况下状态后端存储的数据比较多。

  • 通过CDC格式数据解决

    CDC格式数据是指更新操作记录中会同时包含更新前数据和更新后数据。通过更新前的内容来回撤掉之前的聚合结果,通过更新后的数据更新最新的计算结果。

    优点:不需要有大的状态后端存储,整体计算资源压力要小于基于状态后端的方案。

    缺点:需要依赖于数据格式,常见的方式通过CDC采集工具,将数据采集到Kafka,然后Flink读Kafka数据进行计算。

  • 通过changelog数据解决

    changelog与CDC格式的数据类似,只不过存储的方式不同,CDC格式数据会将更新前和更新后的数据在一行记录,而changelog数据会将更新数据拆分成两行,一行是对更新前数据的删除操作,一行是更新后的数据插入操作记录。Flink在计算的时候会将基于更新数据的聚合结果删除,再将基于更新后数据的计算结果插入。changelog可以基于Hudi表实现,基于CDC格式的数据可以转为changelog数据存储到Hudi的MOR表的log文件中,也可以基于状态后端生成Hudi的changelog数据。

    优点:可以基于湖存储实现更新数据聚合一致性保证。

    缺点:

    • Hudi的MOR表中仅在log文件中存在changelog数据,如果Flink作业计算延迟导致上游数据积压,而Hudi又清理了log文件,就会导致changelog丢失。针对这种情况需要保留版本数多一点,且给Flink作业合理的资源配置避免数据积压周期超过了清理周期。
    • 基于状态后端生成changelog也是依赖于状态后端的,状态后端通常是会配置TTL时间的,不会永久保留。这种场景下更新操作是任意更新,没有一定时间周期限制。例如更新近一个月的数据,TTL设置大于一个月即可;若更新全部数据,就需要设置TTL为永久,不适用于大表。
    • 目前changelog的MOR表,仅支持Flink引擎进行compaction处理,不支持Spark引擎。

相关文档