Updated on 2022-06-01 GMT+08:00

Savepoints CLI

Savepoints are externally stored checkpoints that you can use to stop-and-resume or update your Flink programs. They use Flink's checkpoint mechanism to create a snapshot of the state of your streaming program and write the checkpoint meta data out to an external file system.

It is highly recommended that you adjust your programs as described in this section in order to be able to upgrade your programs in the future. The main required change is to manually specify operator IDs via the uid(String) method. These IDs are used to scope the state of each operator.

DataStream<String> stream = env
// Stateful source (e.g. Kafka) with ID
.addSource(new StatefulSource())
.uid("source-id") // ID for the source operator
.shuffle()
 // Stateful mapper with ID
.map(new StateFulMapper())
.uid("mapper-id") // ID for the mapper
// Stateless printing sink
.print(); //Auto-generated ID

Resuming from Savepoints

If you do not specify the IDs manually, the system will automatically assign one ID to each operator. You can resume from a savepoint as long as the ID of the operator is not changed. ID generation depends on the user's application code and is sensitive to the application code structure. Therefore, it is highly recommended to specify an ID for every operator manually. Data generated by savepoints will be saved in the configured file system, for example, FsStateBackend or RocksDBStateBackend.

  1. Trigger a savepoint.
    $ bin/flink savepoint <jobId> [targetDirectory]

    The command will trigger a savepoint for the job with ID: jobId. Furthermore, you can specify targetDirectory to store the savepoint. The directory must be accessible by JobManager. The targetDirectory parameter is optional. If you do not configure targetDirectory, the configured state.savepoints.dir in the configuration file is used to store the savepoint.

    You can set a default savepoint path using state.savepoints.dir in flink-conf.yaml.

    # Default savepoint target directory

    You are advised to set targetDirectory to an HDFS path. The following is an example.

    bin/flink savepoint  405af8c02cf6dc069a0f9b7a1f7be088 hdfs://savepoint 
  2. Cancel a job with a savepoint.
    $ bin/flink cancel -s [targetDirectory] jobId

    The command will atomically trigger a savepoint for the job with ID: jobid and cancel the job. Furthermore, you can specify targetDirectory to store the savepoint. The directory must be accessible by JobManager.

  3. Restore jobs using the following methods.
    • Restoring jobs from savepoints
      $ bin/flink run -s savepointPath [runArgs]

      The command submits a job and sets the initial state of the job to the state specified by savepointPath.

      runArgs is a user-defined parameter, whose format and name vary depending on users.

    • Allowing non-restored state
      $ bin/flink run -s savepointPath -n [runArgs]

      By default, the resume operation will try to map all state of the savepoint back to the program you are restoring with. If you dropped an operator, you are allowed to skip state that cannot be mapped to the new program via --allowNonRestoredState (short: -n) option.

  4. Dispose savepoints.
    $ bin/flink savepoint -d savepointPath

    The command disposes the savepoint stored in savepointPath.

Precautions

  • Chained operators are identified by the ID of the first task. It is not possible to manually assign an ID to an intermediate chained task, for example, in the chain [a->b->c] only a can have its ID assigned manually, but not b or c. To assign IDs to b and c, you need to manually define task chains using the disableChaining() API. The following provides an example:
    env.addSource(new GetDataSource())
    .keyBy(0)
    .timeWindow(Time.seconds(2)).uid("window-id")
    .reduce(_+_).uid("reduce-id")
    .map(f=>(f,1)).disableChaining().uid("map-id")
    .print().disableChaining().uid("print-id")
  • During job upgrade, the data type of operators cannot be changed.