Using DLI Flink Jobs to Write Kafka Data to GaussDB(DWS) in Real Time
This practice demonstrates how to use DLI Flink jobs to synchronize consumption data from Kafka to GaussDB(DWS) in real time. The demonstration covers both writing new data and updating existing data in real time.
- For details about DLI, see What Is Data Lake Insight?
- For details about Kafka, see What Is DMS for Kafka?
This practice takes about 90 minutes. The cloud services used in this practice include Virtual Private Cloud (VPC) and subnets, Elastic Load Balance (ELB), Elastic Cloud Server (ECS), Object Storage Service (OBS), Distributed Message Service (DMS) for Kafka, Data Lake Insight (DLI), and Data Warehouse Service (DWS). The basic process is as follows:
- Preparations
- Step 1: Creating a Kafka Instance
- Step 2: Creating a GaussDB(DWS) Cluster and Target Table
- Step 3: Creating a DLI Queue
- Step 4: Creating an Enhanced Datasource Connection for Kafka and GaussDB(DWS)
- Step 5: Preparing the dws-connector-flink Tool for Interconnecting GaussDB(DWS) with Flink
- Step 6: Creating and Editing a DLI Flink Job
- Step 7: Creating and Modifying Messages on the Kafka Client
Scenario Description
Assume that the sample data of the data source Kafka is a user information table, as shown in Table 1, which contains the id, name, and age fields. The id field is unique and fixed, which is shared by multiple service systems. Generally, the id field does not need to be modified. Only the name and age fields need to be modified.
Use Kafka to produce the three groups of data in Table 1 and use DLI Flink jobs to synchronize the data to GaussDB(DWS). Then change the names of the users whose IDs are 2 and 3 to jim and tom, and use DLI Flink jobs to synchronize the updated data to GaussDB(DWS).
Constraints
- Ensure that VPC, ECS, OBS, Kafka, DLI, and GaussDB(DWS) are in the same region, for example, Europe-Dublin.
- Ensure that Kafka, DLI, and GaussDB(DWS) can communicate with each other. In this practice, Kafka and GaussDB(DWS) are created in the same region and VPC, and the security groups of Kafka and GaussDB(DWS) allow the network segment of the DLI queues.
- To ensure that the link between DLI and DWS is stable, bind the ELB service to the created data warehouse cluster.
Preparations
- You have registered a Huawei account and enabled Huawei Cloud services. Before using GaussDB(DWS), check the account status. The account cannot be in arrears or frozen.
- You have created a VPC and subnet. For details, see Creating a VPC.
Step 1: Creating a Kafka Instance
- Log in to the Huawei Cloud management console and choose Middleware > Distributed Message Service (for Kafka) from the service list. The Kafka management console is displayed.
- Click DMS for Kafka on the left and click Buy Instance in the upper right corner.
- Set the following parameters. Retain the default values for other parameters that are not described in the table.
Table 2 Kafka instance parameters
Parameter | Value
Billing Mode | Pay-per-use
Region | Europe-Dublin
Project | Default
AZ | AZ 1 (If not available, select another AZ.)
Instance Name | kafka-dli-dws
Enterprise Project | default
Specifications | Default
Version | 2.7
CPU Architecture | x86
Broker Flavor | kafka.2u4g.cluster.small (For reference only. Select the smallest flavor.)
Brokers | 3
VPC | Select a created VPC. If no VPC is available, create one.
Security Group | Select a created security group. If no security group is available, create one.
Other parameters | Retain the default value.
Figure 2 Creating a Kafka instance
- Click Buy and complete the payment. Wait until the creation is successful.
- In the Kafka instance list, click the name of the created Kafka instance. The Basic Information page is displayed.
- Choose Topics on the left and click Create Topic.
Set Topic Name to topic-demo and retain the default values for other parameters.
Figure 3 Creating a topic
- Click OK. In the topic list, you can see that topic-demo is successfully created.
- Choose Consumer Groups on the left and click Create Consumer Group.
- Enter kafka01 for Consumer Group Name and click OK.
Step 2: Creating a GaussDB(DWS) Cluster and Target Table
- Create a dedicated load balancer and set Network Type to IPv4 private network. Set Region and VPC to the same values as those of the Kafka instance. In this example, set Region to Europe-Dublin.
- Create a cluster by referring to Creating a Cluster. To ensure network connectivity, the region and VPC of the GaussDB(DWS) cluster must be the same as those of the Kafka instance. In this practice, the region is Europe-Dublin, and the VPC must be the same as that created for Kafka.
- On the Clusters page of the GaussDB(DWS) console, locate the row that contains the target cluster and click Login in the Operation column.
This practice uses version 8.1.3.x as an example. 8.1.2 and earlier versions do not support this login mode. You can use Data Studio to connect to a cluster. For details, see Using Data Studio to Connect to a Cluster.
- The login username is dbadmin, the database name is gaussdb, and the password is the password of user dbadmin set during data warehouse cluster creation. Select Remember Password, enable Collect Metadata Periodically and Show Executed SQL Statements, and click Log In.
Figure 4 Logging In to GaussDB(DWS)
- Click the database name gaussdb and click SQL Window in the upper right corner to access the SQL editor.
- Copy the following SQL statement. In the SQL window, click Execute SQL to create the target table user_dws.
CREATE TABLE user_dws (
    id int,
    name varchar(50),
    age int,
    PRIMARY KEY (id)
);
Step 3: Creating a DLI Queue
- Log in to the Huawei Cloud management console and choose Analytics > Data Lake Insight from the service list. The DLI management console is displayed.
- In the navigation pane on the left, choose Resource Management > Queue Manager.
- Click Buy Queue in the upper right corner, set the following parameters, and retain the default values for other parameters that are not described in the table.
Table 3 DLI queue parameters
Parameter | Value
Billing Mode | Pay-per-use
Region | Europe-Dublin
Project | Default
Name | dli_dws
Type | For a general queue, select Dedicated Resource Mode.
AZ Mode | Single-AZ deployment
Specifications | 16 CUs
Enterprise Project | default
Advanced Settings | Custom
CIDR Block | 172.16.0.0/18. It must be in a different network segment from Kafka and GaussDB(DWS). For example, if Kafka and GaussDB(DWS) are in the 192.168.x.x network segment, select 172.16.x.x for DLI.
Figure 5 Creating a DLI queue
- Click Buy.
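The requirement that the DLI queue CIDR block be in a different network segment from Kafka and GaussDB(DWS) comes down to the two address ranges not overlapping. The following is a minimal sketch of such a check in bash, using the example values from this practice. The helper names ip_to_int and cidr_overlap are hypothetical, and 192.168.0.0/16 is an assumed stand-in for the 192.168.x.x segment mentioned in the table.

```bash
#!/usr/bin/env bash

# Convert a dotted-quad IPv4 address to a 32-bit integer.
ip_to_int() {
  local a b c d
  local IFS=.
  read -r a b c d <<< "$1"
  echo $(( (a << 24) | (b << 16) | (c << 8) | d ))
}

# Return 0 if the two CIDR blocks overlap, 1 otherwise.
# Two blocks overlap exactly when they agree on the shorter of the two prefixes.
cidr_overlap() {
  local ip1=${1%/*} len1=${1#*/}
  local ip2=${2%/*} len2=${2#*/}
  local len=$(( len1 < len2 ? len1 : len2 ))
  local mask=$(( (0xFFFFFFFF << (32 - len)) & 0xFFFFFFFF ))
  local n1 n2
  n1=$(ip_to_int "$ip1")
  n2=$(ip_to_int "$ip2")
  (( (n1 & mask) == (n2 & mask) ))
}

# Example values: DLI queue CIDR block vs. an assumed Kafka/GaussDB(DWS) VPC segment.
if cidr_overlap "172.16.0.0/18" "192.168.0.0/16"; then
  echo "overlap: choose another CIDR block for the DLI queue"
else
  echo "no overlap"
fi
```

With the example values the script prints "no overlap". Replace the second argument with the actual CIDR block of the VPC subnet before relying on the result.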
Step 4: Creating an Enhanced Datasource Connection for Kafka and GaussDB(DWS)
- In the security group of Kafka, allow the network segment where the DLI queue is located.
- Return to the Kafka console and click the Kafka instance name to go to the Basic Information page. View the value of Instance Address (Private Network) in connection information and record the address for future use.
Figure 6 Kafka private network address
- Click the security group name.
Figure 7 Kafka security group
- Choose Inbound Rules > Add Rule, as shown in the following figure. Add the network segment of the DLI queue. In this example, the network segment is 172.16.0.0/18. Ensure that the network segment is the same as that entered during Step 3: Creating a DLI Queue.
Figure 8 Adding rules to the Kafka security group
- Click OK.
- Return to the DLI management console, click Datasource Connections on the left, select Enhanced, and click Create.
- Set the following parameters. Retain the default values for other parameters that are not described in the table.
Table 4 Connection from DLI to Kafka
Parameter | Value
Connection Name | dli_kafka
Resource Pool | Select the created DLI queue dli_dws.
VPC | Select the VPC of Kafka.
Subnet | Select the subnet where Kafka is located.
Other parameters | Retain the default value.
Figure 9 Creating an enhanced connection
- Click OK. Wait until the Kafka connection is successfully created.
- Choose Resources > Queue Management on the left, and choose More > Test Address Connectivity on the right of dli_dws.
- In the address box, enter the private IP address and port number of the Kafka instance obtained in 1.a. (There are three Kafka addresses. Enter only one of them.)
Figure 10 Testing Kafka connectivity
- Click Test to verify that DLI is successfully connected to Kafka.
- Log in to the GaussDB(DWS) management console, choose Clusters on the left, and click the cluster name to go to the details page.
- Record the private network domain name, port number, and Elastic Load Balance address of the data warehouse cluster for future use.
Figure 11 Private Domain Name and ELB Address
- Click the security group name.
Figure 12 GaussDB(DWS) security group
- Choose Inbound Rules > Add Rule, as shown in the following figure. Add the network segment of the DLI queue. In this example, the network segment is 172.16.0.0/18. Ensure that the network segment is the same as that entered during Step 3: Creating a DLI Queue.
Figure 13 Adding a rule to the GaussDB(DWS) security group
- Click OK.
- Switch to the DLI console, choose Resources > Queue Management on the left, and click More > Test Address Connectivity on the right of dli_dws.
- In the address box, enter the Elastic Load Balance IP address and port number of the GaussDB(DWS) cluster obtained in 9.
Figure 14 Testing GaussDB(DWS) connectivity
- Click Test to verify that DLI is successfully connected to GaussDB(DWS).
Step 5: Preparing the dws-connector-flink Tool for Interconnecting GaussDB(DWS) with Flink
dws-connector-flink is a tool for interconnecting with Flink based on DWS JDBC APIs. During DLI job configuration, this tool and its dependencies are stored in the Flink class loading directory to improve the ability of Flink jobs to import data into GaussDB(DWS).
- Go to https://mvnrepository.com/artifact/com.huaweicloud.dws using a browser.
- In the software list, select the latest version of GaussDB(DWS) Connectors Flink. In this practice, select DWS Connector Flink 2 12 1 12.
- Click the 1.0.4 branch. (In actual scenarios, click the latest branch.)
- Click View All.
- Click dws-connector-flink_2.12_1.12-1.0.4-jar-with-dependencies.jar to download it to the local host.
- Create an OBS bucket. In this practice, set the bucket name to obs-flink-dws and upload the file to the OBS bucket. Ensure that the bucket is in the same region as DLI, which in this practice is Europe-Dublin.
Figure 15 Uploading the JAR package to the OBS bucket
Step 6: Creating and Editing a DLI Flink Job
- Return to the DLI management console, choose Job Management > Flink Jobs on the left, and click Create Job in the upper right corner.
- Set Type to Flink OpenSource SQL and Name to kafka-dws.
Figure 16 Creating a job
- Click OK. The page for editing the job is displayed.
- Set the following parameters on the right of the page. Retain the default values for other parameters that are not described in the table.
Table 5 Flink job parameters
Parameter | Value
Queue | dli_dws
Flink Version | 1.12
UDF Jar | Select the JAR file in the OBS bucket created in Step 5: Preparing the dws-connector-flink Tool for Interconnecting GaussDB(DWS) with Flink.
OBS Bucket | Select the bucket created in Step 5: Preparing the dws-connector-flink Tool for Interconnecting GaussDB(DWS) with Flink.
Enable Checkpointing | Check the box.
Other parameters | Retain the default value.
Figure 17 Editing a job
- Copy the following SQL code to the SQL code window on the left.
Obtain the private IP address and port number of the Kafka instance from 1.a, and obtain the private domain name from 9.
-- Source table: reads JSON messages from the Kafka topic.
CREATE TABLE user_kafka (
  id string,
  name string,
  age int
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-demo',
  'properties.bootstrap.servers' = 'Private IP address and port number of the Kafka instance',
  'properties.group.id' = 'kafka01',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- Sink table: maps to public.user_dws in GaussDB(DWS); id is the primary key.
CREATE TABLE user_dws (
  id string,
  name string,
  age int,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'dws',
  'url' = 'jdbc:postgresql://GaussDB(DWS) private network domain name:8000/gaussdb',
  'tableName' = 'public.user_dws',
  'username' = 'dbadmin',
  'password' = 'Password of database user dbadmin'
);

-- Continuously write the Kafka data to GaussDB(DWS).
insert into user_dws select * from user_kafka;
- Click Check Semantics and wait until the verification is successful.
If the verification fails, check whether the SQL input has syntax errors.
Figure 18 SQL statement of a job
- Click Save.
- Return to the DLI console home page and choose Job Management > Flink Jobs on the left.
- Click Start on the right of the job name kafka-dws and click Start Now.
Wait for about 1 minute and refresh the page. If the status is Running, the job is successfully executed.
Figure 19 Job execution status
Step 7: Creating and Modifying Messages on the Kafka Client
- Create an ECS by referring to the ECS document. Ensure that the region and VPC of the ECS are the same as those of Kafka.
- Install JDK.
- Log in to the ECS, go to the /usr/local directory, and download the JDK package.
cd /usr/local
wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
- Decompress the downloaded JDK package.
tar -zxvf jdk-17_linux-x64_bin.tar.gz
- Run the following command to open the /etc/profile file:
vim /etc/profile
- Press i to enter editing mode and add the following content to the end of the /etc/profile file:
export JAVA_HOME=/usr/local/jdk-17.0.7 # JDK installation directory; use the directory name actually created by the decompression
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:${JAVA_HOME}/test:${JAVA_HOME}/lib/gsjdbc4.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar:$CLASSPATH
export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin
export PATH=$PATH:${JAVA_PATH}
- Press Esc and enter :wq! to save the settings and exit.
- Run the following command for the environment variables to take effect:
source /etc/profile
- Run the following command. If the JDK version information is displayed, the JDK is successfully installed:
java -version
- Install the Kafka client.
- Go to the /opt directory and run the following command to obtain the Kafka client software package.
cd /opt
wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
- Decompress the downloaded software package.
tar -zxf kafka_2.12-2.7.2.tgz
- Go to the Kafka client directory.
cd /opt/kafka_2.12-2.7.2/bin
- Run the following command to connect to Kafka. {connection address} indicates the private network connection address of the Kafka instance. For details about how to obtain the address, see 1.a. {topic name} indicates the name of the Kafka topic created in 6.
./kafka-console-producer.sh --broker-list {connection address} --topic {topic name}
The following is an example:
./kafka-console-producer.sh --broker-list 192.168.0.136:9092,192.168.0.214:9092,192.168.0.217:9092 --topic topic-demo
If > is displayed and no other error message is displayed, the connection is successful.
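If the > prompt does not appear, one thing worth ruling out is basic TCP reachability from the ECS to the Kafka brokers, for example a security group rule that blocks the port. The following is a minimal sketch, assuming that bash with /dev/tcp support and the coreutils timeout command are available on the ECS. The helper name port_open is hypothetical, and the addresses in the usage comment are the example addresses from this practice.

```bash
#!/usr/bin/env bash

# Return 0 if a TCP connection to host:port can be opened within 3 seconds.
port_open() {
  local host=$1 port=$2
  timeout 3 bash -c "exec 3<>/dev/tcp/${host}/${port}" 2>/dev/null
}

# Usage (not executed here): check each broker address before starting the producer.
# for addr in 192.168.0.136:9092 192.168.0.214:9092 192.168.0.217:9092; do
#   if port_open "${addr%:*}" "${addr#*:}"; then
#     echo "${addr} reachable"
#   else
#     echo "${addr} NOT reachable"
#   fi
# done
```

A successful TCP connection only shows that the port is reachable. It does not verify that the topic exists or that the client configuration is correct.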
- In the window of the connected Kafka client, copy the following content (one line at a time) based on the data planned in the Scenario Description and press Enter to produce messages:
{"id":"1","name":"lily","age":"16"}
{"id":"2","name":"lucy","age":"17"}
{"id":"3","name":"lilei","age":"15"}
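As an alternative to pasting the lines one at a time, the messages could be kept in a file and redirected to the console producer, which reads one message per line from standard input. The following is a minimal sketch under that assumption. The file location and the check_messages helper are hypothetical, and the check only covers the flat id/name/age shape used in this practice.

```bash
#!/usr/bin/env bash

# Write the planned messages to a temporary file, one JSON object per line.
MSG_FILE=$(mktemp)
cat > "$MSG_FILE" <<'EOF'
{"id":"1","name":"lily","age":"16"}
{"id":"2","name":"lucy","age":"17"}
{"id":"3","name":"lilei","age":"15"}
EOF

# Return 0 only if every line has exactly the id, name, and age fields in this order.
check_messages() {
  ! grep -Evq '^\{"id":"[0-9]+","name":"[^"]+","age":"[0-9]+"\}$' "$1"
}

if check_messages "$MSG_FILE"; then
  echo "messages look well-formed"
fi

# Then, from the Kafka client bin directory (not executed here):
# ./kafka-console-producer.sh --broker-list {connection address} --topic topic-demo < "$MSG_FILE"
```

Keeping the messages in a file also makes it easy to replay the same data after restarting the Flink job.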
- Return to the GaussDB(DWS) console, choose Clusters on the left, and click Log In on the right of the GaussDB(DWS) cluster. The SQL page is displayed.
- Run the following SQL statement. You can find that the data is successfully saved to the database in real time.
SELECT * FROM user_dws ORDER BY id;
- Go back to the client window for connecting to Kafka on the ECS, copy the following content (one line at a time), and press Enter to produce messages.
{"id":"2","name":"jim","age":"17"}
{"id":"3","name":"tom","age":"15"}
- Go back to the opened SQL window of GaussDB(DWS) and run the following SQL statement. The names of the users whose IDs are 2 and 3 have been changed to jim and tom.
SELECT * FROM user_dws ORDER BY id;
The result matches the scenario description. This is the end of this practice.
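The expected final content of user_dws can be reasoned through by replaying the five messages in order and keeping only the latest row for each id. The following is a minimal local sketch of that reasoning, assuming the sink applies each message as an upsert keyed on id, which is what the query result in this practice indicates. The helper name replay_upserts is hypothetical, and the field positions rely on the flat JSON shape used here.

```bash
#!/usr/bin/env bash

# Read JSON lines on stdin and print the last row seen for each id,
# in the order in which the ids first appeared.
replay_upserts() {
  awk -F'"' '
    {
      id = $4; name = $8; age = $12      # values sit at fixed positions in this flat JSON shape
      if (!(id in row)) order[++n] = id  # remember the first-seen order of ids
      row[id] = id " | " name " | " age  # later messages overwrite earlier ones
    }
    END { for (i = 1; i <= n; i++) print row[order[i]] }
  '
}

# The three initial messages followed by the two updates.
replay_upserts <<'EOF'
{"id":"1","name":"lily","age":"16"}
{"id":"2","name":"lucy","age":"17"}
{"id":"3","name":"lilei","age":"15"}
{"id":"2","name":"jim","age":"17"}
{"id":"3","name":"tom","age":"15"}
EOF
# prints:
# 1 | lily | 16
# 2 | jim | 17
# 3 | tom | 15
```

If the query result in GaussDB(DWS) differs from this, for example if lucy and lilei are still present, check that the target table was created with the primary key on id.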