更新时间:2024-01-23 GMT+08:00
Flink Jar作业配置checkpoint保存到OBS
Flink Jar作业配置checkpoint保存到OBS步骤如下:
- 在Flink Jar作业的Jar包代码中加入如下代码:
//StreamExecutionEnvironment 依赖的pom文件配置参考后续说明 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(40000); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(new FsStateBackend("obs://${bucket}/jobs/checkpoint/my_jar"), false); rocksDbBackend.setOptions(new OptionsFactory() { @Override public DBOptions createDBOptions(DBOptions currentOptions) { return currentOptions .setMaxLogFileSize(64 * 1024 * 1024) .setKeepLogFileNum(3); } @Override public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { return currentOptions; } }); env.setStateBackend(rocksDbBackend);
上述代码含义是以EXACTLY_ONCE模式,每隔40s保存checkpoint到OBS的${bucket}桶中的jobs/checkpoint/my_jar路径。
其中,最重要的是保存checkpoint路径。一般是将checkpoint存入OBS桶中,路径格式如下:
- 路径格式:obs://${bucket}/xxx/xxx/xxx
- 示例:
- StreamExecutionEnvironment依赖的包需要在pom文件中添加如下配置。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
- 在DLI Flink Jar作业中配置“ 优化参数 ”和“从checkpoint恢复”功能。
图1 配置“优化参数”和“从checkpoint恢复”
- 参数优化的约束与限制
- 需要在DLI控制台下“全局配置 > 服务授权”开启Tenant Adminstrator(全局服务)。
图2 开启开启Tenant Adminstrator(全局服务)
- 写入数据到OBS的桶必须为主帐号下所创建的OBS桶。
- 需要在DLI控制台下“全局配置 > 服务授权”开启Tenant Adminstrator(全局服务)。
- 配置从checkpoint恢复
- 勾选“异常自动重启”。
- 勾选“从checkpoint恢复”,填写“Checkpoint路径”。
- 每个Flink Jar作业配置的Checkpoint路径要保持不同,否则无法从准确的checkpoint路径恢复。
- checkpoint路径中的OBS桶需要给DLI授权,DLI服务才能访问此桶下的文件。
- 参数优化的约束与限制
- 查看作业是否从checkpoint恢复。
图3 查看是否恢复
父主题: Flink Jar作业相关问题
Flink Jar作业相关问题 所有常见问题
more