更新时间:2024-09-04 GMT+08:00

运行Flink作业

用户可将自己开发的程序提交到MRS中,执行程序并获取结果,本章节指导您如何在MRS集群中提交一个Flink作业。

Flink作业用于提交jar程序处理流式数据。

用户可以在MRS管理控制台在线创建一个作业并提交运行,也可以通过MRS集群客户端来以命令行形式提交作业。

操作视频

本视频以在未开启Kerberos认证的MRS 3.1.0版本集群的管理控制台上,提交用于处理OBS存储的数据的Flink作业为例进行说明。

因不同版本操作界面可能存在差异,相关视频供参考,具体以实际环境为准。

前提条件

  • 用户已经将作业所需的程序包和数据文件上传至OBS或HDFS文件系统中。
  • 如果作业程序需要读取以及分析OBS文件系统中的数据,需要先配置MRS集群的存算分离,请参考配置MRS集群存算分离

通过管理控制台提交作业

  1. 登录MRS管理控制台。
  2. 选择“现有集群”,选中一个运行中的集群并单击集群名称,进入集群信息页面。
  3. “概览”页签的基本信息区域,单击“IAM用户同步”右侧的“同步”进行IAM用户同步。

    集群开启Kerberos认证时需执行该步骤,若集群未开启Kerberos认证,无需执行本步骤。

    • IAM用户同步完成后,请等待5分钟,再进行提交作业,更多IAM用户同步说明请参考IAM用户同步MRS集群说明
    • 当IAM用户的用户组的所属策略从MRS ReadOnlyAccess向MRS CommonOperations、MRS FullAccess、MRS Administrator变化时,或者反之从MRS CommonOperations、MRS FullAccess、MRS Administrator向MRS ReadOnlyAccess变化时,由于集群节点的SSSD(System Security Services Daemon)缓存刷新需要时间,因此用户同步完成后,请等待5分钟,待新策略生效之后,再进行提交作业,否则会出现提交作业失败的情况。
    • 当前IAM用户名中存在空格时(例如admin 01),不支持添加作业。

  4. 单击“作业管理”,在作业列表界面单击“添加”。
  5. “作业类型”选择“Flink”并参考表1配置Flink作业信息。

    图1 添加Flink作业
    表1 作业配置信息

    参数

    描述

    示例

    作业名称

    作业名称,只能由字母、数字、中划线和下划线组成,并且长度为1~64个字符。

    flink_job

    执行程序路径

    待执行程序包地址,可直接手动输入地址路径,也可单击“HDFS”或者“OBS”选择文件。

    • 最多为1023字符,不能包含;|&>,<'$特殊字符,且不可为空或全空格。
    • OBS程序路径地址以“obs://”开头,例如“obs://wordcount/program/XXX.jar”。HDFS程序路径地址以“hdfs://”开头,例如“hdfs://hacluster/user/XXX.jar”。
    • Flink作业执行程序需要以“.jar”结尾。

    -

    运行程序参数

    可选参数,为本次执行的作业配置相关优化参数(例如线程、内存、CPU核数等),用于优化资源使用效率,提升作业的执行性能。

    Flink作业常用运行程序参数如表2所示,可根据执行程序及集群资源情况进行配置。

    -

    执行程序参数

    可选参数,程序执行的关键参数,该参数由用户程序内的函数指定,MRS只负责参数的传入。

    多个参数间使用空格隔开,最多为150000字符,不能包含;|&><'$特殊字符,可为空。

    注意:

    用户输入带有敏感信息(如登录密码)的参数时,可通过在参数名前添加“@”的方式为该参数值加密,以防止敏感信息被明文形式持久化。

    在MRS管理控制台查看作业信息时,敏感信息会显示为“*”。

    例如:username=testuser @password=用户密码

    -

    服务配置参数

    可选参数,用于为本次执行的作业修改服务配置参数。

    该参数的修改仅适用于本次执行的作业,如需对集群永久生效,请参考修改MRS集群组件配置参数进行集群组件配置参数的修改。

    如需添加多个参数,请单击右侧增加。

    例如作业需要通过AK/SK方式访问OBS,增加以下服务配置参数:

    • fs.obs.access.key:访问OBS的密钥ID。
    • fs.obs.secret.key:访问OBS与密钥ID对应的密钥。

    -

    命令参考

    用于展示提交作业时提交到后台执行的命令。

    -

    表2 Flink作业运行程序参数

    参数

    描述

    示例

    -ytm

    设置每个TaskManager容器的内存(单位可选,默认单位MB)。

    1024

    -yjm

    设置JobManager容器内存(单位可选,默认单位MB)。

    1024

    -ys

    设置TaskManager的核数。

    2

    -ynm

    自定义Yarn上应用程序名称。

    test

    -c

    设置程序入口点的类名(如“main”或“getPlan()”方法)。该参数仅在jar程序未指定其清单的类时需要。

    com.bigdata.mrs.test

  6. 确认作业配置信息,单击“确定”,完成作业的新增。
  7. 作业提交成功中,可在作业列表中查看作业运行状态及执行结果,等待作业状态变为“已完成”,可查看相关程序分析结果。

通过集群客户端提交作业

  1. 安装MRS集群客户端,具体操作可参考安装MRS集群客户端

    MRS集群中默认安装了一个客户端用于作业提交,也可直接使用该客户端。MRS 3.x及之后版本客户端默认安装路径为Master节点上的“/opt/Bigdata/client”,MRS 3.x之前版本为Master节点上的“/opt/client”。

  2. 使用MRS集群客户端安装用户登录客户端所在的节点。
  3. 执行如下命令初始化环境变量。

    cd /opt/Bigdata/client

    source bigdata_env

  4. 若集群开启Kerberos认证,需要执行以下步骤创建用于提交作业的用户并修改集群客户端相关安全配置,若集群未开启Kerberos认证请跳过该步骤。

    1. 准备一个提交Flink作业的用户。
    2. 使用新创建的用户登录Manager页面。
      • MRS 3.x之前版本,登录集群的Manager界面,选择“系统设置 > 用户管理”,在已增加用户所在行的“操作”列,选择“更多 > 下载认证凭据”。
      • MRS 3.x及之后版本,登录集群的Manager界面,选择“系统 > 权限 > 用户”,在已增加用户所在行的“操作”列,选择“更多 > 下载认证凭据”。
    3. 将下载的认证凭据压缩包解压缩,并将得到的文件复制到客户端节点中,例如客户端节点的“/opt/Bigdata/client/Flink/flink/conf”目录下。如果是在集群外节点安装的客户端,需要将得到的文件复制到该节点的“/etc/”目录下。
    4. MRS 3.x及之后版本,安全模式下需要将客户端安装节点的业务IP以及Manager的浮动ip追加到“/opt/Bigdata/client/Flink/flink/conf/flink-conf.yaml”文件中的“jobmanager.web.allow-access-address”配置项中,ip之间使用英文逗号分隔。
    5. 配置安全认证,在“/opt/Bigdata/client/Flink/flink/conf/flink-conf.yaml”配置文件中的对应配置添加keytab路径以及用户名。
      security.kerberos.login.keytab: <user.keytab文件路径>
      security.kerberos.login.principal: <用户名>

      例如:

      security.kerberos.login.keytab: /opt/Bigdata/client/Flink/flink/conf/user.keytab
      security.kerberos.login.principal: test
    6. 在Flink的客户端bin目录下,执行如下命令进行安全加固,并设置一个用于提交作业的密码。

      sh generate_keystore.sh

      该脚本会自动替换“/opt/Bigdata/client/Flink/flink/conf/flink-conf.yaml”中关于SSL的值,针对MRS 3.x之前版本,安全集群默认没有开启外部SSL,用户如果需要启用外部SSL,进行配置后再次运行该脚本即可,配置参数在MRS的Flink默认配置中不存在,用户如果开启外部连接SSL,则需要添加表3中参数。

      表3 参数描述

      参数

      描述

      参数值示例

      security.ssl.rest.enabled

      打开外部SSL开关。

      true

      security.ssl.rest.keystore

      keystore的存放路径。

      ${path}/flink.keystore

      security.ssl.rest.keystore-password

      keystore的password,“123456”表示需要用户输入自定义设置的密码值。

      123456

      security.ssl.rest.key-password

      ssl key的password,“123456”表示需要用户输入自定义设置的密码值。

      123456

      security.ssl.rest.truststore

      truststore存放路径。

      ${path}/flink.truststore

      security.ssl.rest.truststore-password

      truststore的password,“123456”表示需要用户输入自定义设置的密码值。

      123456

      • 针对MRS 3.x之前版本,generate_keystore.sh脚本无需手动生成。
      • 认证和加密会将生成的flink.keystore、flink.truststore、security.cookie自动填充到“flink-conf.yaml”对应配置项中。
      • 针对MRS 3.x及之后版本,“security.ssl.key-password”“security.ssl.keystore-password”“security.ssl.truststore-password”的值需要使用Manager明文加密API进行获取:

        curl -k -i -u <user name>:<password> -X POST -HContent-type:application/json -d '{"plainText":"<password>"}' 'https://x.x.x.x:28443/web/api/v2/tools/encrypt';其中<password>要与签发证书时使用的密码一致,x.x.x.x为集群Manager的浮动IP。

        命令中如果携带认证密码信息可能存在安全风险,在执行命令前建议关闭系统的history命令记录功能,避免信息泄露。

    7. 客户端访问flink.keystore和flink.truststore文件的路径配置。
      • 绝对路径:执行该脚本后,在flink-conf.yaml文件中将flink.keystore和flink.truststore文件路径自动配置为绝对路径“/opt/Bigdata/client/Flink/flink/conf/”,此时需要将conf目录中的flink.keystore和flink.truststore文件分别放置在Flink Client以及Yarn各个节点的该绝对路径上。
      • 相对路径:请执行如下步骤配置flink.keystore和flink.truststore文件路径为相对路径,并确保Flink Client执行命令的目录可以直接访问该相对路径。
        1. 在“/opt/Bigdata/client/Flink/flink/conf/”目录下新建目录,例如ssl。
        2. 移动flink.keystore和flink.truststore文件到“/opt/Bigdata/client/Flink/flink/conf/ssl/”中。
        3. 针对MRS 3.x及之后版本,修改flink-conf.yaml文件中如下两个参数为相对路径。
          security.ssl.keystore: ssl/flink.keystore
          security.ssl.truststore: ssl/flink.truststore
        4. 针对MRS 3.x之前版本,修改flink-conf.yaml文件中如下两个参数为相对路径。
          security.ssl.internal.keystore: ssl/flink.keystore
          security.ssl.internal.truststore: ssl/flink.truststore
    8. 如果客户端安装在集群外节点,请在配置文件(如:“/opt/Bigdata/client/Flink/fink/conf/flink-conf.yaml”)中增加如下配置值,其中xx.xx.xxx.xxx请替换为客户端所在节点的IP。
      web.access-control-allow-origin: xx.xx.xxx.xxx
      jobmanager.web.allow-access-address: xx.xx.xxx.xxx

  5. 运行Flink作业。

    本章节以提交客户端自带的WordCount样例程序为例。

    • 普通集群(未开启Kerberos认证)
      • 执行如下命令启动session,并在session中提交作业。

        yarn-session.sh -nm "session-name" -d

        flink run /opt/Bigdata/client/Flink/flink/examples/streaming/WordCount.jar

      • 执行如下命令在Yarn上提交单个作业。

        flink run -m yarn-cluster /opt/Bigdata/client/Flink/flink/examples/streaming/WordCount.jar

    • 安全集群(开启Kerberos认证)
      • flink.keystore和flink.truststore文件路径为绝对路径时:
        • 执行如下命令启动session,并在session中提交作业。

          yarn-session.sh -nm "session-name" -d

          flink run /opt/Bigdata/client/Flink/flink/examples/streaming/WordCount.jar

        • 执行如下命令在Yarn上提交单个作业。

          flink run -m yarn-cluster /opt/Bigdata/client/Flink/flink/examples/streaming/WordCount.jar

      • flink.keystore和flink.truststore文件路径为相对路径时:
        • 在“ssl”的同级目录下执行如下命令启动session,并在session中提交作业,其中“ssl”是相对路径,如“ssl”所在目录是“opt/Bigdata/client/Flink/flink/conf/”,则在“opt/Bigdata/client/Flink/flink/conf/”目录下执行命令。

          yarn-session.sh -t ssl/ -nm "session-name" -d

          flink run /opt/Bigdata/client/Flink/flink/examples/streaming/WordCount.jar

        • 执行如下命令在Yarn上提交单个作业。

          flink run -m yarn-cluster -yt ssl/ /opt/Bigdata/client/Flink/flink/examples/streaming/WordCount.jar