
Creating a FlinkServer Job to Interconnect with an HBase Table

This section applies only to MRS 3.1.2 or later.

Scenarios

FlinkServer can be interconnected with HBase. The details are as follows:

  • HBase can be used as a sink table, source table, or dimension table for FlinkServer jobs.
  • When HBase and Flink are in the same cluster or in clusters with mutual trust, FlinkServer can be interconnected with HBase, and the interconnection method is the same in both cases.
  • If HBase and Flink are in different clusters without mutual trust, interconnection is supported only between Flink in a normal cluster and HBase in a normal cluster.

Prerequisites

  • The HDFS, Yarn, Flink, and HBase services have been installed in a cluster.
  • The client that contains the HBase service has been installed, for example, in the /opt/client directory.

Creating a Job

HBase as a Sink Table

  1. Create a table on the HBase client.

    Log in to the HBase client by referring to Using the HBase Client and run the create 'dim_province',"f1" command to create the dim_province table.
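
    A minimal sketch of the client session, assuming the client is installed in /opt/client and, for a cluster in security mode, that a user with HBase permissions has been created (developuser below is an illustrative name):

    cd /opt/client
    source bigdata_env
    kinit developuser    # Required only for clusters in security mode.
    hbase shell
    # In the HBase shell, create the table with column family f1:
    create 'dim_province','f1'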

  2. Copy the HBase configuration file to the node where FlinkServer is deployed.

    1. Log in to the node where the client is installed as the client installation user and copy all configuration files in the /opt/client/HBase/hbase/conf/ directory of HBase to an empty directory on all nodes where FlinkServer is deployed, for example, /tmp/client/HBase/hbase/conf/ (see the command sketch after this step).
    2. Change the owner of the configuration file directory and its upper-layer directory on the FlinkServer node to omm.
      chown omm: /tmp/client/HBase/ -R
      • FlinkServer nodes:

        Log in to FusionInsight Manager, choose Cluster > Services > Flink, click Instances, and check the Service IP Address value of FlinkServer.

      • If the node hosting the FlinkServer instance also has the HBase client installed, you can skip this step on that node.
      • Currently, FlinkServer can connect to only one HBase cluster (either in the same cluster or in a cluster with mutual trust); it cannot connect to multiple HBase clusters at the same time.
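
      A minimal sketch of the copy and ownership change, assuming a single FlinkServer node with the illustrative IP address 192.168.0.1 and SSH access as user root:

      # Run on the node where the HBase client is installed; repeat for each FlinkServer node.
      ssh root@192.168.0.1 "mkdir -p /tmp/client/HBase/hbase/conf"
      scp /opt/client/HBase/hbase/conf/* root@192.168.0.1:/tmp/client/HBase/hbase/conf/
      # Change the owner of the directory and its upper-layer directory to omm.
      ssh root@192.168.0.1 "chown omm: /tmp/client/HBase/ -R"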

  3. Add the local path for FlinkServer to access the HBase cluster.

    Log in to FusionInsight Manager and choose Cluster > Services > Flink. Click Configurations and then All Configurations, search for HBASE_CONF_DIR, and set Value to the FlinkServer directory (for example, /tmp/client/HBase/hbase/conf/) to which the HBase configuration files are copied. After the parameter is configured, click Save. After confirming the modification, click OK.

    If the node where a FlinkServer instance is deployed is the node where the HBase client is installed, set Value of the HBASE_CONF_DIR parameter to the /opt/client/HBase/hbase/conf/ directory of HBase.

  4. Restart the affected FlinkServer instances.

    Click Instances, select all FlinkServer instances, choose More > Restart Instance, enter the password, and click OK to restart the instances.

  5. Access FlinkServer and create a Flink SQL job.

    1. Log in to FusionInsight Manager and choose Cluster > Services > Flink. In the Basic Information area, click the link on the right of Flink WebUI to access the Flink web UI.
    2. Create a Flink SQL job and set Task Type to Stream job. For details, see Creating a Job. On the job development page, configure the job parameters as follows and start the job.

      In Basic Parameter, select Enable CheckPoint, set Time Interval(ms) to 60000, and retain the default value for Mode.

      If the cluster is in security mode and the HBase authentication setting is hbase.rpc.protection=authentication, create a Flink SQL job by referring to the following example:
      CREATE TABLE ksource1 (
        user_id STRING,
        item_id STRING,
        proctime as PROCTIME()
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'ksource1',
        'properties.group.id' = 'group1',
        'scan.startup.mode' = 'latest-offset',
        'properties.bootstrap.servers' = 'Service IP address 1 of the Kafka Broker instance:Kafka port,Service IP address 2 of the Kafka Broker instance:Kafka port',
        'format' = 'json',
        'properties.sasl.kerberos.service.name' = 'kafka',--This parameter is not required for clusters in normal mode.
        'properties.security.protocol' = 'SASL_PLAINTEXT',--This parameter is not required for clusters in normal mode.
        'properties.kerberos.domain.name' = 'hadoop.System domain name'--This parameter is not required for clusters in normal mode.
      );

      CREATE TABLE hsink1 (
        rowkey STRING,
        f1 ROW < item_id STRING >,
        PRIMARY KEY (rowkey) NOT ENFORCED
      ) WITH (
        'connector' = 'hbase-2.2',
        'table-name' = 'dim_province',
        'zookeeper.quorum' = 'Service IP address 1 of the ZooKeeper quorumpeer instance:ZooKeeper client port number,Service IP address 2 of the ZooKeeper quorumpeer instance:ZooKeeper client port number'
      );

      INSERT INTO
        hsink1
      SELECT
        user_id as rowkey,
        ROW(item_id) as f1
      FROM
        ksource1;
      • The Kafka Broker instance IP address and Kafka port number are as follows:
        • To obtain the instance IP address, log in to FusionInsight Manager, choose Cluster > Services > Kafka, click Instances, and query the instance IP address on the instance list page.
        • Value of sasl.port when Authentication Mode of the cluster is Security Mode. The default value is 21007.
        • Value of port when Authentication Mode of the cluster is Normal Mode. The default value is 9092. If the port number is set to 9092, set allow.everyone.if.no.acl.found to true. The procedure is as follows:

          Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the page that is displayed, click Configurations and then All Configurations. On the displayed page, search for allow.everyone.if.no.acl.found, set it to true, and click Save.

      • IP address of the ZooKeeper quorumpeer instance

        To obtain the IP addresses of all ZooKeeper quorumpeer instances, log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Instances and view the IP addresses of all the hosts where the quorumpeer instances are deployed.

      • Port number of the ZooKeeper client

        Log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Configurations and check the value of clientPort.

      • System domain name: You can log in to FusionInsight Manager, choose System > Permission > Domain and Mutual Trust, and check the value of Local Domain.
      • HBase authentication

        Log in to FusionInsight Manager, choose Cluster > Services > HBase, click Configurations and then All Configurations, search for hbase.rpc.protection, and check the HBase authentication mode. If the authentication mode is integrity or privacy, add the following parameters:

        'properties.hbase.rpc.protection' = 'HBase authentication mode'

        'properties.zookeeper.znode.parent' = '/hbase'

        'properties.hbase.security.authorization' = 'true'

        'properties.hbase.security.authentication' = 'kerberos'

    3. On the job management page, check whether the job status is Running.

  6. Execute the following script to write data to Kafka. For details, see Managing Messages in Kafka Topics.

    sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka Broker instance is deployed:Kafka port --topic Topic name --producer.config Client directory/Kafka/kafka/config/producer.properties

    In this example, the topic name is ksource1.

    sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka Broker instance is deployed:Kafka port --topic ksource1 --producer.config /opt/client/Kafka/kafka/config/producer.properties
    Enter the message content.
    {"user_id": "3","item_id":"333333"}
    {"user_id": "4","item_id":"44444444"}

    Press Enter to send the message.

  7. Log in to the HBase client and view the table data.

    hbase shell
    scan 'dim_province'
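
    If the job is running correctly, the two sample messages should appear as rows keyed by user_id, with item_id stored in column f1:item_id; a sketch of the expected scan output (timestamps will vary):

    ROW    COLUMN+CELL
     3     column=f1:item_id, timestamp=..., value=333333
     4     column=f1:item_id, timestamp=..., value=44444444
    2 row(s)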

HBase as a Dimension Table

  1. Create a table on the HBase client and write data into the table.

    Log in to the HBase client, use create 'hbase_dim_table',"f1" to create the hbase_dim_table table, and write data into the table. For details, see Using the HBase Client.
    put 'hbase_dim_table','1','f1:address','city1'
    put 'hbase_dim_table','2','f1:address','city2'
    put 'hbase_dim_table','3','f1:address','city3'
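
    The table creation and puts can also be scripted; a minimal sketch, assuming the client is installed in /opt/client and the user has already been authenticated for a cluster in security mode:

    cd /opt/client
    source bigdata_env
    # Pipe the statements into a non-interactive HBase shell session.
    hbase shell <<'EOF'
    create 'hbase_dim_table','f1'
    put 'hbase_dim_table','1','f1:address','city1'
    put 'hbase_dim_table','2','f1:address','city2'
    put 'hbase_dim_table','3','f1:address','city3'
    EOF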

  2. Copy the HBase configuration file to the node where FlinkServer is deployed.

    1. Log in to the node where the client is installed as the client installation user and copy all configuration files in the /opt/client/HBase/hbase/conf/ directory of HBase to an empty directory on all nodes where FlinkServer is deployed, for example, /tmp1/client/HBase/hbase/conf/.
    2. Change the owner of the configuration file directory and its upper-layer directory on the FlinkServer node to omm.
      chown omm: /tmp1/client/HBase/ -R
      • FlinkServer nodes:

        Log in to FusionInsight Manager, choose Cluster > Services > Flink, click Instances, and check the Service IP Address value of FlinkServer.

      • If the node hosting the FlinkServer instance also has the HBase client installed, you can skip this step on that node.

  3. Add the local path for FlinkServer to access the HBase cluster.

    Log in to FusionInsight Manager and choose Cluster > Services > Flink. Click Configurations and then All Configurations, search for HBASE_CONF_DIR, and set Value to the FlinkServer directory (for example, /tmp1/client/HBase/hbase/conf/) to which the HBase configuration files are copied. After the parameter is configured, click Save. After confirming the modification, click OK.

    If the node where a FlinkServer instance resides is the node where the HBase client is installed, set Value of the HBASE_CONF_DIR parameter to the /opt/client/HBase/hbase/conf/ directory of HBase.

  4. Restart the affected FlinkServer instances.

    Click Instances, select all FlinkServer instances, choose More > Restart Instance, enter the password, and click OK to restart the instances.

  5. Access FlinkServer and create a Flink SQL job.

    1. Log in to FusionInsight Manager and choose Cluster > Services > Flink. In the Basic Information area, click the link on the right of Flink WebUI to access the Flink web UI.
    2. Create a Flink SQL job and set Task Type to Stream job. For details, see Creating a Job. On the job development page, configure the job parameters as follows and start the job.

      In Basic Parameter, select Enable CheckPoint, set Time Interval(ms) to 60000, and retain the default value for Mode.

      If the cluster is in security mode and the HBase authentication setting is hbase.rpc.protection=authentication, create a Flink SQL job by referring to the following example:
      CREATE TABLE KafkaSource (
        `user_id` STRING,
        `user_name` STRING,
        proctime as PROCTIME()
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_source',
        'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'latest-offset',
        'value.format' = 'csv',
        'properties.sasl.kerberos.service.name' = 'kafka',--This parameter is not required for clusters in normal mode.
        'properties.security.protocol' = 'SASL_PLAINTEXT',--This parameter is not required for clusters in normal mode.
        'properties.kerberos.domain.name' = 'hadoop.System domain name'--This parameter is not required for clusters in normal mode.
      );
      CREATE TABLE KafkaSink (
        -- Kafka functions as a sink table.
        `user_id` VARCHAR,
        `user_name` VARCHAR,
        `address` VARCHAR
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_sink',
        'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'latest-offset',
        'value.format' = 'csv',
        'properties.sasl.kerberos.service.name' = 'kafka',--This parameter is not required for clusters in normal mode.
        'properties.security.protocol' = 'SASL_PLAINTEXT',--This parameter is not required for clusters in normal mode.
        'properties.kerberos.domain.name' = 'hadoop.System domain name'--This parameter is not required for clusters in normal mode.
      );
      CREATE TABLE hbaseTable (
        -- HBase functions as a dimension table.
        user_id STRING,
        f1 ROW < address STRING >,
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'hbase-2.2',
        'table-name' = 'hbase_dim_table',
        'zookeeper.quorum' = 'Service IP address 1 of the ZooKeeper quorumpeer instance:ZooKeeper client port number,Service IP address 2 of the ZooKeeper quorumpeer instance:ZooKeeper client port number'
      );
      INSERT INTO
        KafkaSink
      SELECT
        t.user_id,
        t.user_name,
        d.address
      FROM
        KafkaSource as t
        JOIN hbaseTable FOR SYSTEM_TIME AS OF t.proctime as d ON t.user_id = d.user_id;
      • The Kafka Broker instance IP address and Kafka port number are as follows:
        • To obtain the instance IP address, log in to FusionInsight Manager, choose Cluster > Services > Kafka, click Instances, and query the instance IP address on the instance list page.
        • Value of sasl.port when Authentication Mode of the cluster is Security Mode. The default value is 21007.
        • Value of port when Authentication Mode of the cluster is Normal Mode. The default value is 9092. If the port number is set to 9092, set allow.everyone.if.no.acl.found to true. The procedure is as follows:

          Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the page that is displayed, click the Configurations tab and then the All Configurations sub-tab. On the displayed page, search for allow.everyone.if.no.acl.found, set it to true, and click Save.

      • Service IP address of the ZooKeeper quorumpeer instance:

        To obtain service IP addresses of all ZooKeeper quorumpeer instances, log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Instances and view the service IP addresses of all the hosts where the quorumpeer instances are deployed.

      • Port number of the ZooKeeper client:

        Log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Configurations and check the value of clientPort.

      • System domain name: You can log in to FusionInsight Manager, choose System > Permission > Domain and Mutual Trust, and check the value of Local Domain.
      • HBase authentication:

        Log in to FusionInsight Manager, choose Cluster > Services > HBase, click Configurations and then All Configurations, search for hbase.rpc.protection, and check the HBase authentication mode. If the authentication mode is integrity or privacy, add the following parameters:

        'properties.hbase.rpc.protection' = 'HBase authentication mode'

        'properties.zookeeper.znode.parent' = '/hbase'

        'properties.hbase.security.authorization' = 'true'

        'properties.hbase.security.authentication' = 'kerberos'

    3. On the job management page, check whether the job status is Running.

  6. Execute the following script to write data to Kafka. For details, see Managing Messages in Kafka Topics.

    sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka Broker instance is deployed:Kafka port --topic Topic name --producer.config Client directory/Kafka/kafka/config/producer.properties
    Enter the message content and press Enter to send the message.
    1,name1
    2,name2
    3,name3
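
    For this example, assuming the client is installed in /opt/client, the topic is the user_source topic defined in the KafkaSource table:

    sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka Broker instance is deployed:Kafka port --topic user_source --producer.config /opt/client/Kafka/kafka/config/producer.properties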

  7. Run the following commands to check whether data is written from the Kafka topic to the sink table:

    sh kafka-console-consumer.sh --topic Topic name --bootstrap-server Service IP address of the Kafka Broker instance:Kafka port --consumer.config Client directory/Kafka/kafka/config/consumer.properties

    The result is as follows:

    1,name1,city1
    2,name2,city2
    3,name3,city3

Submitting a Job Using the Application

  • If the Flink run mode is used, you are advised to export the HBase configuration directory before submitting the job, for example, export HBASE_CONF_DIR=/opt/hbaseconf (see the submission sketch at the end of this section).
  • If the Flink run-application mode is used, you can use either of the following methods to submit jobs:
    • (Recommended) Add the following configurations to a table creation statement.
      Table 1 Related configurations

      • 'properties.hbase.rpc.protection' = 'authentication': This parameter must be consistent with the setting on the HBase server.
      • 'properties.zookeeper.znode.parent' = '/hbase': If multiple HBase services (for example, hbase1 and hbase2) coexist, this parameter specifies the cluster to be accessed.
      • 'properties.hbase.security.authorization' = 'true': Authorization is enabled.
      • 'properties.hbase.security.authentication' = 'kerberos': Kerberos authentication is enabled.

      Example:
      CREATE TABLE hsink1 (
           rowkey STRING,
           f1 ROW < q1 STRING >,
           PRIMARY KEY (rowkey) NOT ENFORCED
          ) WITH (
            'connector' = 'hbase-2.2',
            'table-name' = 'cc',
            'zookeeper.quorum' = 'x.x.x.x:clientPort',
            'properties.hbase.rpc.protection' = 'authentication',
            'properties.zookeeper.znode.parent' = '/hbase',
            'properties.hbase.security.authorization' = 'true',
            'properties.hbase.security.authentication' = 'kerberos'
         );
    • Add the HBase configuration directory to the files shipped to Yarn by using the yarn.ship-files option (see the submission sketch at the end of this section).

      Example: -Dyarn.ship-files=/opt/hbaseconf
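
  The following is a minimal sketch of both submission modes, assuming the Flink client is installed in /opt/client and that the job JAR name FlinkHBaseJob.jar is illustrative:

    cd /opt/client
    source bigdata_env
    # Run mode: point the client at the HBase configuration directory first.
    export HBASE_CONF_DIR=/opt/hbaseconf
    flink run -m yarn-cluster FlinkHBaseJob.jar

    # Application mode: ship the HBase configuration directory with the job instead.
    flink run-application -t yarn-application -Dyarn.ship-files=/opt/hbaseconf FlinkHBaseJob.jar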