How Do I Configure Checkpoints for Flink Jar Jobs and Save the Checkpoints to OBS?
The procedure is as follows:
- Add the following code to the source code of the Flink Jar job:
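```java
// RocksDBStateBackend and OptionsFactory are provided by the flink-statebackend-rocksdb module.
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Requires the flink-streaming-java dependency in the POM file (see the next step).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Take a checkpoint every 40 seconds (40000 ms) in exactly-once mode.
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointInterval(40000);

// Keep the externalized checkpoint when the job is canceled so it can be restored later.
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Store state in RocksDB and write checkpoints to OBS (replace ${bucket} with your bucket).
// The second argument (false) disables incremental checkpoints.
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(
        new FsStateBackend("obs://${bucket}/jobs/checkpoint/my_jar"), false);

// Limit RocksDB's informational log files to 64 MB each and keep at most 3 of them.
rocksDbBackend.setOptions(new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions
                .setMaxLogFileSize(64 * 1024 * 1024)
                .setKeepLogFileNum(3);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions;
    }
});
env.setStateBackend(rocksDbBackend);
```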
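The preceding code takes a checkpoint every 40 seconds in EXACTLY_ONCE mode and saves it to the jobs/checkpoint/my_jar path in the ${bucket} bucket. Because RETAIN_ON_CANCELLATION is set, the checkpoint is kept in OBS when the job is canceled.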
Pay attention to the checkpoint storage path. Checkpoints are generally stored in an OBS bucket, using the following path format:
- Path format: obs://${bucket}/xxx/xxx/xxx
- Example: obs://${bucket}/jobs/checkpoint/my_jar (the path used in the preceding code)
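As a rough pre-submission check of the path format above, the following Java sketch tests whether a string has the obs://${bucket}/... shape. The class and method names are hypothetical and not part of Flink or DLI. The check is structural only: it does not validate OBS bucket naming rules or confirm that the bucket exists.

```java
import java.util.Arrays;

/** Hypothetical helper: structural check of an OBS checkpoint path (obs://bucket/dir/...). */
public class CheckpointPathCheck {
    private static final String SCHEME = "obs://";

    // True if the path starts with obs://, names a bucket, and has at least one
    // non-empty directory segment after it. Bucket naming rules are NOT checked.
    public static boolean isValidObsCheckpointPath(String path) {
        if (path == null || !path.startsWith(SCHEME)) {
            return false;
        }
        String rest = path.substring(SCHEME.length());
        // Tolerate a single trailing slash, e.g. obs://bucket/jobs/
        if (rest.endsWith("/")) {
            rest = rest.substring(0, rest.length() - 1);
        }
        String[] parts = rest.split("/", -1);
        if (parts.length < 2) {
            return false; // bucket only, no directory
        }
        // Reject empty segments such as obs://bucket//checkpoint
        return Arrays.stream(parts).noneMatch(String::isEmpty);
    }

    public static void main(String[] args) {
        System.out.println(isValidObsCheckpointPath("obs://my-bucket/jobs/checkpoint/my_jar")); // true
        System.out.println(isValidObsCheckpointPath("obs://my-bucket"));                      // false
        System.out.println(isValidObsCheckpointPath("hdfs://nn/jobs/checkpoint"));            // false
    }
}
```

A string check like this only catches typos before submission; whether DLI can actually write to the path still depends on the bucket and authorization constraints described below.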
- Add the following dependency to the POM file so that the StreamExecutionEnvironment classes are available:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
```
- For the DLI Flink Jar job, set Runtime Configuration and Restore Job from Checkpoint.
Figure 1 Configuring Runtime Configuration and Restore Job from Checkpoint
- Constraints on parameter optimization
- In the left navigation pane of the DLI console, choose Global Configuration > Service Authorization. On the page displayed, select Tenant Administrator(Global service) and click Update.
Figure 2 Selecting Tenant Administrator(Global service)
- The bucket to which data is written must be an OBS bucket created by the main account.
- Configuring Restore Job from Checkpoint
- Select Auto Restart upon Exception.
- Select Restore Job from Checkpoint and set the Checkpoint Path.
The checkpoint path must be the same as the one you set in the JAR code. Note the following:
- The checkpoint path for each Flink Jar job must be unique. Otherwise, data cannot be restored.
- DLI can access files in the checkpoint path only after DLI is authorized to access the OBS bucket.
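Because each Flink Jar job needs its own checkpoint path, a team submitting several jobs may want to catch accidental sharing before submission. The following Java sketch is one way to do that under stated assumptions: the naming convention obs://<bucket>/jobs/checkpoint/<jobName> (modeled on the sample path above) and the class and method names are hypothetical, not a DLI requirement or API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical pre-submission check: each Flink Jar job should own a distinct checkpoint path. */
public class CheckpointPathUniqueness {

    // Hypothetical convention: one directory per job under jobs/checkpoint/.
    public static String pathForJob(String bucket, String jobName) {
        return "obs://" + bucket + "/jobs/checkpoint/" + jobName;
    }

    // Returns every checkpoint path shared by two or more jobs, mapped to those job names.
    // Trailing slashes are ignored, so "obs://b/x" and "obs://b/x/" count as the same path.
    public static Map<String, List<String>> findSharedPaths(Map<String, String> jobToPath) {
        Map<String, List<String>> byPath = new TreeMap<>();
        for (Map.Entry<String, String> e : jobToPath.entrySet()) {
            String normalized = e.getValue().replaceAll("/+$", "");
            byPath.computeIfAbsent(normalized, k -> new ArrayList<>()).add(e.getKey());
        }
        // Keep only paths used by more than one job.
        byPath.values().removeIf(jobs -> jobs.size() < 2);
        return byPath;
    }

    public static void main(String[] args) {
        Map<String, String> jobs = new TreeMap<>();
        jobs.put("orders", pathForJob("my-bucket", "orders"));
        jobs.put("payments", pathForJob("my-bucket", "payments"));
        jobs.put("orders-copy", "obs://my-bucket/jobs/checkpoint/orders/"); // accidentally reuses "orders"
        // prints {obs://my-bucket/jobs/checkpoint/orders=[orders, orders-copy]}
        System.out.println(findSharedPaths(jobs));
    }
}
```

An empty result means no two jobs in the map share a checkpoint path; any entry returned names the jobs that would collide.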
- Check whether the job is restored from the checkpoint.
Figure 3 Checking the restoration