Flink任务开发规则
对有更新操作的数据流进行聚合计算时要注意数据准确性问题
在针对更新数据进行聚合需要选择合适的解决方案,否则聚合结果会是错误的。
Create table t1( id int, partid int, value int ); select partid,sum(value) from t1 group by partid;
- 第一批数据:[1,1,10],[2,1,11],[3,2,8]
- 第二批数据:[2,1,12] //对ID=2的记录进行更新。
错误结果:[1,33],[2,8] //若是无法识别是对ID=2的数据进行了更新。
聚合结果:[1,22],[2,8] //识别为更新操作可以得到正确结果。
对于如何识别是更新数据有三种方式:
- 通过状态后端解决
通过状态后端存储所有原始数据,新来的数据根据状态来判断是否是更新操作,进而通过Flink聚合回撤机制实现聚合结果数据的更新。
优点:可以解决聚合准确性问题,而且对用户友好,对数据没有要求。
缺点:大数据量情况下状态后端存储的数据比较多。
- 通过CDC格式数据解决
CDC格式数据是指更新操作记录中会同时包含更新前数据和更新后数据。通过更新前的内容来回撤掉之前的聚合结果,通过更新后的数据更新最新的计算结果。
优点:不需要有大的状态后端存储,整体计算资源压力要小于基于状态后端的方案。
缺点:需要依赖于数据格式,常见的方式通过CDC采集工具,将数据采集到Kafka,然后Flink读Kafka数据进行计算。
- 通过changelog数据解决
changelog与CDC格式的数据类似,只不过存储的方式不同,CDC格式数据会将更新前和更新后的数据在一行记录,而changelog数据会将更新数据拆分成两行,一行是对更新前数据的删除操作,一行是更新后的数据插入操作记录。Flink在计算的时候会将基于更新数据的聚合结果删除,再将基于更新后数据的计算结果插入。changelog可以基于Hudi表实现,基于CDC格式的数据可以转为changelog数据存储到Hudi的MOR表的log文件中,也可以基于状态后端生成Hudi的changelog数据。
优点:可以基于湖存储实现更新数据聚合一致性保证。
缺点:
- Hudi的MOR表中仅在log文件中存在changelog数据,如果Flink作业计算延迟导致上游数据积压,而Hudi又清理了log文件,就会导致changelog丢失。针对这种情况需要保留版本数多一点,且给Flink作业合理的资源配置避免数据积压周期超过了清理周期。
- 基于状态后端生成changelog也是依赖于状态后端的,状态后端通常是会配置TTL时间的,不会永久保留。这种场景下更新操作是任意更新,没有一定时间周期限制。例如更新近一个月的数据,TTL设置大于一个月即可;若更新全部数据,就需要设置TTL为永久,不适用于大表。
- 目前changelog的MOR表,仅支持Flink引擎进行compaction处理,不支持Spark引擎。