更新时间:2024-10-28 GMT+08:00

创建FlinkServer作业写入数据至HBase表

本章节适用于MRS 3.1.2及之后的版本。

操作场景

FlinkServer支持对接HBase,详情如下:

  • 支持对接维表、Sink表。
  • 当HBase与Flink为同一集群或互信的集群,支持FlinkServer对接HBase。
  • 当HBase与Flink不在同一集群或不互信的集群,则只支持Flink和HBase均为普通模式集群的对接。

前提条件

  • 集群已安装,包括HDFS、Yarn、Flink和HBase。
  • 包含HBase服务的客户端已安装,安装路径如:/opt/client

创建作业步骤

场景一:HBase作为Sink表。

  1. 在HBase客户端建表。

    参考HBase客户端使用实践,登录HBase客户端,使用create 'dim_province', "f1"创建dim_province表。

  2. 拷贝HBase配置文件至FlinkServer所在节点。

    1. 以客户端安装用户登录安装客户端的节点,拷贝HBase的“/opt/client/HBase/hbase/conf/”目录下的所有配置文件至部署FlinkServer的所有节点的一个空目录,如“/tmp/client/HBase/hbase/conf/”。
    2. 修改FlinkServer节点上面配置文件目录及其上层目录属主为omm。
      chown omm: /tmp/client/HBase/ -R
      • FlinkServer节点:

        登录Manager,选择“集群 > 服务 > Flink > 实例”,查看FlinkServer所在的“业务IP”。

      • 若FlinkServer实例所在节点与包含HBase服务客户端的安装节点相同,则该节点不执行此步骤。

  3. 添加FlinkServer本地访问HBase集群路径。

    登录Manager,选择“集群 > 服务 > Flink > 配置 > 全部配置”,搜索“HBASE_CONF_DIR”参数,在该参数的“值”中填写上一步中拷贝了HBase配置文件的FlinkServer的目录,如“/tmp/client/HBase/hbase/conf/”。填写完成后单击“保存”,确认修改配置后单击“确定”。

    若FlinkServer实例所在节点与包含HBase服务客户端的安装节点相同,则在“HBASE_CONF_DIR”参数的“值”填写HBase的“/opt/client/HBase/hbase/conf/”目录。

  4. 重启受影响的FlinkServer实例。

    单击“实例”,勾选所有FlinkServer实例,选择“更多 > 重启实例”,输入密码,单击“确定”重启实例。

  5. 访问FlinkServer,并创建Flink SQL作业。

    1. 登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
    2. 参考如何创建FlinkServer作业,新建Flink SQL作业,作业类型选择“流作业”。在作业开发界面进行如下作业配置并启动作业。

      需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。

      安全集群且HBase的认证模式为hbase.rpc.protection=authentication时参考如下样例,建立Flink SQL作业。
      CREATE TABLE ksource1 (
      user_id STRING,
      item_id STRING,
      proctime as PROCTIME()
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'ksource1',
      'properties.group.id' = 'group1',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP1:Kafka端口号,Kafka的Broker实例业务IP2:Kafka端口号',
      'format' = 'json',
      'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数
      'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数
      );
      
      CREATE TABLE hsink1 (
      rowkey STRING,
      f1 ROW < item_id STRING >,
      PRIMARY KEY (rowkey) NOT ENFORCED
      ) WITH (
      'connector' = 'hbase-2.2',
      'table-name' = 'dim_province',
      'zookeeper.quorum' = 'ZooKeeper的quorumpeer实例业务IP1:ZooKeeper客户端端口号,ZooKeeper的quorumpeer实例业务IP2:ZooKeeper客户端端口号'
      );
      
      INSERT INTO
      hsink1
      SELECT
      user_id as rowkey,
      ROW(item_id) as f1
      FROM
      ksource1;
      • Kafka端口号:
        • 集群的“认证模式”为“安全模式”时为“sasl.port”的值,默认为“21007”。
        • 集群的“认证模式”为“普通模式”时为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

          登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

      • ZooKeeper的quorumpeer实例业务IP:

        ZooKeeper服务所有quorumpeer实例业务IP。登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper > 实例”,可查看所有quorumpeer实例所在主机业务IP地址。

      • ZooKeeper客户端端口号:

        登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper”,在“配置”页签查看“clientPort”的值。

      • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
      • HBase认证模式:

        登录FusionInsight Manager,选择“集群 > 服务 > HBase > 配置 > 全部配置”,搜索“hbase.rpc.protection”,查看HBase认证模式,当认证模式为“integrity”和“privacy”时添加如下参数:

        'properties.hbase.rpc.protection' = 'HBase认证模式'

        'properties.zookeeper.znode.parent' = '/hbase'

        'properties.hbase.security.authorization' = 'true'

        'properties.hbase.security.authentication' = 'kerberos'

    3. 查看作业管理界面,作业状态为“运行中”。

  6. 参考管理Kafka Topic中的消息,向kafka中写入数据。

    sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    例如本示例使用主题名称为ksource1:sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic ksource1 --producer.config /opt/client/Kafka/kafka/config/producer.properties

    输入消息内容:
    {"user_id": "3","item_id":"333333"}
    {"user_id": "4","item_id":"44444444"}

    输入完成后按回车发送消息。

  7. 登录HBase客户端,查看表数据信息。

    hbase shell

    scan 'dim_province'

场景二:HBase作为维表。

  1. 在HBase客户端建表并写入数据。

    参考HBase客户端使用实践,登录HBase客户端,使用create 'hbase_dim_table',"f1"创建hbase_dim_table表并写入数据:
    put 'hbase_dim_table','1','f1:address','city1'
    put 'hbase_dim_table','2','f1:address','city2'
    put 'hbase_dim_table','3','f1:address','city3'

  1. 拷贝HBase配置文件至FlinkServer所在节点。

    1. 以客户端安装用户登录安装客户端的节点,拷贝HBase的“/opt/client/HBase/hbase/conf/”目录下的所有配置文件至部署FlinkServer的所有节点的一个空目录,如“/tmp1/client/HBase/hbase/conf/”。
    2. 修改FlinkServer节点上面配置文件目录及其上层目录属主为omm。
      chown omm: /tmp1/client/HBase/ -R
      • FlinkServer节点:

        登录Manager,选择“集群 > 服务 > Flink > 实例”,查看FlinkServer所在的“业务IP”。

      • 若FlinkServer实例所在节点与包含HBase服务客户端的安装节点相同,则该节点不执行此步骤。

  2. 添加FlinkServer本地访问HBase集群路径。

    登录Manager,选择“集群 > 服务 > Flink > 配置 > 全部配置”,搜索“HBASE_CONF_DIR”参数,在该参数的“值”中填写上一步中拷贝了HBase配置文件的FlinkServer的目录,如“/tmp1/client/HBase/hbase/conf/”。填写完成后单击“保存”,确认修改配置后单击“确定”。

    若FlinkServer实例所在节点与包含HBase服务客户端的安装节点相同,则在“HBASE_CONF_DIR”参数的“值”填写HBase的“/opt/client/HBase/hbase/conf/”目录。

  1. 重启受影响的FlinkServer实例。

    单击“实例”,勾选所有FlinkServer实例,选择“更多 > 重启实例”,输入密码,单击“确定”重启实例。

  2. 访问FlinkServer,并创建Flink SQL作业。

    1. 登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
    2. 参考如何创建FlinkServer作业,新建Flink SQL作业,作业类型选择“流作业”。在作业开发界面进行如下作业配置并启动作业。

      需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。

      安全集群且HBase的认证模式为hbase.rpc.protection=authentication时参考如下样例,建立Flink SQL作业。
      CREATE TABLE KafkaSource (
        `user_id` STRING,
        `user_name` STRING,
        proctime as proctime()
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_source',
        'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'latest-offset',
        'value.format' = 'csv',
        'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数
        'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数
        'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数
      );
      CREATE TABLE KafkaSink (
        -- Kafka作为sink表
        `user_id` VARCHAR,
        `user_name` VARCHAR,
        `address` VARCHAR
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_sink',
        'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'latest-offset',
        'value.format' = 'csv',
        'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数
        'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数
        'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数
      );
      CREATE TABLE hbaseTable (
        -- hbase作为维表
        user_id STRING,
        f1 ROW < address STRING >,
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'hbase-2.2',
        'table-name' = 'hbase_dim_table',
        'zookeeper.quorum' = 'ZooKeeper的quorumpeer实例业务IP1:ZooKeeper客户端端口号,ZooKeeper的quorumpeer实例业务IP2:ZooKeeper客户端端口号'
      );
      INSERT INTO
        KafkaSink
      SELECT
        t.user_id,
        t.user_name,
        d.address
      FROM
        KafkaSource as t
        JOIN hbaseTable FOR SYSTEM_TIME AS OF t.proctime as d ON t.user_id = d.user_id;
    3. 查看作业管理界面,作业状态为“运行中”。

  3. 参考管理Kafka Topic中的消息,向kafka中写入数据。

    sh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    输入消息内容,输入完成后按回车发送消息:
    1,name1
    2,name2
    3,name3

  4. 执行以下命令查看Sink表中是否接收到数据,即查看Kafka topic是否正常写入数据。

    sh kafka-console-consumer.sh --topic 主题名称 --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --consumer.config 户端目录/Kafka/kafka/config/consumer.properties

    结果如下:

    1,name1,city1
    2,name2,city2
    3,name3,city3

应用端提交作业

  • 如果使用Flink run模式,推荐使用export HBASE_CONF_DIR=hbase的配置目录,例如:export HBASE_CONF_DIR=/opt/hbaseconf。
  • 如果使用Flink run-application模式,则有如下两种方式。
    • 在建表语句中添加如下配置(推荐)
      表1 相关配置

      配置

      说明

      'properties.hbase.rpc.protection' = 'authentication'

      需和HBase服务端的配置一致

      'properties.zookeeper.znode.parent' = '/hbase'

      多服务场景中,会存在hbase1,hbase2,需明确要访问的集群

      'properties.hbase.security.authorization' = 'true'

      开启鉴权

      'properties.hbase.security.authentication' = 'kerberos'

      开启kerberos认证

      示例:
      CREATE TABLE hsink1 (
           rowkey STRING,
           f1 ROW < q1 STRING >,
           PRIMARY KEY (rowkey) NOT ENFORCED
          ) WITH (
            'connector' = 'hbase-2.2',
            'table-name' = 'cc',
            'zookeeper.quorum' = 'x.x.x.x:clientPort',
            'properties.hbase.rpc.protection' = 'authentication',
            'properties.zookeeper.znode.parent' = '/hbase',
            'properties.hbase.security.authorization' = 'true',
            'properties.hbase.security.authentication' = 'kerberos'
         );
    • 提交作业时将HBase的配置添加到yarnShip中。

      例如:Dyarn.ship-files=/opt/hbaseconf。