更新时间:2024-06-14 GMT+08:00
分享

Flink Savepoints CLI介绍

Savepoints在持久化存储中保存某个checkpoint,以便用户可以暂停自己的应用进行升级,并将状态设置为savepoint的状态,并继续运行。该机制利用了Flink的checkpoint机制创建流应用的快照,并将快照的元数据(meta-data)写入到一个额外的持久化文件系统中。

如果需要使用savepoints的功能,强烈推荐用户为每个算子通过uid(String)分配一个固定的ID,以便将来升级恢复使用,示例代码如下:

DataStream<String> stream = env
// Stateful source (e.g. Kafka) with ID
.addSource(new StatefulSource())
.uid("source-id") // ID for the source operator
.shuffle()
 // Stateful mapper with ID
.map(new StateFulMapper())
.uid("mapper-id") // ID for the mapper
// Stateless printing sink
.print(); //Auto-generated ID

savepoint恢复

如果用户不手动设置ID,系统将自动给每个算子分配一个ID。只要该算子的ID不改变,即可从savepoint恢复,ID的产生取决于用户的应用代码,并且对应用代码的结构十分敏感。因此,强烈推荐用户手动为每个算子设置ID。Savepoint产生的数据将被保存到配置的文件系统中,如FsStateBackend或者RocksDBStateBackend。

  1. 触发一个savepoint
    $ bin/flink savepoint <jobId> [targetDirectory]

    以上命令将触发ID为jobId的作业产生一个savepoint,另外,用户可以通过targetDirectory指定savepoint的存储路径,该路径必须是jobManager可以访问的,由于targetDirectory是可选的,如果用户没有配置targetDirectory,则是使用配置文件中“state.savepoints.dir”配置的目录来存放savepoint。

    用户可以在“flink-conf.yaml”中通过“state.savepoints.dir”选项设置默认的savepoint路径。

    # Default savepoint target directory

    建议用户将targetDirectory路径设置为HDFS路径 ,例如:

    bin/flink savepoint  405af8c02cf6dc069a0f9b7a1f7be088 hdfs://savepoint 
  2. 删除一个作业并进行savepoint
    $ bin/flink cancel -s [targetDirectory] jobId

    以上命令将删除一个作业,同时,在删除前将对该作业的状态进行保存。另外,用户可以通过targetDirectory指定savepoint的存储路径,该路径必须是jobManager可以访问的。

  3. 恢复作业方式
    • 从savepoint恢复作业。
      $ bin/flink run -s savepointPath [runArgs]

      以上命令将提交一个作业,并将该作业的初始状态置为savepointPath指定的状态。

      runArgs是指用户应用中自定义的参数,每个用户自定义的参数形式、名称都不一样。

    • 允许不恢复某个算子的状态
      $ bin/flink run -s savepointPath -n [runArgs]

      默认情况下,系统将尝试将savepoint的状态全部映射到用户的流应用中,如果用户升级的流应用删除了某个算子,可以通过--allowNonRestoredState(简写-n)恢复状态。

  4. 清除savepoints
    $ bin/flink savepoint -d savepointPath

    以上命令将删除保存在savepointPath的savepoint。

注意事项

  • 如果一个task中有算子链(Chained operators),将会将算子链上第一个算子的ID分配给该task。给算子链上的中间算子手动分配ID是不可能的。例如:在链(Chain)[a->b->c]中,只能给a手动分配ID,b和c不能分配。如果用户想给b和c分配ID,用户必须手动建链。手动建链时需要使用disableChaining()接口。举例如下:
    env.addSource(new GetDataSource())
    .keyBy(0)
    .timeWindow(Time.seconds(2)).uid("window-id")
    .reduce(_+_).uid("reduce-id")
    .map(f=>(f,1)).disableChaining().uid("map-id")
    .print().disableChaining().uid("print-id")
  • 用户升级job时不允许更改算子的数据类型。

相关文档