更新时间:2024-05-27 GMT+08:00

编译和运行程序

操作场景

IoTDB应用程序支持在安装了Flink客户端的Linux环境和安装了Flink WebUI的环境中运行。在程序代码完成开发后,您可以上传Jar包至准备好的环境中运行。

前提条件

  • 集群已安装Flink组件,并且添加了FlinkServer实例。
  • 已安装包含Flink服务的集群客户端,例如安装路径为“/opt/client”。
  • 当客户端所在主机不是集群中的节点时,需要在客户端所在节点的hosts文件中设置主机名和IP地址映射。主机名和IP地址请保持一一对应。

操作步骤

  1. 构建Jar包。

    • 在IntelliJ IDEA中,在生成Jar包之前配置工程的Artifacts信息。
      1. 在IDEA主页面,选择File > Project Structures...进入“Project Structure”页面。
      2. “Project Structure”页面,选择“Artifacts”,单击“+”并选择“From modules with dependencies...”。
        图1 添加Artifacts
      3. 选择“extract to the target JAR”后,单击“OK”。

      4. 您可以根据实际情况设置Jar包的名称、类型以及输出路径。

        为避免导入不必要Jar包引起Jar包冲突,您只需要加载IoTDB相关的基础Jar包,分别是:

        • flink-iotdb-connector-*
        • flink-tsfile-connector-*
        • hdoop-tsfile-*
        • influxdb-thrift-*
        • iotdb-antlr-
        • iotdb-session-*
        • iotdb-thrift-*
        • iotdb-thrift-commons-*
        • isession-*
        • libthrift-*
        • iotdb-session-*
        • iotdb-thrift-*
        • service-rpc-*
        • tsfile-*

        单击“OK”。

      5. 在IDEA主页面,选择Build > Build Artifacts...进入“Build Artifact”页面,选择“Action > Build”。

      6. 编译成功后,右下角会有“Build completed successfully”,相应的Jar包会在“Output Directory”目录中产生。

  2. (场景1)使用Flink WebUI界面运行Flink作业。

    1. 使用具有FlinkServer WebUI管理权限的用户登录集群的FusionInsight Manager,单击“集群 > 服务 > Flink”,单击概览页面“Flink WebUI”后的超链接地址进入FlinkServer WebUI界面。
    2. 进入FusionInsight Flink WebUI后,选择当前应用下的“作业管理 > 新建作业”,创建作业。

    3. 选择“类型”为“Flink Jar”,输入待创建的作业名称,选择作业类型,单击“确定”开始进行作业配置。

    4. 上传1生成的Jar包,“Main Class”选择“指定”,并在下面的类名填写要执行的类,然后单击“提交”。

      例如“com.huawei.bigdata.iotdb.FlinkIoTDBSink”为执行FlinkIoTDBSink的开发程序,或“com.huawei.bigdata.iotdb.FlinkIoTDBSource”为执行FlinkIoTDBSource的开发程序。。

  3. (场景2)使用Flink客户端在Linux环境下提交Flink作业。

    1. 以客户端安装用户登录MRS客户端节点。
    2. 执行如下命令初始化环境变量。

      source /opt/client/bigdata_env

    3. 若集群开启Kerberos认证,需要执行3.d-3.k,若集群未开启Kerberos认证请跳过步骤。
    4. 准备一个提交Flink作业的用户。

      具体请参考准备MRS应用开发用户

    5. 使用新创建的用户登录Manager页面,选择“系统 > 权限 > 用户”,在已增加用户所在行的“操作”列,选择“更多 > 下载认证凭据”。
    6. 将下载的认证凭据压缩包解压缩,并将得到的“user.keytab”文件拷贝到客户端节点中,例如客户端节点的“/opt/client/Flink/flink/conf”目录下。
    7. 安全模式下需要将客户端安装节点的业务IP地址以及Manager的浮动IP地址追加到“/opt/client/Flink/flink/conf/flink-conf.yaml”文件中的“jobmanager.web.allow-access-address”配置项中,IP地址之间使用英文逗号分隔。
    8. 配置安全认证,在“/opt/client/Flink/flink/conf/flink-conf.yaml”配置文件中的对应配置添加keytab路径以及用户名。
      security.kerberos.login.keytab: <user.keytab文件路径>
      security.kerberos.login.principal: <用户名>

      例如:

      security.kerberos.login.keytab: /opt/client/Flink/flink/conf/user.keytab
      security.kerberos.login.principal: test
    9. 在Flink的客户端“bin”目录下,执行如下命令进行安全加固,请参考认证和加密

      sh generate_keystore.sh

      该脚本执行后需输入一个用于提交作业的密码,然后自动替换“/opt/client/Flink/flink/conf/flink-conf.yaml”中关于SSL的值。

      • 执行认证和加密后会将生成的flink.keystore、flink.truststore、security.cookie自动填充到“flink-conf.yaml”对应配置项中。
      • “security.ssl.key-password”“security.ssl.keystore-password”“security.ssl.truststore-password”的值需要使用Manager明文加密API进行获取:

        curl -k -i -u <user name>:<password> -X POST -HContent-type:application/json -d '{"plainText":"<password>"}' 'https://x.x.x.x:28443/web/api/v2/tools/encrypt';其中<password>要与签发证书时使用的密码一致,x.x.x.x为集群Manager的浮动IP。

    10. 根据客户端访问“flink.keystore”“flink.truststore”文件的路径配置。
      • 绝对路径:执行该脚本后,在“flink-conf.yaml”文件中将“flink.keystore”“flink.truststore”文件的路径自动配置为绝对路径,此时需要将“conf”目录中的“flink.keystore”“flink.truststore”文件分别放置在Flink客户端以及集群内各个Yarn节点上的该绝对路径上。
      • 相对路径:执行如下步骤配置“flink.keystore”“flink.truststore”文件路径为相对路径。
        1. 在“/opt/client/Flink/flink/conf/”目录下新建目录,例如ssl。

          cd /opt/client/Flink/flink/conf

          mkdir ssl

        2. 移动“flink.keystore”“flink.truststore”文件到新建的文件夹中。

          mv flink.keystore flink.truststore ssl/

        3. 修改“flink-conf.yaml”文件中如下两个参数为相对路径。
          security.ssl.keystore: ssl/flink.keystore
          security.ssl.truststore: ssl/flink.truststore
    11. “flink-conf.yaml”配置文件如下配置项中追加客户端所在节点的IP,IP地址之间使用英文逗号分隔。
      web.access-control-allow-origin: xx.xx.xxx.xxx
      jobmanager.web.allow-access-address: xx.xx.xxx.xxx
    12. 1生成的Jar包上传至Flink客户端节点,如“/opt/client/Flink/flink”,然后提交作业。

      用户在Flink提交作业或者运行作业时,应具有如下权限:

      • 如果启用Ranger鉴权,当前用户必须属于hadoop组或者已在Ranger中为该用户添加“/flink”的读写权限。
      • 如果停用Ranger鉴权,当前用户必须属于hadoop组。
      • flink.keystore和flink.truststore文件路径为绝对路径时:

        执行如下命令启动session,并在session中提交作业。其中“com.huawei.bigdata.iotdb.FlinkIoTDBSink”执行FlinkIoTDBSink的开发程序。

        yarn-session.sh -nm "session-name"

        flink run --class com.huawei.bigdata.iotdb.FlinkIoTDBSink /opt/client/Flink/flink/flink-example.jar

      • flink.keystore和flink.truststore文件路径为相对路径时:

        在“ssl”的同级目录下执行如下命令启动session,并在session中提交作业,其中“ssl”是相对路径,如“ssl”所在目录是“/opt/client/Flink/flink/conf/”,则在该目录下执行命令。

        其中“com.huawei.bigdata.iotdb.FlinkIoTDBSink”执行的FlinkIoTDBSink开发程序。

        yarn-session.sh -t ssl/ -nm "session-name"

        flink run --class com.huawei.bigdata.iotdb.FlinkIoTDBSink /opt/client/Flink/flink/flink-example.jar