Using a DLI Flink Job to Synchronize MRS Kafka Data to a CloudTable HBase Cluster in Real Time
This section describes a best practice for real-time data synchronization: using a DLI Flink job to synchronize MRS Kafka data to a CloudTable HBase cluster in real time.
- For details about DLI, see Data Lake Insight Service Overview.
- For details about Kafka, see the MRS Service Overview.
Figure 1 Data synchronization process
Constraints
- Kerberos authentication is not enabled for the MRS cluster.
- To ensure network connectivity, the security group, region, VPC, and subnet of the MRS cluster must be the same as those of the CloudTable cluster.
- To establish a datasource connection, add the CIDR block of the Data Lake Insight (DLI) queue to the inbound rules of the CloudTable security group. For details, see Creating an Enhanced Datasource Connection.
- Network connectivity must be established between DLI and both the upstream (MRS Kafka) and downstream (CloudTable HBase) services. For details, see Testing Address Connectivity.
Procedure
The general procedure is as follows:
Preparations
- Sign up for a HUAWEI ID and enable Huawei Cloud services. For details, see Signing Up for a HUAWEI ID and Enabling Huawei Cloud Services. The account cannot be in arrears or frozen.
- Create a VPC and subnet. For details, see Creating a VPC and Subnet.
Step 1: Creating a CloudTable HBase Cluster
- Log in to the CloudTable console and create a CloudTable HBase cluster.
- Create an ECS. For details, see Preparing the ECS.
- Deploy the HBase client on the ECS. For details, see Deploying a Client in One Click.
- Run the bin/hbase shell command to start the HBase shell and access the cluster.
- Create the order table.
create 'order', {NAME => 'detail'}
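To make the table layout concrete, the following is a minimal Python sketch of how one order record would be laid out in the order table. It assumes the mapping used by the sink definition in Step 3, where order_id is the row key and every other field becomes a qualifier in the detail column family. The function name to_hbase_cells and the sample values are hypothetical, and the code only builds an in-memory structure; it does not connect to a cluster.

```python
def to_hbase_cells(record, row_key_field="order_id", family="detail"):
    """Return (row_key, cells) for one flat record.

    cells maps "family:qualifier" to the field value, which mirrors how
    HBase addresses a cell by row key, column family, and qualifier.
    """
    row_key = record[row_key_field]
    cells = {
        f"{family}:{name}": value
        for name, value in record.items()
        if name != row_key_field
    }
    return row_key, cells


# Made-up sample record with the same field names as the tables in this guide.
sample = {
    "order_id": "order-0001",
    "order_channel": "webShop",
    "pay_amount": 100.0,
    "user_id": "user-0001",
}

row_key, cells = to_hbase_cells(sample)
print(row_key)
for column, value in cells.items():
    print(column, value)
```

Because the table is created with only the detail column family, qualifiers such as detail:pay_amount do not need to be declared in advance; HBase creates them when the first cell is written.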
Step 2: Creating a Flink Job in the MRS Cluster to Generate Data
- Create an MRS cluster.
- Log in to Manager and choose Cluster > Flink > Dashboard.
- Click the link on the right of Flink WebUI to access the Flink web UI.
- On the Flink web UI, create a Flink task to generate data.
- Click Create Job on the Job Management page. The Create Job page is displayed.
- Set parameters and click OK to create a Flink SQL job. To modify the SQL statements, click Develop in the Operation column and add the following statements on the SQL page:
ip:port: IP address and port number of the Kafka Broker
- To obtain the IP address, log in to FusionInsight Manager and choose Cluster > Kafka > Instance. On the displayed page, view the Management IP Address of Broker.
- To obtain the port number, click Configurations. On the configuration page that is displayed, search for port and obtain the port number (the PLAINTEXT protocol port that the Broker service listens on).
- You are advised to add multiple IP addresses and port numbers to the properties.bootstrap.servers parameter, separated by commas, so that the job does not fail when a single Broker is unreachable because of network instability or other reasons.
CREATE TABLE IF NOT EXISTS `lineorder_hbase` (
  `order_id` string,
  `order_channel` string,
  `order_time` string,
  `pay_amount` double,
  `real_pay` double,
  `pay_time` string,
  `user_id` string,
  `user_name` string,
  `area_id` string
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_flink',
  'properties.bootstrap.servers' = 'ip:port',
  'value.format' = 'json',
  'properties.sasl.kerberos.service.name' = 'kafka'
);

CREATE TABLE lineorder_datagen (
  `order_id` string,
  `order_channel` string,
  `order_time` string,
  `pay_amount` double,
  `real_pay` double,
  `pay_time` string,
  `user_id` string,
  `user_name` string,
  `area_id` string
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1000'
);

INSERT INTO lineorder_hbase
SELECT * FROM lineorder_datagen;
- Return to the Job Management page and click Start in the Operation column. If the job status is Running, the job is successfully executed.
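The advice about listing several Brokers in properties.bootstrap.servers can be sketched as follows. Kafka clients accept a comma-separated list of host:port pairs for this setting. The helper name build_bootstrap_servers is hypothetical, and the addresses and port below are placeholders (192.0.2.0/24 is a documentation-only range); substitute the Management IP Address and PLAINTEXT port obtained from Manager.

```python
def build_bootstrap_servers(brokers):
    """Join (host, port) pairs into a comma-separated host:port list."""
    if not brokers:
        raise ValueError("at least one broker is required")
    parts = []
    for host, port in brokers:
        port = int(port)  # accept "9092" as well as 9092
        if not 0 < port < 65536:
            raise ValueError(f"invalid port: {port}")
        parts.append(f"{host}:{port}")
    return ",".join(parts)


# Placeholder Broker addresses; replace them with the values from Manager.
brokers = [("192.0.2.11", 9092), ("192.0.2.12", 9092), ("192.0.2.13", 9092)]
print(build_bootstrap_servers(brokers))
```

The resulting string replaces ip:port in the WITH clause of the Kafka table. Listing more than one Broker lets the client bootstrap even if one of them is temporarily unavailable.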
Step 3: Creating a DLI Flink Job to Synchronize Data
- Create elastic resources and queues. For details, see Creating an Elastic Resource Pool and Creating Queues Within It.
- Create an enhanced datasource connection. For details, see Creating an Enhanced Datasource Connection.
- Test the connectivity between DLI and the upstream MRS Kafka and between DLI and the downstream CloudTable HBase.
- After the elastic resource pool and queue are created, choose Resources > Queue Management. On the Queue Management page that is displayed, test the address connectivity. For details, see Testing Address Connectivity.
- To obtain the upstream IP address and port number, go to Manager of the MRS cluster and choose Cluster > Kafka > Instance. On the displayed page, view the Management IP Address of Broker. Click Configurations. On the configuration page that is displayed, search for port and obtain the port number (the PLAINTEXT protocol port that the Broker service listens on).
- Obtain the downstream IP address and port number.
- To obtain the IP address, go to the Details page. Obtain the domain name from ZK Link (Intranet) under Cluster Information. Run the following command to resolve the IP address:
ping Access domain name
- To obtain the port number, go to the Details page. Obtain the port from ZK Link (Intranet) under Cluster Information.
- Create a Flink job. For details, see Submitting a Flink Job Using DLI.
- Select the Flink job you created, click Edit in the Operation column, and add SQL statements for data synchronization.
CREATE TABLE orders (
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_flink',
  'properties.bootstrap.servers' = 'ip:port',
  'properties.group.id' = 'testGroup_1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

create table hbaseSink(
  order_id string,
  detail Row(
    order_channel string,
    order_time string,
    pay_amount double,
    real_pay double,
    pay_time string,
    user_id string,
    user_name string,
    area_id string)
) with (
  'connector' = 'hbase-2.2',
  'table-name' = 'order',
  'zookeeper.quorum' = 'ip:port',
  'sink.buffer-flush.max-rows' = '1'
);

insert into hbaseSink
select
  order_id,
  Row(order_channel,order_time,pay_amount,real_pay,pay_time,user_id,user_name,area_id)
from orders;
- Click Format and click Save.
Always click Format before saving. Otherwise, copying and pasting the code may introduce null characters that cause the job to fail.
- On the DLI management console, choose Job Management > Flink Jobs.
- Click Start in the Operation column to start the Flink job you created. If the job status is Running, the job is successfully executed.
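If the job does not reach the Running state, the addresses used in the connectivity test are a common place to look. The following Python sketch shows one way to resolve the ZK Link domain name to an IP address (as an alternative to ping) and to check that a TCP port accepts connections. The helper names resolve_ipv4 and can_connect are hypothetical. Note the assumption that the script runs on a host in the same VPC, such as the ECS prepared in Step 1: it checks reachability from that host only and does not replace the address connectivity test on the DLI Queue Management page.

```python
import socket


def resolve_ipv4(hostname):
    """Return the sorted, de-duplicated IPv4 addresses for hostname."""
    infos = socket.getaddrinfo(
        hostname, None, family=socket.AF_INET, type=socket.SOCK_STREAM
    )
    # Each entry is (family, type, proto, canonname, sockaddr);
    # for IPv4, sockaddr is (address, port).
    return sorted({info[4][0] for info in infos})


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Example usage with placeholder values; replace them with the domain name
# and port from ZK Link (Intranet) and with the Broker IP address and port.
# for ip in resolve_ipv4("zk-link-domain-name.example"):
#     print(ip, can_connect(ip, 2181))
```

A False result means the connection was refused, timed out, or could not be routed. In that case, review the security group inbound rules and the enhanced datasource connection before restarting the job.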