Flink Savepoints CLI介绍
概述
Savepoints在持久化存储中保存某个checkpoint,以便用户可以暂停自己的应用进行升级,并将状态设置为savepoint的状态,并继续运行。该机制利用了Flink的checkpoint机制创建流应用的快照,并将快照的元数据(meta-data)写入到一个额外的持久化文件系统中。
如果需要使用savepoints的功能,强烈推荐用户为每个算子通过uid(String)分配一个固定的ID,以便将来升级恢复使用,示例代码如下:
DataStream<String> stream = env // Stateful source (e.g. Kafka) with ID .addSource(new StatefulSource()) .uid("source-id") // ID for the source operator .shuffle() // Stateful mapper with ID .map(new StateFulMapper()) .uid("mapper-id") // ID for the mapper // Stateless printing sink .print(); //Auto-generated ID
savepoint恢复
如果用户不手动设置ID,系统将自动给每个算子分配一个ID。只要该算子的ID不改变,即可从savepoint恢复,ID的产生取决于用户的应用代码,并且对应用代码的结构十分敏感。因此,强烈推荐用户手动为每个算子设置ID。Savepoint产生的数据将被保存到配置的文件系统中,如FsStateBackend或者RocksDBStateBackend。
- 触发一个savepoint
$ bin/flink savepoint <jobId> [targetDirectory]
以上命令将触发ID为jobId的作业产生一个savepoint,另外,用户可以通过targetDirectory指定savepoint的存储路径,该路径必须是jobManager可以访问的,由于targetDirectory是可选的,如果用户没有配置targetDirectory,则是使用配置文件中“state.savepoints.dir”配置的目录来存放savepoint。
用户可以在“flink-conf.yaml”中通过“state.savepoints.dir”选项设置默认的savepoint路径。
# Default savepoint target directory
建议用户将targetDirectory路径设置为HDFS路径 ,例如:
bin/flink savepoint 405af8c02cf6dc069a0f9b7a1f7be088 hdfs://savepoint
- 删除一个作业并进行savepoint
$ bin/flink cancel -s [targetDirectory] jobId
以上命令将删除一个作业,同时,在删除前将对该作业的状态进行保存。另外,用户可以通过targetDirectory指定savepoint的存储路径,该路径必须是jobManager可以访问的。
- 恢复作业方式
- 从savepoint恢复作业。
$ bin/flink run -s savepointPath [runArgs]
以上命令将提交一个作业,并将该作业的初始状态置为savepointPath指定的状态。
runArgs是指用户应用中自定义的参数,每个用户自定义的参数形式、名称都不一样。
- 允许不恢复某个算子的状态
$ bin/flink run -s savepointPath -n [runArgs]
默认情况下,系统将尝试将savepoint的状态全部映射到用户的流应用中,如果用户升级的流应用删除了某个算子,可以通过--allowNonRestoredState(简写-n)恢复状态。
- 从savepoint恢复作业。
- 清除savepoints
$ bin/flink savepoint -d savepointPath
以上命令将删除保存在savepointPath的savepoint。
注意事项
- 如果一个task中有算子链(Chained operators),将会将算子链上第一个算子的ID分配给该task。给算子链上的中间算子手动分配ID是不可能的。例如:在链(Chain)[a->b->c]中,只能给a手动分配ID,b和c不能分配。如果用户想给b和c分配ID,用户必须手动建链。手动建链时需要使用disableChaining()接口。举例如下:
env.addSource(new GetDataSource()) .keyBy(0) .timeWindow(Time.seconds(2)).uid("window-id") .reduce(_+_).uid("reduce-id") .map(f=>(f,1)).disableChaining().uid("map-id") .print().disableChaining().uid("print-id")
- 用户升级job时不允许更改算子的数据类型。