Updated on 2024-08-10 GMT+08:00

Flink Savepoints CLI

Overview

Savepoints are externally stored checkpoints that you can use to stop and resume or update your Flink programs. They use Flink's checkpoint mechanism to create a snapshot of the state of your streaming program and write the checkpoint metadata to an external file system.

To be able to upgrade your programs in the future, it is highly recommended that you adjust them as described in this section. The main required change is to specify operator IDs manually via the uid(String) method. These IDs are used to scope the state of each operator.

env
    // Stateful source (e.g. Kafka) with ID
    .addSource(new StatefulSource())
    .uid("source-id") // ID for the source operator
    .shuffle()
    // Stateful mapper with ID
    .map(new StateFulMapper())
    .uid("mapper-id") // ID for the mapper
    // Stateless printing sink
    .print(); // Auto-generated ID
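
The following is a minimal sketch of why explicit IDs matter when state is scoped by operator ID. It is a toy model, not Flink's actual ID-generation algorithm: the class OperatorIdSketch, its methods, and the path-style "auto-generated" ID are all hypothetical and exist only for illustration. The sketch assumes that an auto-generated ID depends on the operators upstream of it, so adding an operator changes the IDs that follow, while an explicit ID such as "mapper-id" stays the same.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: this is NOT how Flink actually generates operator IDs.
class OperatorIdSketch {

    // Hypothetical auto-generated ID: the path of operator names from the source
    // to this operator, so any upstream structural change alters the ID.
    static Map<String, String> autoIds(List<String> pipeline) {
        Map<String, String> ids = new LinkedHashMap<>();
        String path = "";
        for (String operator : pipeline) {
            path = path + "/" + operator;
            ids.put(operator, path);
        }
        return ids;
    }

    // State in a savepoint is looked up by operator ID; null means "no state found".
    static String restore(Map<String, String> savepoint, String operatorId) {
        return savepoint.get(operatorId);
    }

    public static void main(String[] args) {
        List<String> v1 = List.of("source", "mapper");
        List<String> v2 = List.of("source", "filter", "mapper"); // upgraded program

        // Savepoint taken from v1 when the mapper had an auto-generated ID.
        Map<String, String> autoSavepoint = Map.of(autoIds(v1).get("mapper"), "mapper-state");
        // Savepoint taken from v1 when the mapper had the explicit ID "mapper-id".
        Map<String, String> uidSavepoint = Map.of("mapper-id", "mapper-state");

        System.out.println(restore(autoSavepoint, autoIds(v2).get("mapper"))); // null
        System.out.println(restore(uidSavepoint, "mapper-id"));                // mapper-state
    }
}
```

In this model the mapper's state is lost after the upgrade only when its ID was derived from the program structure. With the explicit ID, the lookup still succeeds.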

Savepoint Recovery

If you do not specify the IDs manually, they are generated automatically. You can restore from the savepoint automatically as long as these IDs do not change. The generated IDs depend on the structure of your program and are sensitive to program changes, so it is highly recommended that you assign them manually.

When a savepoint is triggered, a single savepoint file containing the checkpoint metadata is created. The actual checkpoint state is kept in the configured checkpoint directory, for example, with an FsStateBackend or RocksDBStateBackend. The savepoint operations are as follows:

  1. Trigger a savepoint.
    $ bin/flink savepoint jobId [targetDirectory]

    This command triggers a savepoint for the job with the ID jobId. You can optionally specify targetDirectory, a file system directory in which to store the savepoint. The directory must be accessible by the JobManager. If targetDirectory is not specified, the directory configured by state.savepoints.dir in the configuration file is used to store the savepoint.

    You can configure a default savepoint target directory via the state.savepoints.dir key in the flink-conf.yaml file.

    # Default savepoint target directory

    You are advised to configure targetDirectory to an HDFS path.

    For example:

    bin/flink savepoint 405af8c02cf6dc069a0f9b7a1f7be088 hdfs://savepoint
  2. Cancel a job with a savepoint.
    $ bin/flink cancel -s [targetDirectory] jobId

    This command atomically triggers a savepoint for the job with the ID jobId and cancels the job. You can optionally specify targetDirectory, a file system directory in which to store the savepoint. The directory must be accessible by the JobManager.

  3. Resume jobs.
    • Resume from a savepoint.
      $ bin/flink run -s savepointPath [runArgs]

      This command submits a job and specifies the savepoint path. The execution will resume from the respective savepoint state.

      runArgs represents the user-defined arguments of the job. Their names and format depend on the program being submitted.

    • Allow non-restored state.
      $ bin/flink run -s savepointPath -n [runArgs]

      By default, the resume operation tries to map all state of the savepoint back to the program you are restoring with. If you dropped an operator, you can skip the state that cannot be mapped to the new program via --allowNonRestoredState (short: -n).

  4. Dispose savepoints.
    $ bin/flink savepoint -d savepointPath

    This command disposes of the savepoint stored at savepointPath.
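
The effect of the -n option when resuming a job can be illustrated with a small sketch. It is a toy model under stated assumptions, not Flink's restore implementation: a savepoint is reduced to a map from operator ID to state, and the class RestoreSketch, its restore method, and the error message are hypothetical names chosen for illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model only: a savepoint is reduced to a map from operator ID to state.
class RestoreSketch {

    // Returns the IDs whose state is restored. Mirrors the documented behaviour:
    // state without a matching operator is an error unless skipping is allowed (-n).
    static Set<String> restore(Map<String, String> savepoint,
                               Set<String> programOperatorIds,
                               boolean allowNonRestoredState) {
        Set<String> restored = new TreeSet<>();
        for (String operatorId : savepoint.keySet()) {
            if (programOperatorIds.contains(operatorId)) {
                restored.add(operatorId);
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                        "Savepoint state cannot be mapped to the program: " + operatorId);
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, String> savepoint = Map.of("source-id", "offsets", "mapper-id", "counts");
        Set<String> upgraded = Set.of("source-id"); // the mapper was dropped

        System.out.println(restore(savepoint, upgraded, true)); // [source-id]
        try {
            restore(savepoint, upgraded, false);
        } catch (IllegalStateException e) {
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}
```

In this model, resuming without the option fails because the state of the dropped operator has nowhere to go, while resuming with it restores the remaining state and skips the rest.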

Precautions

  • Chained operators are identified by the ID of the first task. It is not possible to manually assign an ID to an intermediate chained task. For example, in the chain [ a -> b -> c ], only a can have its ID assigned manually, but not b or c. To work around this, define the task chains manually by using the disableChaining() method. See the following example:
    env.addSource(new GetDataSource())
      .keyBy(0)
      .timeWindow(Time.seconds(2)) // a WindowedStream has no uid(); set the ID on reduce
      .reduce(_ + _).uid("reduce-id")
      .map(f => (f, 1)).disableChaining().uid("map-id")
      .print().disableChaining().uid("print-id")
  • During a job upgrade, the data types of operators cannot be changed.