文档首页/ MapReduce服务 MRS/ 开发指南(LTS版)/ Flink开发指南(普通模式)/ Flink应用开发常见问题/ 如何处理Checkpoint设置RocksDBStateBackend方式时Checkpoint慢
更新时间:2024-08-03 GMT+08:00

如何处理Checkpoint设置RocksDBStateBackend方式时Checkpoint慢

问题

如何处理checkpoint设置RocksDBStateBackend方式,且当数据量大时,执行checkpoint会很慢的问题?

原因分析

由于窗口使用自定义窗口,这时窗口的状态使用ListState,且同一个key值下,value的值非常多,每次新的value值到来都要使用RocksDB的merge()操作;触发计算时需要将该key值下所有的value值读出。

  • RocksDB的方式为merge()->merge()....->merge()->read(),该方式读取数据时非常耗时,如图1所示。
  • source算子在瞬间发送了大量数据,所有数据的key值均相等,导致window算子处理速度过慢,使barrier在缓存中积压,快照的制作时间过长,导致window算子在规定时间内没有向CheckpointCoordinator报告快照制作完成,CheckpointCoordinator认为快照制作失败,如图2所示。
图1 时间监控信息
图2 关系图

回答

Flink引入了第三方软件包RocksDB的缺陷问题导致该现象的发生。建议用户将checkpoint设置为FsStateBackend方式。

用户需要在应用代码中将checkpoint设置为FsStateBackend。例如:

 env.setStateBackend(new FsStateBackend("hdfs://hacluster/flink/checkpoint/"));