更新时间:2024-01-23 GMT+08:00

Flink Jar作业配置checkpoint保存到OBS

Flink Jar作业配置checkpoint保存到OBS步骤如下:

  1. 在Flink Jar作业的Jar包代码中加入如下代码:
    //StreamExecutionEnvironment 依赖的pom文件配置参考后续说明
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setCheckpointInterval(40000);
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(new FsStateBackend("obs://${bucket}/jobs/checkpoint/my_jar"), false);
            rocksDbBackend.setOptions(new OptionsFactory() {
                @Override
                public DBOptions createDBOptions(DBOptions currentOptions) {
                    return currentOptions
                            .setMaxLogFileSize(64 * 1024 * 1024)
                            .setKeepLogFileNum(3);
                }
    
                @Override
                public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                    return currentOptions;
                }
            });
            env.setStateBackend(rocksDbBackend);

    上述代码含义是以EXACTLY_ONCE模式,每隔40s保存checkpoint到OBS的${bucket}桶中的jobs/checkpoint/my_jar路径。

    其中,最重要的是保存checkpoint路径。一般是将checkpoint存入OBS桶中,路径格式如下:

    • 路径格式:obs://${bucket}/xxx/xxx/xxx
    • 示例:

      obs://mybucket/jobs/checkpoint/jar-3

    • StreamExecutionEnvironment依赖的包需要在pom文件中添加如下配置。
      <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
      </dependency>
  2. 在DLI Flink Jar作业中配置“ 优化参数 ”和“从checkpoint恢复”功能。
    图1 配置“优化参数”和“从checkpoint恢复”
    • 参数优化的约束与限制
      1. 需要在DLI控制台下“全局配置 > 服务授权”开启Tenant Adminstrator(全局服务)。
        图2 开启开启Tenant Adminstrator(全局服务)
      2. 写入数据到OBS的桶必须为主帐号下所创建的OBS桶。
    • 配置从checkpoint恢复
      1. 勾选“异常自动重启”。
      2. 勾选“从checkpoint恢复”,填写“Checkpoint路径”。

        Checkpoint路径与用户在Jar包中设置的checkpoint路径相对应,格式如下:

        • “Checkpoint路径”格式为:${bucket}/xxx/xxx/xxx
        • 示例:

          如果Jar包中代码配置为:obs://mybucket/jobs/checkpoint/jar-3

          那么“Checkpoint路径”填写为: mybucket/jobs/checkpoint/jar-3

      • 每个Flink Jar作业配置的Checkpoint路径要保持不同,否则无法从准确的checkpoint路径恢复。
      • checkpoint路径中的OBS桶需要给DLI授权,DLI服务才能访问此桶下的文件。
  3. 查看作业是否从checkpoint恢复。
    图3 查看是否恢复

Flink Jar作业相关问题 所有常见问题

more