更新时间:2024-11-26 GMT+08:00
分享

配置FlinkServer作业重启策略

FlinkServer作业重启策略介绍

Flink支持不同的重启策略,以在发生故障时控制作业是否重启以及如何重启。如果不指定重启策略,集群会使用默认的重启策略。用户也可以在提交作业时指定一个重启策略,可参考如何创建FlinkServer作业在作业开发界面配置(MRS 3.1.0及以后版本)。

重启策略也可以通过Flink的配置文件“客户端安装目录/Flink/flink/conf/flink-conf.yaml”中的参数“restart-strategy”指定,为全局配置,还可以在应用代码中动态指定,会覆盖全局配置,重启策略包括失败率(failure-rate)和两种默认策略,默认策略为如下:

  • 无重启(No restart):如果没有启用CheckPoint,默认使用该策略。
  • 固定间隔(fixed-delay):如果启用了CheckPoint,但没有配置重启策略,默认使用该策略。

No restart策略

发生故障时作业会直接失败,不会尝试重启。

参数配置为:

restart-strategy: none

fixed-delay策略

发生故障时会尝试重启作业固定次数,如果超过了最大的尝试次数,作业最终会失败。并且在两次连续重启尝试之间,重启策略会等待固定的时间。

以配置如果重启失败了3次则认为该Job失败,重试时间间隔为10s为例,参数配置为:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

failure-rate策略

在作业失败后会直接重启,但超过设置的失败率后,作业会被认定为失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。

以配置10分钟内如果重启失败了3次则认为该作业失败,重试时间间隔为10s为例,参数配置为:

restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 10 min
restart-strategy.failure-rate.delay: 10 s

如何选择重启策略

  • 如果用户在作业失败后,不希望重试,则推荐使用No restart策略。
  • 如果用户在作业失败后,希望对作业进行重试,推荐使用failure-rate策略。因为fixed-delay策略可能会因为网络、内存等硬件故障导致用户作业失败次数达到最大重试次数,从而导致作业失败。

    为了防止在failure-rate策略下的无限重启,推荐如下参数配置:

    restart-strategy: failure-rate
    restart-strategy.failure-rate.max-failures-per-interval: 3
    restart-strategy.failure-rate.failure-rate-interval: 10 min
    restart-strategy.failure-rate.delay: 10 s

如何创建FlinkServer作业

在FlinkServer提交的SQL作业,其SQL文本会保存到DBServer中,在MRS 3.5.0及以后版本,FlinkServer默认提供了SQL加密存储的能力,保护用户的SQL信息安全。FlinkServer界面回显FlinkSQL时,SQL中的“password”字段将显示为空,在回显状态下需要将密码信息补齐后再提交作业。如果是自定义connector,密码字段名要用“password”关键字,否则页面可能显示敏感信息。

关闭SQL加密存储可能会存在密码泄漏风险,建议保持默认开启SQL加密存储。如果要关闭,可参考如下操作:

  1. (可选)备份已存在的作业,然后删除所有作业。作业备份和导入可参考导入导出FlinkServer作业信息
  2. 修改“ENABLE_DB_ENCRYPT”的值为“false”。

    登录FlinkServer主备节点,将“$BIGDATA_HOME//FusionInsight_Flink_x.x.x/x_x_FlinkServer/etc/flinkserver_service.properties”文件中的“ENABLE_DB_ENCRYPT”参数值设置为“false”,保存并退出。

  3. 重启受影响的FlinkServer实例。

    登录FusionInsight Manager,选择“集群 > 服务 > Flink > 实例”,勾选所有FlinkServer实例,选择“更多 > 重启实例”,根据界面提示重启FlinkServer实例。

  1. 访问Flink WebUI,请参考访问FlinkServer WebUI界面
  2. 单击“作业管理”进入作业管理页面。
  3. 单击“新建作业”,在新建作业页面可选择新建Flink SQL作业或Flink Jar作业,然后填写作业信息,单击“确定”,创建作业成功并进入作业开发界面。
  4. (可选)如果需要立即进行作业开发,可以在作业开发界面进行作业配置。

    进行作业开发时,系统支持对作业添加锁的功能,锁定作业的用户具备该作业的所有权限,其他用户不具备被锁定的作业的开发、启动和删除等权限,但可通过强制获取锁来具备作业的所有权限。开启该功能后,可直接通过单击“锁定作业”、“解锁作业”、“强制获取锁”来获取相应的权限。

    系统默认开启作业锁功能,可在Manager查看该功能启用状态。适用于MRS 3.3.0及以后版本。

    登录Manager,选择“集群 > 服务 > Flink > 配置 > 全部配置”,搜索参数“job.edit.lock.enable”,参数值为“true”表示开启,值为“false”表示关闭。

    • 新建Flink SQL作业
      1. 在作业开发界面进行作业开发。
        图1 FlinkServer作业开发界面

      2. 可以单击上方“语义校验”对输入内容校验,单击“SQL格式化”对SQL语句进行格式化。
      3. 作业SQL开发完成后,请参考表1设置基础参数,还可根据需要设置自定义参数,然后单击“保存”。
        表1 基础参数

        参数名称

        参数描述

        并行度

        并行数量。

        算子最大并行度

        算子最大的并行度。

        JobManager内存(MB)

        JobManager的内存。输入值最小为4096

        提交队列

        作业提交队列。不填默认提交到default。

        taskManager

        taskManager运行参数。该参数需配置以下内容:

        • slot数量:不填默认是1,建议填CPU核数
        • 内存(MB):输入值最小为4096

        开启CheckPoint

        是否开启CheckPoint。开启后,需配置以下内容:

        • 时间间隔(ms):必填;
        • 模式:必填;

          可选项为:EXACTLY_ONCE、AT_LEAST_ONCE;

        • 最小间隔(ms):输入值最小为10;
        • 超时时间:输入值最小为10;
        • 最大并发量:正整数,且不能超过64个字符;
        • 是否清理:是/否;
        • 是否开启增量Checkpoint:是/否。

        故障恢复策略

        作业的故障恢复策略,包含以下三种,详情请参考配置FlinkServer作业重启策略

        • fixed-delay:需配置“重试次数”和“失败重试间隔(s)”;
        • failure-rate:需配置“最大重试次数”、“时间间隔(min)”和“失败重试间隔(s)”;
        • none:无。
      4. 单击左上角“提交”提交作业。
    • 新建Flink Jar作业
      1. 单击“选择”,上传本地Jar文件,并参考表2配置参数或添加自定义参数。
        表2 参数配置

        参数名称

        参数描述

        本地jar文件

        上传jar文件。直接上传本地文件,大小不能超过“flinkserver.upload.jar.max.size”设置的阈值,默认500MB

        登录Manager,选择“集群 > 服务 > Flink > 配置 > 全部配置”,搜索参数“flinkserver.upload.jar.max.size”即可设置jar文件阈值,取值范围为100-5120,单位MB。

        Main Class

        Main-Class类型。

        • 默认:默认根据Jar包文件的Mainfest文件指定类名。
        • 指定:手动指定类名

        类名

        类名。

        “Main Class”选择“指定”时存在该参数。

        类参数

        类参数,为Main-Class的参数(参数间用空格分隔)

        并行度

        并行数量。

        并行数为作业每个算子的并行数,适度增加并行数会提高作业整体算力,但也须考虑线程增多带来的切换开销,其上限是计算单元SPU数的四倍,最佳实践为计算单元SPU数的1-2倍。

        JobManager内存(MB)

        JobManager的内存。输入值最小为4096

        提交队列

        作业提交队列。不填默认提交到default。

        taskManager

        taskManager运行参数。该参数需配置以下内容:

        • slot数量:不填默认是1,建议填CPU核数
        • 内存(MB):输入值最小为4096
      2. 单击“保存”保存配置,单击“提交”提交作业。

  5. 返回作业管理页面,可以查看到已创建的作业名称、类型、状态、作业种类和描述等信息。

    作业创建完成后,可在对应作业的“操作”列对作业进行启动、开发、停止、编辑、删除、查看作业详情和Checkpoint故障恢复等操作。

    • 如果要使用其他用户在节点上读取已提交的作业相关文件,需确保该用户与提交作业的用户具有相同的用户组和具有对应的FlinkServer应用管理权限角色,如参考创建FlinkServer权限角色勾选“应用查看”。
    • 作业状态为“运行中”的作业可以查看作业详情。
    • 作业状态为“运行失败”、“运行成功”和“停止”的作业可以进行Checkpoint故障恢复。
    • 作业状态为“失败”或“取消”的作业的Checkpoint是否保留可登录Manager,选择“集群 > 服务 > Flink > 配置 > 全部配置”,搜索并配置FlinkServer的参数“execution.checkpointing.externalized-checkpoint-retention”设置。
      • DELETE_ON_CANCELLATION:仅保存“失败”作业的Checkpoint。
      • RETAIN_ON_CANCELLATION(MRS 3.5.0及以后版本默认值):保存“失败”或“取消”作业的Checkpoint。
      • NO_EXTERNALIZED_CHECKPOINTS(MRS 3.5.0之前版本默认值):不保存“失败”或“取消”作业的Checkpoint。

相关文档