如何处理Checkpoint设置RocksDBStateBackend方式时Checkpoint慢
问题
如何处理checkpoint设置RocksDBStateBackend方式,且当数据量大时,执行checkpoint会很慢的问题?
原因分析
由于窗口使用自定义窗口,这时窗口的状态使用ListState,且同一个key值下,value的值非常多,每次新的value值到来都要使用RocksDB的merge()操作;触发计算时需要将该key值下所有的value值读出。
- RocksDB的方式为merge()->merge()....->merge()->read(),该方式读取数据时非常耗时,如图1所示。
- source算子在瞬间发送了大量数据,所有数据的key值均相等,导致window算子处理速度过慢,使barrier在缓存中积压,快照的制作时间过长,导致window算子在规定时间内没有向CheckpointCoordinator报告快照制作完成,CheckpointCoordinator认为快照制作失败,如图2所示。
回答
Flink引入了第三方软件包RocksDB的缺陷问题导致该现象的发生。建议用户将checkpoint设置为FsStateBackend方式。
用户需要在应用代码中将checkpoint设置为FsStateBackend。例如:
env.setStateBackend(new FsStateBackend("hdfs://hacluster/flink/checkpoint/"));