更新时间:2024-11-29 GMT+08:00

从零开始使用Flink

操作场景

本章节提供一个使用Flink运行wordcount作业的操作入门指导。

前提条件

  • MRS集群中已安装Flink组件且集群内各组件正常运行。
  • 已安装集群客户端,例如安装目录为“/opt/hadoopclient”。

操作步骤

  1. 以客户端安装用户,登录安装客户端的节点。
  2. 执行以下命令,切换到客户端安装目录。

    cd /opt/hadoopclient

  3. 执行如下命令初始化环境变量。

    source /opt/hadoopclient/bigdata_env

  4. 若集群开启了Kerberos认证,需要执行以下步骤,若集群未开启Kerberos认证请跳过该步骤。

    1. 准备一个用于提交Flink作业的用户,例如test

      登录Manager,选择“系统 > 权限 > 角色”,单击“添加角色”,输入角色名称与描述。在“配置资源权限”的表格中选择“待操作集群的名称 > Flink”,勾选“FlinkServer管理操作权限”,单击“确定”,返回角色管理。

      选择“系统 > 权限 > 用户”,单击“添加用户”,输入用户名、密码等,用户类型选择“人机”,用户组根据需求添加“hadoop”、“yarnviewgroup”和“hadooppmanager”,并添加“System_administrator”、“default”和创建的角色,单击“确定”完成Flink作业用户创建(首次创建的用户需使用该用户登录Manager修改密码)。

      用户在Flink提交作业或者运行作业时,需根据涉及的相关服务(如HDFS、Kafka等)是否启用Ranger鉴权,使该用户应具有如下权限:

      • 如果启用Ranger鉴权,当前用户必须属于hadoop组或者已在Ranger中为该用户添加“/flink”目录的读写权限。
      • 如果停用Ranger鉴权,当前用户必须属于hadoop组。
    2. 登录FusionInsight Manager界面,选择“系统 > 权限 > 用户”,在已增加用户所在行的“操作”列,选择“更多 > 下载认证凭据”,下载用户对应的认证凭据文件到本地并解压。
    3. 将解压得到的“user.keytab”“krb5.conf”文件拷贝到客户端节点的“/opt/hadoopclient/Flink/flink/conf”目录下。
    4. 登录客户端节点,将客户端节点的业务IP地址以及Manager的浮动IP地址追加到“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”文件中的“jobmanager.web.allow-access-address”配置项中,IP地址之间使用英文逗号分隔。

      vi /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml

    5. 配置安全认证。

      在“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”配置文件中的对应配置添加keytab路径以及用户名。

      security.kerberos.login.keytab: <user.keytab文件路径>
      security.kerberos.login.principal: <用户名>

      例如:

      security.kerberos.login.keytab: /opt/hadoopclient/Flink/flink/conf/user.keytab
      security.kerberos.login.principal: test
    6. 参考认证和加密进行安全加固配置,执行如下命令,并设置为一个用于提交作业的密码。

      cd /opt/hadoopclient/Flink/flink/bin

      sh generate_keystore.sh

      该脚本会自动替换“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”中关于SSL的相关配置参数值。

    7. 配置客户端访问“flink.keystore”“flink.truststore”文件的路径配置。
      • 绝对路径

        执行“generate_keystore.sh”脚本后,默认在“flink-conf.yaml”文件中将“flink.keystore”“flink.truststore”文件路径自动配置为绝对路径,此时需要将“conf”目录中的“flink.keystore”“flink.truststore”文件分别放置在Flink客户端以及Yarn各个节点的该绝对路径上。

      • 相对路径(推荐)
        请执行如下步骤配置“flink.keystore”“flink.truststore”文件路径为相对路径,并确保Flink客户端执行命令的目录可以直接访问该相对路径。
        1. 在“/opt/hadoopclient/Flink/flink/conf/”目录下新建目录,例如“ssl”

          cd /opt/hadoopclient/Flink/flink/conf/

          mkdir ssl

        2. 移动“flink.keystore”“flink.truststore”文件到新建目录中。

          mv flink.keystore ssl/

          mv flink.truststore ssl/

        3. 修改“flink-conf.yaml”文件中如下两个参数为相对路径。
          vi /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml
          security.ssl.keystore: ssl/flink.keystore
          security.ssl.truststore: ssl/flink.truststore

  5. 运行wordcount作业。

    作业提交有如下模式:

    • Session模式

      该模式会在“客户端安装路径/Flink/tmp/.yarn-properties-<username>”中创建一个YARN属性文件,在提交作业时,会将作业提交到该文件所记录的applicationID上。作业结束后Flink集群不会关闭,Session模式下分如下两种模式提交作业:

      • attached模式(默认)

        yarn-session.sh客户端将Flink集群提交给YARN,但客户端一直在运行,跟踪集群的状态。如果集群失败,客户端将显示错误。如果客户端被终止,会向集群发出关闭信号。

      • detached模式(-d或--detached)

        yarn-session.sh客户端将Flink集群提交给YARN,然后客户端返回。需要再次调用客户端或YARN工具来关闭Flink集群。

    • Application模式

      该模式会在YARN上启动一个Flink集群,其中应用程序jar的main()方法在YARN中的JobManager上执行,可使用HDFS上的依赖包。应用程序完成后,集群将立即关闭。也可以使用yarn application -kill <ApplicationId>或通过取消Flink作业手动停止集群,作业结束后Flink集群也会关闭。

    • Per-Job Cluster模式

      该模式会在YARN上启动一个Flink集群,然后在本地运行提供的应用程序jar,最后将JobGraph提交给YARN上的JobManager。如果使用--detached参数,客户端将在作业提交后停止,作业结束后Flink集群也会关闭。

    • yarn-cluster模式

      和Per-Job Cluster模式类似。

    • 若开启了作业注册功能,即“/opt/client/Flink/flink/conf/flink-conf.yaml”文件中参数配置为“job.alarm.enable: true”、“job.register.enable: true”和“flinkServer.tenant.name: CLIENT_APP”时,运行作业命令中需要指定作业名,即添加参数“-Dyarn.application.name=作业名”。
    • 普通集群(未开启Kerberos认证)
      • 使用Session的attached模式提交作业

        yarn-session.sh -nm "session-name"

        重新打开一个客户端连接,提交作业:

        source /opt/hadoopclient/bigdata_env

        flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • 使用Application模式提交作业

        flink run-application -t yarn-application /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • 使用Per-job模式提交作业

        flink run -t yarn-per-job /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • 使用yarn-cluster模式提交作业

        flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

    • 安全集群(开启Kerberos认证)
      • “flink.keystore”“flink.truststore”文件路径为相对路径时:
        • 使用Session的attached模式提交作业,其中“ssl/”是相对路径

          cd /opt/hadoopclient/Flink/flink/conf/

          yarn-session.sh -t ssl/ -nm "session-name"

          ...
          Cluster started: Yarn cluster with application id application_1624937999496_0017
          JobManager Web Interface: http://192.168.1.150:32261

          重新打开一个客户端连接,提交作业:

          source /opt/hadoopclient/bigdata_env

          flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

          ...
          Job has been submitted with JobID 587d5498fff18d8b2501fdf7ebb9c4fb
          Program execution finished
          Job with JobID 587d5498fff18d8b2501fdf7ebb9c4fb has finished.
          Job Runtime: 19917 ms
        • 使用Application模式提交作业

          cd /opt/hadoopclient/Flink/flink/conf/

          flink run-application -t yarn-application -Dyarn.ship-files="ssl/" /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

          ...
          Submitted application application_1669179911005_0070
          Waiting for the cluster to be allocated
          Deploying cluster, current state ACCEPTED
          YARN application has been deployed successfully.
          Found Web Interface xxx:xxx of application 'application_1669179911005_0070'.
        • 使用Per-job模式提交作业

          cd /opt/hadoopclient/Flink/flink/conf/

          flink run -t yarn-per-job -Dyarn.ship-files="ssl/" /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

          ...
          Cluster started: Yarn cluster with application id application_1669179911005_0071
          Job has been submitted with JobID 75011429a29f230121809f54f4570ed0
          Program execution finished
          Job with JobID 75011429a29f230121809f54f4570ed0 has finished.
          Job Runtime: 21245 ms
        • 使用yarn-cluster模式提交作业

          flink run -m yarn-cluster -yt ssl/ /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • “flink.keystore”“flink.truststore”文件路径为绝对路径时:
        • 使用Session的attached模式提交作业

          cd /opt/hadoopclient/Flink/flink/conf/

          yarn-session.sh -nm "session-name"

          重新打开一个客户端连接,提交作业:

          source /opt/hadoopclient/bigdata_env

          flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

          或者:flink run -t yarn-session -Dyarn.application.id=application_xxx /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

        • 使用Application模式提交作业

          cd /opt/hadoopclient/Flink/flink/conf/

          flink run-application -t yarn-application /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

        • 使用Per-job模式提交作业

          cd /opt/hadoopclient/Flink/flink/conf/

          flink run -t yarn-per-job /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

        • 使用yarn-cluster模式提交作业

          flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

    通过客户端注册到FlinkServer的作业,若未开启作业注册到FlinkServer功能,暂不支持在FlinkServer WebUI执行启动、开发、停止等操作。可参考Flink作业级巡检能力开启作业注册到FlinkServer功能。

  6. 使用运行用户登录FusionInsight Manager,进入Yarn服务的原生页面,找到对应作业的application,单击application名称,进入到作业详情页面。

    • 若作业尚未结束,可单击“Tracking URL”链接进入到Flink的原生页面,查看作业的运行信息。
    • 若作业已运行结束,对于在session中提交的作业,可以单击“Tracking URL”链接登录Flink原生页面查看作业信息。
      图1 application