更新时间:2022-05-06 GMT+08:00
Flink Jar作业配置checkpoint保存到OBS
Flink Jar作业配置checkpoint保存到OBS步骤如下:
- 在Flink Jar作业的Jar包代码中加入如下代码:
//StreamExecutionEnvironment 依赖的pom文件配置参考后续说明 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(40000); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(new FsStateBackend("obs://${userAk}:${userSk}@obs.cn-north-7.ulanqab.huawei.com:443/${bucket}/jobs/checkpoint/my_jar"), false); rocksDbBackend.setOptions(new OptionsFactory() { @Override public DBOptions createDBOptions(DBOptions currentOptions) { return currentOptions .setMaxLogFileSize(64 * 1024 * 1024) .setKeepLogFileNum(3); } @Override public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { return currentOptions; } }); env.setStateBackend(rocksDbBackend);
上述代码含义是以EXACTLY_ONCE模式,每隔40s保存checkpoint到OBS的${bucket}桶中的jobs/checkpoint/my_jar路径。
其中,最重要的是保存checkpoint路径。一般是将checkpoint存入OBS桶中,路径格式如下:
- 路径格式:obs://${dataUserAk}:${dataUserSk}@${endpoint}/${bucket}/xxx/xxx/xxx
- 示例:obs://xxxxxxx:xxxxxxxxxx@obs.cn-north-7.ulanqab.huawei.com:443/mybucket/jobs/checkpoint/jar-3
- StreamExecutionEnvironment依赖的包需要在pom文件中添加如下配置。完整pom文件可以参考使用Flink Jar写入数据到OBS。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
- 在DLI Flink Jar作业中配置“从checkpoint恢复”功能。图1 配置“从checkpoint恢复”
- 在DLI控制台,选择“作业管理”>“Flink作业”。
- 在对应Flink Jar作业“操作”列中,单击“编辑”,进入Flink Jar作业编辑页面。
- 勾选“异常自动重启”。
- 勾选“从checkpoint恢复”,填写“Checkpoint路径”。
- 每个Flink Jar作业配置的Checkpoint路径要保持不同,否则无法从准确的checkpoint路径恢复。
- checkpoint路径中的OBS桶需要给DLI授权,DLI服务才能访问此桶下的文件。
- 查看作业是否从checkpoint恢复。图2 查看是否恢复
父主题: Flink Jar作业相关问题
Flink Jar作业相关问题所有常见问题
more
