更新时间:2022-05-06 GMT+08:00
分享

Flink Jar作业配置checkpoint保存到OBS

Flink Jar作业配置checkpoint保存到OBS步骤如下:

  1. 在Flink Jar作业的Jar包代码中加入如下代码:
    //StreamExecutionEnvironment 依赖的pom文件配置参考后续说明
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setCheckpointInterval(40000);
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(new FsStateBackend("obs://${userAk}:${userSk}@obs.cn-north-7.ulanqab.huawei.com:443/${bucket}/jobs/checkpoint/my_jar"), false);
            rocksDbBackend.setOptions(new OptionsFactory() {
                @Override
                public DBOptions createDBOptions(DBOptions currentOptions) {
                    return currentOptions
                            .setMaxLogFileSize(64 * 1024 * 1024)
                            .setKeepLogFileNum(3);
                }
    
                @Override
                public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                    return currentOptions;
                }
            });
            env.setStateBackend(rocksDbBackend);

    上述代码含义是以EXACTLY_ONCE模式,每隔40s保存checkpoint到OBS的${bucket}桶中的jobs/checkpoint/my_jar路径。

    其中,最重要的是保存checkpoint路径。一般是将checkpoint存入OBS桶中,路径格式如下:

    • 路径格式:obs://${dataUserAk}:${dataUserSk}@${endpoint}/${bucket}/xxx/xxx/xxx
    • 示例:obs://xxxxxxx:xxxxxxxxxx@obs.cn-north-7.ulanqab.huawei.com:443/mybucket/jobs/checkpoint/jar-3
    • StreamExecutionEnvironment依赖的包需要在pom文件中添加如下配置。完整pom文件可以参考使用Flink Jar写入数据到OBS
      <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
      </dependency>
  2. 在DLI Flink Jar作业中配置“从checkpoint恢复”功能。
    图1 配置“从checkpoint恢复”
    1. 在DLI控制台,选择“作业管理”>“Flink作业”。
    2. 在对应Flink Jar作业“操作”列中,单击“编辑”,进入Flink Jar作业编辑页面。
    3. 勾选“异常自动重启”。
    4. 勾选“从checkpoint恢复”,填写“Checkpoint路径”。

      Checkpoint路径与用户在Jar包中设置的checkpoint路径相对应,格式如下:

      • “Checkpoint路径”格式为:${bucket}/xxx/xxx/xxx
      • 示例:

        如果Jar包中代码配置为:obs://xxxxxxx:xxxxxxxxxx@obs.cn-north-7.ulanqab.huawei.com:443/mybucket/jobs/checkpoint/jar-3

        那么“Checkpoint路径”填写为: mybucket/jobs/checkpoint/jar-3

    • 每个Flink Jar作业配置的Checkpoint路径要保持不同,否则无法从准确的checkpoint路径恢复。
    • checkpoint路径中的OBS桶需要给DLI授权,DLI服务才能访问此桶下的文件。
  3. 查看作业是否从checkpoint恢复。
    图2 查看是否恢复
分享:

Flink Jar作业相关问题所有常见问题

more

close