更新时间:2024-06-12 GMT+08:00
分享

在Flink WebUI和Linux环境中调测Flink应用

操作场景

IoTDB应用程序支持在安装了Flink客户端的Linux环境和安装了Flink WebUI的环境中运行。在程序代码完成开发后,您可以上传Jar包至准备好的环境中运行。

前提条件

  • 集群已安装Flink组件,并且添加了FlinkServer实例。
  • 已安装包含Flink服务的集群客户端,例如安装路径为“/opt/client”。
  • 当客户端所在主机不是集群中的节点时,需要在客户端所在节点的hosts文件中设置主机名和IP地址映射。主机名和IP地址请保持一一对应。

操作步骤

  1. 构建Jar包。

    • 在IntelliJ IDEA中,在生成Jar包之前配置工程的Artifacts信息。
      1. 在IDEA主页面,选择File > Project Structures...进入“Project Structure”页面。
      2. “Project Structure”页面,选择“Artifacts”,单击“+”并选择“From modules with dependencies...”。
        图1 添加Artifacts
      3. 选择“extract to the target JAR”后,单击“OK”。

      4. 您可以根据实际情况设置Jar包的名称、类型以及输出路径。

        为避免导入不必要Jar包引起Jar包冲突,您只需要加载IoTDB相关的基础Jar包,分别是:

        • flink-iotdb-connector-*
        • flink-tsfile-connector-*
        • hdoop-tsfile-*
        • influxdb-thrift-*
        • iotdb-antlr-
        • iotdb-session-*
        • iotdb-thrift-*
        • iotdb-thrift-commons-*
        • isession-*
        • libthrift-*
        • iotdb-session-*
        • iotdb-thrift-*
        • service-rpc-*
        • tsfile-*

        单击“OK”。

      5. 在IDEA主页面,选择Build > Build Artifacts...进入“Build Artifact”页面,选择“Action > Build”。

      6. 编译成功后,右下角会有“Build completed successfully”,相应的Jar包会在“Output Directory”目录中产生。

  2. 运行Flink作业。

    • 场景一:使用Flink WebUI界面运行Flink作业
      1. 使用具有FlinkServer WebUI管理权限的用户登录集群的FusionInsight Manager,选择“集群 > 服务 > Flink”,单击概览页面“Flink WebUI”后的超链接地址进入FlinkServer WebUI界面。
      2. 进入FusionInsight Flink WebUI后,选择当前应用下的“作业管理 > 新建作业”,创建作业。

      3. 选择“类型”为“Flink Jar”,输入待创建的作业名称,选择作业类型,单击“确定”开始进行作业配置。

      4. 上传1生成的Jar包,“Main Class”选择“指定”,并在下面的类名填写要执行的类,然后单击“提交”。

        例如“com.huawei.bigdata.iotdb.FlinkIoTDBSink”为执行FlinkIoTDBSink样例程序的开发程序,或“com.huawei.bigdata.iotdb.FlinkIoTDBSource”为执行FlinkIoTDBSource样例程序的开发程序。。

    • 场景二:使用Flink客户端在Linux环境下提交Flink作业
      1. 以客户端安装用户登录MRS客户端节点。
      2. 执行如下命令初始化环境变量。

        source /opt/client/bigdata_env

      3. 若集群开启Kerberos认证,需要执行2.d-2.k,若集群未开启Kerberos认证请跳过步骤。
      4. 准备一个提交Flink作业的用户。

        具体请参考准备MRS应用开发用户

      5. 使用新创建的用户登录Manager页面,选择“系统 > 权限 > 用户”,在已增加用户所在行的“操作”列,选择“更多 > 下载认证凭据”。
      6. 将下载的认证凭据压缩包解压缩,并将得到的“user.keytab”文件复制到客户端节点中,例如客户端节点的“/opt/client/Flink/flink/conf”目录下。
      7. 安全模式下需要将客户端安装节点的业务IP地址以及Manager的浮动IP地址追加到“/opt/client/Flink/flink/conf/flink-conf.yaml”文件中的“jobmanager.web.allow-access-address”配置项中,IP地址之间使用英文逗号分隔。
      8. 配置安全认证,在“/opt/client/Flink/flink/conf/flink-conf.yaml”配置文件中的对应配置添加keytab路径以及用户名。
        security.kerberos.login.keytab: <user.keytab文件路径>
        security.kerberos.login.principal: <用户名>

        例如:

        security.kerberos.login.keytab: /opt/client/Flink/flink/conf/user.keytab
        security.kerberos.login.principal: test
      9. 在Flink的客户端“bin”目录下,执行如下命令进行安全加固,请参考认证和加密

        sh generate_keystore.sh

        该脚本执行后需输入一个用于提交作业的密码,然后自动替换“/opt/client/Flink/flink/conf/flink-conf.yaml”中关于SSL的值。

        • 执行认证和加密后会将生成的flink.keystore、flink.truststore、security.cookie自动填充到“flink-conf.yaml”对应配置项中。
        • “security.ssl.key-password”“security.ssl.keystore-password”“security.ssl.truststore-password”的值需要使用Manager明文加密API进行获取:

          curl -k -i -u <user name>:<password> -X POST -HContent-type:application/json -d '{"plainText":"<password>"}' 'https://x.x.x.x:28443/web/api/v2/tools/encrypt';其中<password>要与签发证书时使用的密码一致,x.x.x.x为集群Manager的浮动IP。

      10. 根据客户端访问“flink.keystore”“flink.truststore”文件的路径配置。
        • 绝对路径:执行该脚本后,在“flink-conf.yaml”文件中将“flink.keystore”“flink.truststore”文件的路径自动配置为绝对路径,此时需要将“conf”目录中的“flink.keystore”“flink.truststore”文件分别放置在Flink客户端以及集群内各个Yarn节点上的该绝对路径上。
        • 相对路径:执行如下步骤配置“flink.keystore”“flink.truststore”文件路径为相对路径。
          1. 在“/opt/client/Flink/flink/conf/”目录下新建目录,例如ssl。

            cd /opt/client/Flink/flink/conf

            mkdir ssl

          2. 移动“flink.keystore”“flink.truststore”文件到新建的文件夹中。

            mv flink.keystore flink.truststore ssl/

          3. 修改“flink-conf.yaml”文件中如下两个参数为相对路径。
            security.ssl.keystore: ssl/flink.keystore
            security.ssl.truststore: ssl/flink.truststore
      11. “flink-conf.yaml”配置文件如下配置项中追加客户端所在节点的IP,IP地址之间使用英文逗号分隔。
        web.access-control-allow-origin: xx.xx.xxx.xxx
        jobmanager.web.allow-access-address: xx.xx.xxx.xxx
      12. 1生成的Jar包上传至Flink客户端节点,如“/opt/client/Flink/flink”,然后提交作业。

        用户在Flink提交作业或者运行作业时,应具有如下权限:

        • 如果启用Ranger鉴权,当前用户必须属于hadoop组或者已在Ranger中为该用户添加“/flink”的读写权限。
        • 如果停用Ranger鉴权,当前用户必须属于hadoop组。
        • flink.keystore和flink.truststore文件路径为绝对路径时:

          执行如下命令启动session,并在session中提交作业。其中“com.huawei.bigdata.iotdb.FlinkIoTDBSink”执行FlinkIoTDBSink样例程序的开发程序。

          yarn-session.sh -nm "session-name"

          flink run --class com.huawei.bigdata.iotdb.FlinkIoTDBSink /opt/client/Flink/flink/flink-example.jar

        • flink.keystore和flink.truststore文件路径为相对路径时:

          在“ssl”的同级目录下执行如下命令启动session,并在session中提交作业,其中“ssl”是相对路径,如“ssl”所在目录是“/opt/client/Flink/flink/conf/”,则在该目录下执行命令。

          其中“com.huawei.bigdata.iotdb.FlinkIoTDBSink”执行的FlinkIoTDBSink样例程序开发程序。

          yarn-session.sh -t ssl/ -nm "session-name"

          flink run --class com.huawei.bigdata.iotdb.FlinkIoTDBSink /opt/client/Flink/flink/flink-example.jar

查看调测结果

  1. 查看作业是否执行成功:
    • 使用Flink WebUI

      Flink Server WebUI上状态返回运行成功,则执行成功,详细日志可以通过“操作 > 更多 > 作业详情”查看。

      图2 在Flink WebUI查看运行结果
    • 使用Flink客户端

      使用运行用户登录FusionInsight Manager,进入Yarn服务的原生页面,找到对应作业的Application,单击Application名称,进入到作业详情页面。

      • 若作业尚未结束,可单击“Tracking URL”链接进入到Flink的原生页面,查看作业的运行信息。
      • 若作业已运行结束,对于在session中提交的作业,可以单击“Tracking URL”链接登录Flink原生页面查看作业信息。
        图3 在Yarn WebUI查看作业信息
  2. 验证作业执行结果:
    • FlinkIoTDBSink执行结果验证:
      • 在IoTDB客户端执行以下命令,查看数据是否已经从Flink写入到IoTDB中。

        select * from root.sg.d1

        图4 查看写入的数据内容
      • FlinkIoTDBSource执行结果验证:
        1. 使用运行用户登录FusionInsight Manager,选择“集群 > 服务 > HDFS”,单击“NameNode WebUI”右侧的超链接进入HDFS WebUI界面。
        2. 选择“Utilities > Browse the file system”。
          图5 进入“Browse the file system”页面
        3. 进入“/tmp/logs/执行用户名/bucket-logs-tfile/任务ID/Flink任务ID”目录,将该目录下文件全部下载到本地。
          图6 获取作业日志文件
        4. 2.c下载的文件中搜索“root.sg.d1”,如下图显示则表示数据从IoTDB中读取成功。
          图7 读取IoTDB数据成功

相关文档