更新时间:2024-01-24 GMT+08:00

从零开始使用Flink

本节提供使用Flink运行wordcount作业的操作指导。

前提条件

  • MRS集群中已安装Flink组件。
  • 集群正常运行,已安装集群客户端,例如安装目录为“/opt/hadoopclient”。以下操作的客户端目录只是举例,请根据实际安装目录修改。

使用Flink客户端(MRS 3.x及之后版本)

  1. 安装客户端。

    以在集群内节点安装客户端为例:

    1. 登录Manager,在“集群”下拉列表中单击需要操作的集群名称,选择“更多 > 下载客户端”,弹出“下载集群客户端”信息提示框。
    2. 选择“完整客户端”,选择与待安装节点架构相匹配的平台类型,勾选“仅保存到如下路径”,单击“确定”开始生成客户端文件。
      • 文件生成后默认保存在主管理节点“/tmp/FusionInsight-Client”。
      • 客户端软件包名称格式为:“FusionInsight_Cluster_集群ID_Services_Client.tar”。本章节仅以集群ID为1进行介绍,请以实际集群ID为准。
    3. 以客户端安装用户登录将要安装客户端的服务器。
    4. 进入安装包所在目录,执行如下命令解压软件包。

      cd /tmp/FusionInsight-Client

      tar -xvf FusionInsight_Cluster_1_Services_Client.tar

    5. 执行如下命令校验解压得到的文件,检查回显信息与sha256文件里面的内容是否一致,例如:

      sha256sum -c FusionInsight_Cluster_1_Services_ClientConfig.tar.sha256

      FusionInsight_Cluster_1_Services_ClientConfig.tar: OK
    6. 解压获取的安装文件。

      tar -xvf FusionInsight_Cluster_1_Services_ClientConfig.tar

    7. 进入安装包所在目录,执行如下命令安装客户端到指定目录(绝对路径),例如安装到“/opt/hadoopclient”目录。

      cd /tmp/FusionInsight-Client/FusionInsight_Cluster_1_Services_ClientConfig

      ./install.sh /opt/hadoopclient

      等待客户端安装完成(以下只显示部分屏显结果)。

      The component client is installed successfully

  2. 以客户端安装用户,登录安装客户端的节点。
  3. 执行以下命令,切换到客户端安装目录。

    cd /opt/hadoopclient

  4. 执行如下命令初始化环境变量。

    source /opt/hadoopclient/bigdata_env

  5. 若集群开启Kerberos认证,需要执行以下步骤,若集群未开启Kerberos认证请跳过该步骤。

    1. 准备一个提交Flink作业的用户。

      登录Manager,选择“系统 > 权限 > 角色”,单击“添加角色”,输入角色名称与描述。在“配置资源权限”的表格中选择“待操作集群的名称 > Flink”,勾选“FlinkServer管理操作权限”,单击“确定”,返回角色管理。

      选择“系统 > 权限 > 用户”,单击“添加用户”,输入用户名、密码等,用户类型选择“人机”,用户组根据需求添加“hadoop”、“yarnviewgroup”和“hadooppmanager”,并添加“System_administrator”、“default”和创建的角色,单击“确定”完成Flink作业用户创建(首次创建的用户需使用该用户登录Manager修改密码)。

    2. 登录Manager,下载认证凭据。

      登录集群的Manager界面,具体请参见访问FusionInsight Manager(MRS 3.x及之后版本),选择“系统 > 权限 > 用户”,在已增加用户所在行的“操作”列,选择“更多 > 下载认证凭据”。

      图1 下载认证凭据
    3. 将下载的认证凭据压缩包解压缩,并将得到的文件拷贝到客户端节点中,例如客户端节点的“/opt/hadoopclient/Flink/flink/conf”目录下。如果是在集群外节点安装的客户端,需要将得到的文件拷贝到该节点的“/etc/”目录下。
    4. 将客户端安装节点的业务IP和所有Master节点IP添加到配置文件“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”中的“jobmanager.web.access-control-allow-origin”和“jobmanager.web.allow-access-address”配置项中,IP地址之间使用英文逗号分隔。
      jobmanager.web.access-control-allow-origin: xx.xx.xxx.xxx,xx.xx.xxx.xxx,xx.xx.xxx.xxx
      jobmanager.web.allow-access-address: xx.xx.xxx.xxx,xx.xx.xxx.xxx,xx.xx.xxx.xxx
      客户端安装节点的业务IP获取方法:
      • 集群内节点:

        登录MapReduce服务管理控制台,选择“集群列表 > 现有集群”,选中当前的集群并单击集群名,进入集群信息页面。

        在“节点管理”中查看安装客户端所在的节点IP。

      • 集群外节点:安装客户端所在的弹性云服务器的IP。
    5. 配置安全认证,在“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”配置文件中的对应配置添加keytab路径以及用户名。
      security.kerberos.login.keytab: <user.keytab文件路径>
      security.kerberos.login.principal: <用户名>

      例如:

      security.kerberos.login.keytab: /opt/hadoopclient/Flink/flink/conf/user.keytab
      security.kerberos.login.principal: test
    6. 在Flink的客户端bin目录下,执行如下命令进行安全加固,并设置一个用于提交作业的密码。

      cd /opt/hadoopclient/Flink/flink/bin

      sh generate_keystore.sh

      该脚本会自动替换“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”中关于SSL的值。

      执行认证和加密后会在Flink客户端的“conf”目录下生成“flink.keystore”和“flink.truststore”文件,并且在客户端配置文件“flink-conf.yaml”中将以下配置项进行了默认赋值:
      • 将配置项“security.ssl.keystore”设置为“flink.keystore”文件所在绝对路径。
      • 将配置项“security.ssl.truststore”设置为“flink.truststore”文件所在的绝对路径。
      • 将配置项“security.cookie”设置为“generate_keystore.sh”脚本自动生成的一串随机规则密码。
      • 默认“flink-conf.yaml”中“security.ssl.encrypt.enabled: false”,“generate_keystore.sh”脚本将配置项“security.ssl.key-password”、“security.ssl.keystore-password”和“security.ssl.truststore-password”的值设置为调用“generate_keystore.sh”脚本时输入的密码。配置文件中包含认证密码信息可能存在安全风险,建议当前场景执行完毕后删除相关配置文件或加强安全管理。
      • MRS 3.x及之后版本,如果需要使用密文时,设置“flink-conf.yaml”中“security.ssl.encrypt.enabled: true”,“generate_keystore.sh”脚本不会配置“security.ssl.key-password”、“security.ssl.keystore-password”和“security.ssl.truststore-password”的值,需要使用Manager明文加密API进行获取,执行curl -k -i -u user name:password -X POST -HContent-type:application/json -d '{"plainText":"password"}' 'https://x.x.x.x:28443/web/api/v2/tools/encrypt'

        其中user name:password分别为当前系统登录用户名和密码;"plainText"的password为调用“generate_keystore.sh”脚本时的密码;x.x.x.x为集群Manager的浮动IP。命令中如果携带认证密码信息可能存在安全风险,在执行命令前建议关闭系统的history命令记录功能,避免信息泄露。

    7. 配置客户端访问flink.keystore和flink.truststore文件的路径。
      • 相对路径(推荐):
        执行如下步骤配置flink.keystore和flink.truststore文件路径为相对路径,并确保Flink Client执行命令的目录可以直接访问该相对路径。
        1. 在“/opt/hadoopclient/Flink/flink/conf/”目录下新建目录,例如ssl。

          cd /opt/hadoopclient/Flink/flink/conf/

          mkdir ssl

        2. 移动flink.keystore和flink.truststore文件到“/opt/hadoopclient/Flink/flink/conf/ssl/”中。

          mv flink.keystore ssl/

          mv flink.truststore ssl/

        3. 修改flink-conf.yaml文件中如下两个参数为相对路径。
          security.ssl.keystore: ssl/flink.keystore
          security.ssl.truststore: ssl/flink.truststore
      • 绝对路径:

        执行“generate_keystore.sh”脚本后,在flink-conf.yaml文件中将flink.keystore和flink.truststore文件路径自动配置为绝对路径“/opt/hadoopclient/Flink/flink/conf/”,此时需要将conf目录中的flink.keystore和flink.truststore文件分别放置在Flink Client以及Yarn各个节点的该绝对路径上。

  6. 运行wordcount作业。

    用户在Flink提交作业或者运行作业时,需根据涉及的相关服务(如HDFS、Kafka等)是否启用Ranger鉴权,使该用户应具有如下权限:

    • 如果启用Ranger鉴权,当前用户必须属于hadoop组或者已在Ranger中为该用户添加“/flink”的读写权限。
    • 如果停用Ranger鉴权,当前用户必须属于hadoop组。
    • 普通集群(未开启Kerberos认证)可通过如下两种方式提交作业:
      • 执行如下命令启动session,并在session中提交作业。

        yarn-session.sh -nm "session-name" -d

        flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • 执行如下命令在Yarn上提交单个作业。

        flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

    • 安全集群(开启Kerberos认证)根据flink.keystore和flink.truststore文件的路径有如下两种方式提交作业:
      • flink.keystore和flink.truststore文件路径为相对路径时:
        • 在“ssl”的同级目录下执行如下命令启动session,并在session中提交作业。

          其中“ssl”是相对路径,如“ssl”所在目录是“opt/hadoopclient/Flink/flink/conf/”,则在“opt/hadoopclient/Flink/flink/conf/”目录下执行命令。

          cd /opt/hadoopclient/Flink/flink/conf

          yarn-session.sh -t ssl/ -nm "session-name" -d

          flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

        • 执行如下命令在Yarn上提交单个作业。

          cd /opt/hadoopclient/Flink/flink/conf

          flink run -m yarn-cluster -yt ssl/ /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • flink.keystore和flink.truststore文件路径为绝对路径时:
        • 执行如下命令启动session,并在session中提交作业。

          cd /opt/hadoopclient/Flink/flink/conf

          yarn-session.sh -nm "session-name" -d

          flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

        • 执行如下命令在Yarn上提交单个作业。

          flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

  7. 作业提交成功后,客户端界面显示如下。

    图2 在Yarn上提交作业成功
    图3 启动session成功
    图4 在session中提交作业成功

  8. 使用运行用户进入Yarn服务的原生页面,具体操作参考查看Flink作业信息,找到对应作业的application,单击application名称,进入到作业详情页面

    • 若作业尚未结束,可单击“Tracking URL”链接进入到Flink的原生页面,查看作业的运行信息。
    • 若作业已运行结束,对于在session中提交的作业,可以单击“Tracking URL”链接登录Flink原生页面查看作业信息。
      图5 application

使用Flink客户端(MRS 3.x之前版本)

  1. 安装客户端。

    以在Core节点安装客户端为例:

    1. 登录MRS Manager页面,选择“服务管理 > 下载客户端”下载客户端安装包至主管理节点。
    2. 使用IP地址搜索主管理节点并使用VNC登录主管理节点。
    3. 在主管理节点,执行以下命令切换用户。

      sudo su - omm

    4. 在MRS管理控制台,查看指定集群“节点管理”页面的“IP”地址。

      记录需使用客户端的Core节点IP地址。

    5. 在主管理节点,执行以下命令,将客户端安装包从主管理节点文件拷贝到当前Core节点:

      scp -p /tmp/MRS-client/MRS_Services_Client.tar Core节点的IP地址:/opt/client

    6. 使用“root”登录Core节点。

      Master节点支持Cloud-Init特性,Cloud-init预配置的用户名“root”,密码为创建集群时设置的密码。

    7. 执行以下命令,安装客户端:

      cd /opt/client

      tar -xvf MRS_Services_Client.tar

      tar -xvf MRS_Services_ClientConfig.tar

      cd /opt/client/MRS_Services_ClientConfig

      ./install.sh 客户端安装目录

      例如,执行命令:

      ./install.sh /opt/hadoopclient

  2. 以客户端安装用户,登录安装客户端的节点。
  3. 执行以下命令,切换到客户端安装目录。

    cd /opt/hadoopclient

  4. 执行如下命令初始化环境变量。

    source /opt/hadoopclient/bigdata_env

  5. 若集群开启Kerberos认证,需要执行以下步骤,若集群未开启Kerberos认证请跳过该步骤。

    1. 准备一个提交Flink作业的用户。

      登录MRS Manager,选择“系统设置 > 角色管理 > 添加角色”,添加角色例如flinkrole。在“权限”的表格中选择“HDFS > File System > hdfs://hacluster/”,勾选“Read”、“Write”和“Execute”,单击“权限”表格中“服务”返回。选择“Yarn > Scheduler Queue > root”,勾选default的“Submit”,单击“确定”保存。

      选择“系统设置 > 用户组管理 > 添加用户组”,为样例工程创建一个用户组,例如flinkgroup。选择“系统设置 > 用户管理 > 添加用户”,为样例工程创建一个用户。填写用户名例如flinkuser,用户类型为“人机”用户,加入用户组flinkgrouphadoop,并绑定角色flinkrole取得权限,单击“确定”(首次创建的用户需使用该用户登录MRS Manager修改密码)。

    2. 登录Manager,下载认证凭据。

      登录集群的Manager界面,具体请参见访问MRS Manager(MRS 3.x之前版本),选择“系统设置 > 用户管理”,在已增加用户所在行的“操作”列,选择“更多 > 下载认证凭据”。

      图6 下载认证凭据
    3. 将下载的认证凭据压缩包解压缩,并将得到的文件拷贝到客户端节点中,例如客户端节点的“/opt/hadoopclient/Flink/flink/conf”目录下。如果是在集群外节点安装的客户端,需要将得到的文件拷贝到该节点的“/etc/”目录下。
    4. 配置安全认证,在“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”配置文件中的对应配置添加keytab路径以及用户名。
      security.kerberos.login.keytab: <user.keytab文件路径>
      security.kerberos.login.principal: <用户名>

      例如:

      security.kerberos.login.keytab: /opt/hadoopclient/Flink/flink/conf/user.keytab
      security.kerberos.login.principal: test
    5. 参考签发证书样例章节生成“generate_keystore.sh”脚本并放置在Flink的客户端bin目录下,执行如下命令进行安全加固,并设置一个用于提交作业的密码。

      cd /opt/hadoopclient/Flink/flink/bin

      sh generate_keystore.sh

      该脚本会自动替换“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”中关于SSL的值,针对MRS2.x及之前版本,安全集群默认没有开启外部SSL,用户如果需要启用外部SSL,请参考“认证和加密”章节进行配置后再次运行该脚本即可。

      • generate_keystore.sh脚本无需手动生成。
      • 执行认证和加密后会将生成的flink.keystore、flink.truststore、security.cookie自动填充到“flink-conf.yaml”对应配置项中。
    6. 配置客户端访问flink.keystore和flink.truststore文件的路径。
      • 相对路径(推荐):
        执行如下步骤配置flink.keystore和flink.truststore文件路径为相对路径,并确保Flink Client执行命令的目录可以直接访问该相对路径。
        1. 在“/opt/hadoopclient/Flink/flink/conf/”目录下新建目录,例如ssl。

          cd /opt/hadoopclient/Flink/flink/conf/

          mkdir ssl

        2. 移动flink.keystore和flink.truststore文件到“/opt/hadoopclient/Flink/flink/conf/ssl/”中。

          mv flink.keystore ssl/

          mv flink.truststore ssl/

        3. 修改flink-conf.yaml文件中如下两个参数为相对路径。
          security.ssl.internal.keystore: ssl/flink.keystore
          security.ssl.internal.truststore: ssl/flink.truststore
      • 绝对路径:

        执行“generate_keystore.sh”脚本后,在flink-conf.yaml文件中将flink.keystore和flink.truststore文件路径自动配置为绝对路径“/opt/hadoopclient/Flink/flink/conf/”,此时需要将conf目录中的flink.keystore和flink.truststore文件分别放置在Flink Client以及Yarn各个节点的该绝对路径上。

  6. 运行wordcount作业。

    用户在Flink提交作业或者运行作业时,需根据涉及的相关服务(如HDFS、Kafka等)是否启用Ranger鉴权,使该用户应具有如下权限:

    • 如果启用Ranger鉴权,当前用户必须属于hadoop组或者已在Ranger中为该用户添加“/flink”的读写权限。
    • 如果停用Ranger鉴权,当前用户必须属于hadoop组。
    • 普通集群(未开启Kerberos认证)可通过如下两种方式提交作业:
      • 执行如下命令启动session,并在session中提交作业。

        yarn-session.sh -nm "session-name" -d

        flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • 执行如下命令在Yarn上提交单个作业。

        flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

    • 安全集群(开启Kerberos认证)根据flink.keystore和flink.truststore文件的路径有如下两种方式提交作业:
      • flink.keystore和flink.truststore文件路径为相对路径时:
        • 在“ssl”的同级目录下执行如下命令启动session,并在session中提交作业。

          其中“ssl”是相对路径,如“ssl”所在目录是“opt/hadoopclient/Flink/flink/conf/”,则在“opt/hadoopclient/Flink/flink/conf/”目录下执行命令。

          cd /opt/hadoopclient/Flink/flink/conf

          yarn-session.sh -t ssl/ -nm "session-name" -d

          flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

        • 执行如下命令在Yarn上提交单个作业。

          cd /opt/hadoopclient/Flink/flink/conf

          flink run -m yarn-cluster -yt ssl/ /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • flink.keystore和flink.truststore文件路径为绝对路径时:
        • 执行如下命令启动session,并在session中提交作业。

          cd /opt/hadoopclient/Flink/flink/conf

          yarn-session.sh -nm "session-name" -d

          flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

        • 执行如下命令在Yarn上提交单个作业。

          flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

  7. 作业提交成功后,客户端界面显示如下。

    图7 在Yarn上提交作业成功
    图8 启动session成功
    图9 在session中提交作业成功

  8. 使用运行用户进入Yarn服务的原生页面,具体操作参考查看Flink作业信息,找到对应作业的application,单击application名称,进入到作业详情页面。

    • 若作业尚未结束,可单击“Tracking URL”链接进入到Flink的原生页面,查看作业的运行信息。
    • 若作业已运行结束,对于在session中提交的作业,可以单击“Tracking URL”链接登录Flink原生页面查看作业信息。
      图10 application