Updated on 2023-06-16 GMT+08:00

How Do I Configure Checkpoints for Flink Jar Jobs and Save the Checkpoints to OBS?

The procedure is as follows:

  1. Add the following code to the code of the Flink Jar job:
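    // Imports required by the checkpoint configuration below.
    import org.apache.flink.contrib.streaming.state.OptionsFactory;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;

    // In the job's main method:
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Take an exactly-once checkpoint every 40,000 ms (40 seconds).
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointInterval(40000);
    // Keep the externalized checkpoint when the job is cancelled so the job can be restored from it.
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    // Keep state in RocksDB and write checkpoint data to OBS (replace ${bucket} with your bucket name).
    // The second argument (false) disables incremental checkpointing.
    RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(new FsStateBackend("obs://${bucket}/jobs/checkpoint/my_jar"), false);
    rocksDbBackend.setOptions(new OptionsFactory() {
        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            // Limit the RocksDB info log to at most 3 files of 64 MB each.
            return currentOptions
                    .setMaxLogFileSize(64 * 1024 * 1024)
                    .setKeepLogFileNum(3);
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            // Keep the default column family options.
            return currentOptions;
        }
    });
    env.setStateBackend(rocksDbBackend);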

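    The preceding code takes a checkpoint every 40 seconds in EXACTLY_ONCE mode and saves it to the jobs/checkpoint/my_jar path in the ${bucket} OBS bucket. Because RETAIN_ON_CANCELLATION is set, the checkpoint is kept after the job is cancelled.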

    Pay attention to the checkpoint storage path. Checkpoints are generally stored in an OBS bucket, using the following path format:

    • Path format: obs://${bucket}/xxx/xxx/xxx
    • Example

      obs://mybucket/jobs/checkpoint/jar-3

    • Add the following dependency, which provides StreamExecutionEnvironment, to the POM file:
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>
  2. On the DLI console, configure Runtime Configuration and Restore Job from Checkpoint for the Flink Jar job.
    Figure 1 Configuring Runtime Configuration and Restore Job from Checkpoint
    • Constraints on Runtime Configuration
      1. In the left navigation pane of the DLI console, choose Global Configuration > Service Authorization. On the displayed page, select Tenant Administrator(Global service) and click Update.
        Figure 2 Selecting Tenant Administrator(Global service)
      2. The bucket to which data is written must be an OBS bucket created by a main account.
    • Configuring Restore Job from Checkpoint
      1. Select Auto Restart upon Exception.
      2. Select Restore Job from Checkpoint and set the Checkpoint Path.

        The checkpoint path must be the same as the path set in the JAR file code, but without the obs:// prefix. The format is as follows:

        • ${bucket}/xxx/xxx/xxx
        • Example

          If the path in the JAR file code is obs://mybucket/jobs/checkpoint/jar-3, set Checkpoint Path to mybucket/jobs/checkpoint/jar-3.

      • The checkpoint path for each Flink Jar job must be unique. Otherwise, data cannot be restored.
      • DLI can access files in the checkpoint path only after DLI is authorized to access the OBS bucket.
  3. Check whether the job is restored from the checkpoint.
    Figure 3 Checking the restoration
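Two of the rules in step 2 can be checked before jobs are submitted: the console Checkpoint Path is the obs:// URI from the job code with the obs:// prefix removed, and each Flink Jar job needs its own checkpoint path. The following is a minimal sketch, assuming you keep a map of job names to the checkpoint URIs used in their code. CheckpointPaths, toConsolePath, and checkUnique are hypothetical helpers (not part of DLI or Flink), and the job names and URIs in main are only example values.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper, not part of DLI or Flink: derives the console
 * Checkpoint Path from the obs:// URI used in the job code and checks
 * that no two jobs share the same checkpoint path.
 */
public class CheckpointPaths {

    private static final String OBS_SCHEME = "obs://";

    /** Converts obs://${bucket}/xxx/xxx into ${bucket}/xxx/xxx. */
    public static String toConsolePath(String obsUri) {
        if (obsUri == null || !obsUri.startsWith(OBS_SCHEME)) {
            throw new IllegalArgumentException("Expected an obs:// URI: " + obsUri);
        }
        String path = obsUri.substring(OBS_SCHEME.length());
        int slash = path.indexOf('/');
        // Require a non-empty bucket name followed by a directory path.
        if (slash <= 0) {
            throw new IllegalArgumentException("Expected obs://${bucket}/xxx: " + obsUri);
        }
        return path;
    }

    /**
     * Returns the console path for each job (in input order), or throws
     * IllegalStateException if two jobs resolve to the same path.
     */
    public static Map<String, String> checkUnique(Map<String, String> jobToObsUri) {
        Map<String, String> pathToJob = new HashMap<>();
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : jobToObsUri.entrySet()) {
            String consolePath = toConsolePath(entry.getValue());
            String previous = pathToJob.putIfAbsent(consolePath, entry.getKey());
            if (previous != null) {
                throw new IllegalStateException("Jobs " + previous + " and " + entry.getKey()
                        + " share checkpoint path " + consolePath);
            }
            result.put(entry.getKey(), consolePath);
        }
        return result;
    }

    public static void main(String[] args) {
        // Example job names and URIs (illustrative only).
        Map<String, String> jobs = new LinkedHashMap<>();
        jobs.put("my_jar", "obs://mybucket/jobs/checkpoint/my_jar");
        jobs.put("jar-3", "obs://mybucket/jobs/checkpoint/jar-3");
        // Prints:
        // my_jar -> mybucket/jobs/checkpoint/my_jar
        // jar-3 -> mybucket/jobs/checkpoint/jar-3
        checkUnique(jobs).forEach((job, path) -> System.out.println(job + " -> " + path));
    }
}
```

Adding a third job that reuses either URI makes checkUnique throw an IllegalStateException, which mirrors the rule that a shared checkpoint path prevents data from being restored.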