更新时间:2022-12-14 GMT+08:00

从零开始使用Flink

操作场景

本章节提供一个使用Flink运行wordcount作业的操作入门指导。

前提条件

  • MRS集群中已安装Flink组件且集群内各组件正常运行。
  • 已安装集群客户端,例如安装目录为“/opt/hadoopclient”。

操作步骤

  1. 以客户端安装用户,登录安装客户端的节点。
  2. 执行以下命令,切换到客户端安装目录。

    cd /opt/hadoopclient

  3. 执行如下命令初始化环境变量。

    source /opt/hadoopclient/bigdata_env

  4. 若集群开启了Kerberos认证,需要执行以下步骤,若集群未开启Kerberos认证请跳过该步骤。

    1. 准备一个用于提交Flink作业的用户,例如test

      人机用户创建后,使用新用户登录FusionInsight Manager并根据界面提示修改初始密码。

      用户在Flink提交作业或者运行作业时,应具有如下权限:

      • 如果启用Ranger鉴权,当前用户必须属于hadoop组或者已在Ranger中为该用户添加“/flink”目录的读写权限。
      • 如果停用Ranger鉴权,当前用户必须属于hadoop组。
    2. 登录FusionInsight Manager界面,选择“系统 > 权限 > 用户”,在已增加用户所在行的“操作”列,选择“更多 > 下载认证凭据”,下载用户对应的认证凭据文件到本地并解压。
    3. 将解压得到的“user.keytab”“krb5.conf”文件拷贝到客户端节点的“/opt/hadoopclient/Flink/flink/conf”目录下。
    4. 登录客户端节点,将客户端节点的业务IP地址以及Manager的浮动IP地址追加到“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”文件中的“jobmanager.web.allow-access-address”配置项中,IP地址之间使用英文逗号分隔。

      vi /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml

    5. 配置安全认证。

      在“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”配置文件中的对应配置添加keytab路径以及用户名。

      security.kerberos.login.keytab: <user.keytab文件路径>
      security.kerberos.login.principal: <用户名>

      例如:

      security.kerberos.login.keytab: /opt/hadoopclient/Flink/flink/conf/user.keytab
      security.kerberos.login.principal: test
    6. 参考认证和加密进行安全加固配置,执行如下命令,并设置为一个用于提交作业的密码。

      cd /opt/hadoopclient/Flink/flink/bin

      sh generate_keystore.sh

      该脚本会自动替换“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”中关于SSL的相关配置参数值。

    7. 配置客户端访问“flink.keystore”“flink.truststore”文件的路径配置。
      • 绝对路径

        执行“generate_keystore.sh”脚本后,默认在“flink-conf.yaml”文件中将“flink.keystore”“flink.truststore”文件路径自动配置为绝对路径,此时需要将“conf”目录中的“flink.keystore”“flink.truststore”文件分别放置在Flink客户端以及Yarn各个节点的该绝对路径上。

      • 相对路径(推荐)
        请执行如下步骤配置“flink.keystore”“flink.truststore”文件路径为相对路径,并确保Flink客户端执行命令的目录可以直接访问该相对路径。
        1. 在“/opt/hadoopclient/Flink/flink/conf/”目录下新建目录,例如“ssl”

          cd /opt/hadoopclient/Flink/flink/conf/

          mkdir ssl

        2. 移动“flink.keystore”“flink.truststore”文件到新建目录中。

          mv flink.keystore ssl/

          mv flink.truststore ssl/

        3. 修改“flink-conf.yaml”文件中如下两个参数为相对路径。
          vi /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml
          security.ssl.keystore: ssl/flink.keystore
          security.ssl.truststore: ssl/flink.truststore

  5. 运行wordcount作业。

    • 普通集群(未开启Kerberos认证)
      • 执行如下命令启动session,并在session中提交作业。

        yarn-session.sh -nm "session-name"

        flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • 执行如下命令在Yarn上提交单个作业。

        flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

    • 安全集群(开启Kerberos认证)
      • “flink.keystore”“flink.truststore”文件路径为相对路径时:
        • 在“ssl”的同级目录下执行如下命令启动session,并在session中提交作业,其中“ssl/”是相对路径。

          cd /opt/hadoopclient/Flink/flink/conf/

          yarn-session.sh -t ssl/ -nm "session-name"

          ...
          Cluster started: Yarn cluster with application id application_1624937999496_0017
          JobManager Web Interface: http://192.168.1.150:32261
          

          重新打开一个客户端连接,提交作业:

          source /opt/hadoopclient/bigdata_env

          flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

          ...
          Job has been submitted with JobID 587d5498fff18d8b2501fdf7ebb9c4fb
          Program execution finished
          Job with JobID 587d5498fff18d8b2501fdf7ebb9c4fb has finished.
          Job Runtime: 19917 ms
        • 执行如下命令在Yarn上提交单个作业。

          cd /opt/hadoopclient/Flink/flink/conf/

          flink run -m yarn-cluster -yt ssl/ /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

          ...
          Cluster started: Yarn cluster with application id application_1624937999496_0016
          Job has been submitted with JobID e9c59fb48f44feae7b62dd90336d6d7f
          Program execution finished
          Job with JobID e9c59fb48f44feae7b62dd90336d6d7f has finished.
          Job Runtime: 18155 ms
      • “flink.keystore”“flink.truststore”文件路径为绝对路径时:
        • 执行如下命令启动session,并在session中提交作业。

          cd /opt/hadoopclient/Flink/flink/conf/

          yarn-session.sh -nm "session-name"

          flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

        • 执行如下命令在Yarn上提交单个作业。

          flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

  6. 使用运行用户登录FusionInsight Manager,进入Yarn服务的原生页面,找到对应作业的application,单击application名称,进入到作业详情页面。

    • 若作业尚未结束,可单击“Tracking URL”链接进入到Flink的原生页面,查看作业的运行信息。
    • 若作业已运行结束,对于在session中提交的作业,可以单击“Tracking URL”链接登录Flink原生页面查看作业信息。
      图1 application