Using a Flink Job of DLI to Synchronize Kafka Data to a GaussDB(DWS) Cluster in Real Time
This practice demonstrates how to use DLI Flink jobs to synchronize consumption data from Kafka to GaussDB(DWS) in real time. The demonstration process includes writing and updating existing data in real time.
- For details, see What Is Data Lake Insight?
- For details about Kafka, see What Is DMS for Kafka?
This practice takes about 90 minutes. The cloud services used in this practice include Virtual Private Cloud (VPC) and subnets, Elastic Load Balance (ELB), Elastic Cloud Server (ECS), Object Storage Service (OBS), Distributed Message Service (DMS) for Kafka, Data Lake Insight (DLI), and Data Warehouse Service (DWS). The basic process is as follows:
- Preparations
- Step 1: Creating a Kafka Instance
- Step 2: Creating a GaussDB(DWS) Cluster and Target Table
- Step 3: Creating a DLI Queue
- Step 4: Creating an Enhanced Datasource Connection for Kafka and GaussDB(DWS)
- Step 5: Preparing the dws-connector-flink Tool for Interconnecting GaussDB(DWS) with Flink
- Step 6: Creating and Editing a DLI Flink Job
- Step 7: Creating and Modifying Messages on the Kafka Client
Scenario Description
Assume that the sample data of the data source Kafka is a user information table, as shown in Table 1, which contains the id, name, and age fields. The id field is unique and fixed, which is shared by multiple service systems. Generally, the id field does not need to be modified. Only the name and age fields need to be modified.
Use Kafka to generate the following three groups of data and use DLI Flink jobs to synchronize the data to GaussDB(DWS): Change the users whose IDs are 2 and 3 to jim and tom, and use DLI Flink jobs to update data and synchronize the data to GaussDB(DWS).
Constraints
- Ensure that VPC, ECS, OBS, Kafka, DLI, and GaussDB(DWS) are in the same region, for example, China-Hong Kong.
- Ensure that Kafka, DLI, and GaussDB(DWS) can communicate with each other. In this practice, Kafka and GaussDB(DWS) are created in the same region and VPC, and the security groups of Kafka and GaussDB(DWS) allow the network segment of the DLI queues.
- To ensure that the link between DLI and GaussDB(DWS) is stable, bind the ELB service to the created GaussDB(DWS) cluster.
Preparations
- You have registered a Huawei account and enabled Huawei Cloud services.. Before using GaussDB(DWS), check the account status. The account cannot be in arrears or frozen.
- You have created a VPC and subnet. For details, see Creating a VPC.
Step 1: Creating a Kafka Instance
- Log in to the Huawei Cloud management console and choose Middleware > Distributed Message Service (for Kafka) from the service list. The Kafka management console is displayed.
- Click DMS for Kafka on the left and click Buy Instance in the upper right corner.
- Set the following parameters. Retain the default values for other parameters that are not described in the table.
Table 2 Kafka instance parameters Parameter
Value
Billing Mode
Pay-per-use
Region
CN-Hong Kong
Project
Default
AZ
AZ 1 (If not available, select another AZ.)
Instance Name
kafka-dli-dws
Enterprise Project
default
Specifications
Default
Version
2.7
CPU Architecture
x86
Broker Flavor
kafka.2u4g.cluster.small (For reference only. Select the smallest flavor.)
Brokers
3
VPC
Select a created VPC. If no VPC is available, create one.
Security Group
Select a created security group. If no security group is available, create one.
Other parameters
Retain the default value.
Figure 2 Creating a Kafka instance
- Click Buy and complete the payment. Wait until the creation is successful.
- In the Kafka instance list, click the name of the created Kafka instance. The Basic Information page is displayed.
- Choose Topics on the left and click Create Topic.
Set Topic Name to topic-demo and retain the default values for other parameters.
Figure 3 Creating a topic
- Click OK. In the topic list, you can see that topic-demo is successfully created.
- Choose Consumer Groups on the left and click Create Consumer Group.
- Enter kafka01 for Consumer Group Name and click OK.
Step 2: Creating a GaussDB(DWS) Cluster and Target Table
- Create a dedicated load balancer, set Network Type to IPv4 private network. Set Region and VPC to the same values as those of the Kafka instance. In this example, set Region to China-Hong Kong.
- Creating a Cluster. To ensure network connectivity, the region and VPC of the GaussDB(DWS) cluster must be the same as those of the Kafka instance. In this practice, the region and VPC are China-Hong Kong. The VPC must be the same as that created for Kafka.
- Log in to the GaussDB(DWS) console, choose Dedicated Clusters > Clusters, locate the row that contains the target cluster, and click Login in the Operation column.
This practice uses version 8.1.3.x as an example. 8.1.2 and earlier versions do not support this login mode. You can use Data Studio to connect to a cluster. For details, see Using Data Studio to Connect to a Cluster.
- After the login is successful, the SQL editor is displayed.
- Copy the following SQL statement. In the SQL window, click Execute SQL to create the target table user_dws.
1 2 3 4 5 6
CREATE TABLE user_dws ( id int, name varchar(50), age int, PRIMARY KEY (id) );
Step 3: Creating a DLI Queue
- Log in to the Huawei Cloud management console and choose Analytics > Data Lake Insight from the service list. The DLI management console is displayed.
- In the navigation pane on the left, choose Resources > Resource Pool.
- Click Buy Resource Pool in the upper right corner, set the following parameters, and retain the default values for other parameters that are not described in the table.
Table 3 DLI queue parameters Parameter
Value
Billing Mode
Pay-per-use
Region
CN-Hong Kong
Name
dli_dws
Specifications
Standard
CIDR Block
172.16.0.0/18. It must be in a different network segment from Kafka and GaussDB(DWS). For example, if Kafka and GaussDB(DWS) are in the 192.168.x.x network segment, select 172.16.x.x for DLI.
- Click Buy and click Submit.
After the resource pool is created, go to the next step.
- On the elastic resource pool page, locate the row that contains the created resource pool, click Add Queue in the Operation column, and set the following parameters. Retain the default values for other parameters that are not described in the table.
Table 4 Adding a queue Parameter
Value
Name
dli_dws
Type
General purpose queue
- Click Next and click OK. The queue is created.
Step 4: Creating an Enhanced Datasource Connection for Kafka and GaussDB(DWS)
- In the security group of Kafka, allow the network segment where the DLI queue is located.
- Return to the Kafka console and click the Kafka instance name to go to the Basic Information page. View the value of Instance Address (Private Network) in connection information and record the address for future use.
Figure 4 Kafka private network address
- Click the security group name.
Figure 5 Kafka security group
- Choose Inbound Rules > Add Rule, as shown in the following figure. Add the network segment of the DLI queue. In this example, the network segment is 172.16.0.0/18. Ensure that the network segment is the same as that entered during Step 3: Creating a DLI Queue.
Figure 6 Adding rules to the Kafka security group
- Click OK.
- Return to the Kafka console and click the Kafka instance name to go to the Basic Information page. View the value of Instance Address (Private Network) in connection information and record the address for future use.
- Return to the DLI management console, click Datasource Connections on the left, select Enhanced, and click Create.
- Set the following parameters. Retain the default values for other parameters that are not described in the table.
Table 5 Connection from DLI to Kafka Parameter
Value
Connection Name
dli_kafka
Resource Pool
Select the created DLI queue dli_dws.
VPC
Select the VPC of Kafka.
Subnet
Select the subnet where Kafka is located.
Other parameters
Retain the default value.
Figure 7 Creating an enhanced connection
- Click OK. Wait until the Kafka connection is successfully created.
- Choose Resources > Queue Management on the left, and choose More > Test Address Connectivity on the right of dli_dws.
- In the address box, enter the private IP address and port number of the Kafka instance obtained in 1.a. (There are three Kafka addresses. Enter only one of them.)
Figure 8 Testing Kafka connectivity
- Click Test to verify that DLI is successfully connected to Kafka.
- Log in to the GaussDB(DWS) console, choose Dedicated Clusters > Clusters on the left, and click the cluster name to go to the details page.
- Record the private network domain name, port number, and ELB address of the GaussDB(DWS) cluster for future use.
Figure 9 Private domain name and ELB address
- Click the security group name.
Figure 10 GaussDB(DWS) security group
- Choose Inbound Rules > Add Rule, as shown in the following figure. Add the network segment of the DLI queue. In this example, the network segment is 172.16.0.0/18. Ensure that the network segment is the same as that entered during Step 3: Creating a DLI Queue.
Figure 11 Adding a rule to the GaussDB(DWS) security group
- Click OK.
- Switch to the DLI console, choose Resources > Queue Management on the left, and click More > Test Address Connectivity on the right of dli_dws.
- In the address box, enter the ELB address and port number of the GaussDB(DWS) cluster obtained in 9.
Figure 12 Testing GaussDB(DWS) connectivity
- Click Test to verify that DLI is successfully connected to GaussDB(DWS).
Step 5: Preparing the dws-connector-flink Tool for Interconnecting GaussDB(DWS) with Flink
dws-connector-flink is a tool for interconnecting with Flink based on GaussDB(DWS) JDBC APIs. During DLI job configuration, this tool and its dependencies are stored in the Flink class loading directory to improve the capability of importing Flink jobs to GaussDB(DWS).
- Go to https://mvnrepository.com/artifact/com.huaweicloud.dws using a browser.
- In the software list, select the latest version of GaussDB(DWS) Connectors Flink. In this practice, select DWS Connector Flink 2 12 1 12.
- Click the 1.0.4 branch. (Click the newest branch in actual scenarios).
- Click View ALL.
- Click dws-connector-flink_2.12_1.12-1.0.4-jar-with-dependencies.jar to download it to the local host.
- Create an OBS bucket. In this practice, set the bucket name to obs-flink-dws and upload the file to the OBS bucket. Ensure that the bucket is in the same region as DLI, which in this practice is China-Hong Kong.
Figure 13 Uploading the JAR package to the OBS bucket
Step 6: Creating and Editing a DLI Flink Job
- Return to the DLI management console, choose Job Management > Flink Jobs on the left, and click Create Job in the upper right corner.
- Set Type to Flink OpenSource SQL and Name to kafka-dws.
Figure 14 Creating a job
- Click OK. The page for editing the job is displayed.
- Set the following parameters on the right of the page. Retain the default values for other parameters that are not described in the table.
Table 6 Flink job parameters Parameter
Value
Queue
dli_dws
Flink Version
1.12
UDF Jar
Select the JAR file in the OBS bucket created in Step 5: Preparing the dws-connector-flink Tool for Interconnecting GaussDB(DWS) with Flink.
OBS Bucket
Select the bucket created in Step 5: Preparing the dws-connector-flink Tool for Interconnecting GaussDB(DWS) with Flink.
Enable Checkpointing
Check the box.
Other parameters
Retain the default value.
Figure 15 Editing a job
- Copy the following SQL code to the SQL code window on the left.
Obtain the private IP address and port number of the Kafka instance from 1.a, and obtain the private domain name from 9.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
CREATE TABLE user_kafka ( id string, name string, age int ) WITH ( 'connector' = 'kafka', 'topic' = 'topic-demo', 'properties.bootstrap.servers' ='Private IP address and port number of the Kafka instance', 'properties.group.id' = 'kafka01', 'scan.startup.mode' = 'latest-offset', "format" = "json" ); CREATE TABLE user_dws ( id string, name string, age int, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'dws', 'url'='jdbc:postgresql://GaussDB(DWS) private network domain name:8000/gaussdb', 'tableName' = 'public.user_dws', 'username' = 'dbadmin', 'password' ='Password of database user dbdamin' ); INSERT INTO user_dws select * from user_kafka;
- Click Check Semantics and wait until the verification is successful.
If the verification fails, check whether the SQL input has syntax errors.
Figure 16 SQL statement of a job
- Click Save.
- Return to the DLI console home page and choose Job Management > Flink Jobs on the left.
- Click Start on the right of the job name kafka-dws and click Start Now.
Wait for about 1 minute and refresh the page. If the status is Running, the job is successfully executed.
Figure 17 Job execution status
Step 7: Creating and Modifying Messages on the Kafka Client
- Create an ECS by referring to the ECS document. Ensure that the region and VPC of the ECS are the same as those of Kafka.
- Install JDK.
- Log in to the ECS, go to the /usr/local directory, and download the JDK package.
1 2
cd /usr/local wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
- Decompress the downloaded JDK package.
1
tar -zxvf jdk-17_linux-x64_bin.tar.gz
- Run the following command to open the /etc/profile file:
1
vim /etc/profile
- Press i to enter editing mode and add the following content to the end of the /etc/profile file:
1 2 3 4 5
export JAVA_HOME=/usr/local/jdk-17.0.7 #JDK installation directory export JRE_HOME=${JAVA_HOME}/jre export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:${JAVA_HOME}/test:${JAVA_HOME}/lib/gsjdbc4.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar:$CLASSPATH export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin export PATH=$PATH:${JAVA_PATH}
- Press Esc and enter :wq! to save the settings and exit.
- Run the following command for the environment variables to take effect:
1
source /etc/profile
- Run the following command. If the following information is displayed, the JDK is successfully installed:
1
java -version
- Log in to the ECS, go to the /usr/local directory, and download the JDK package.
- Install the Kafka client.
- Go to the /opt directory and run the following command to obtain the Kafka client software package.
1 2
cd /opt wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
- Decompress the downloaded software package.
1
tar -zxf kafka_2.12-2.7.2.tgz
- Go to the Kafka client directory.
1
cd /opt/kafka_2.12-2.7.2/bin
- Go to the /opt directory and run the following command to obtain the Kafka client software package.
- Run the following command to connect to Kafka: {Connection address} indicates the internal network connection address of Kafka. For details about how to obtain the address, see 1.a. topic indicates the name of the Kafka topic created in 6.
1
./kafka-console-producer.sh --broker-list {connection address} --topic {Topic name}
The following is an example:
./kafka-console-producer.sh --broker-list 192.168.0.136:9092,192.168.0.214:9092,192.168.0.217:9092 --topic topic-demo
If > is displayed and no other error message is displayed, the connection is successful.
- In the window of the connected Kafka client, copy the following content (one line at a time) based on the data planned in the Scenario Description and press Enter to produce messages:
1 2 3
{"id":"1","name":"lily","age":"16"} {"id":"2","name":"lucy","age":"17"} {"id":"3","name":"lilei","age":"15"}
- Return to the GaussDB(DWS) console, choose Dedicated Clusters > Clusters on the left, and click Log In on the right of the GaussDB(DWS) cluster. The SQL page is displayed.
- Run the following SQL statement to verify that data is successfully imported to the database in real time:
1
SELECT * FROM user_dws ORDER BY id;
- Go back to the client window for connecting to Kafka on the ECS, copy the following content (one line at a time), and press Enter to produce messages.
1 2
{"id":"2","name":"jim","age":"17"} {"id":"3","name":"tom","age":"15"}
- Go back to the opened SQL window of GaussDB(DWS) and run the following SQL statement. It is found that the names whose IDs are 2 and 3 have been changed to jim and tom.
The scenario description is as expected. End of this practice.
1
SELECT * FROM user_dws ORDER BY id;
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot