Updated on 2023-11-14 GMT+08:00

Using DLI Flink Jobs to Write Kafka Data to GaussDB(DWS) in Real Time

This practice demonstrates how to use DLI Flink jobs to synchronize data consumed from Kafka to GaussDB(DWS) in real time. The demonstration covers both writing new data and updating existing data in real time.

Figure 1 Importing Kafka data to GaussDB(DWS) in real time

This practice takes about 90 minutes. The cloud services used in this practice include Virtual Private Cloud (VPC) and subnets, Elastic Cloud Server (ECS), Object Storage Service (OBS), Distributed Message Service (DMS) for Kafka, Data Lake Insight (DLI), and Data Warehouse Service (DWS). The basic process is as follows:

  1. Preparations
  2. Step 1: Creating a Kafka Instance
  3. Step 2: Creating a Data Warehouse Cluster and Target Table
  4. Step 3: Creating a DLI Queue
  5. Step 4: Creating an Enhanced Datasource Connection for Kafka and GaussDB(DWS)
  6. Step 5: Preparing the dws-connector-flink Tool for Interconnecting GaussDB(DWS) with Flink
  7. Step 6: Creating and Editing a DLI Flink Job
  8. Step 7: Creating and Modifying Messages on the Kafka Client

Scenario Description

Assume that the sample data of the data source Kafka is a user information table, as shown in Table 1, which contains the id, name, and age fields. The id field is unique, fixed, and shared by multiple service systems. Generally, the id field does not need to be modified. Only the name and age fields need to be modified.

First, use Kafka to produce the following three records and use a DLI Flink job to synchronize them to GaussDB(DWS). Then, change the names of the users whose IDs are 2 and 3 to jim and tom, and use the DLI Flink job to apply the updates to GaussDB(DWS).

Table 1 Sample data

id  name         age
1   lily         16
2   lucy > jim   17
3   lilei > tom  15
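
The intended behavior can be sketched in a few lines of Python. This is an illustrative model only, not the connector's implementation: it assumes that the sink overwrites the existing row when a message arrives with an id that already exists, which is the outcome this practice demonstrates. The function name apply_messages and the in-memory dict are hypothetical. The message contents are the ones produced later in Step 7.

```python
import json


def apply_messages(table, lines):
    """Apply JSON messages to a dict keyed by id; a repeated id overwrites the row."""
    for line in lines:
        row = json.loads(line)
        table[int(row["id"])] = {"name": row["name"], "age": int(row["age"])}
    return table


# First batch: three new users.
initial = [
    '{"id":"1","name":"lily","age":"16"}',
    '{"id":"2","name":"lucy","age":"17"}',
    '{"id":"3","name":"lilei","age":"15"}',
]
# Second batch: same ids 2 and 3, new names.
updates = [
    '{"id":"2","name":"jim","age":"17"}',
    '{"id":"3","name":"tom","age":"15"}',
]

table = apply_messages({}, initial)
table = apply_messages(table, updates)
for key in sorted(table):
    print(key, table[key]["name"], table[key]["age"])
```

After both batches, the model still holds three rows, and the rows with ids 2 and 3 carry the names jim and tom.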

Constraints

  • Ensure that VPC, ECS, OBS, Kafka, DLI, and GaussDB(DWS) are in the same region, for example, Europe-Dublin.
  • Ensure that Kafka, DLI, and GaussDB(DWS) can communicate with each other. In this practice, Kafka and GaussDB(DWS) are created in the same region and VPC, and the security groups of Kafka and GaussDB(DWS) allow the network segment of the DLI queues.

Preparations

You have registered a Huawei account and enabled Huawei Cloud services. Before using GaussDB(DWS), check the account status. The account cannot be in arrears or frozen.

You have created a VPC and subnet. For details, see Creating a VPC.

Step 1: Creating a Kafka Instance

  1. Log in to the Huawei Cloud management console and choose Middleware > Distributed Message Service (for Kafka) from the service list. The Kafka management console is displayed.
  2. Click DMS for Kafka on the left and click Buy Instance in the upper right corner.
  3. Set the following parameters. Retain the default values for other parameters that are not described in the table.

    Table 2 Kafka instance parameters

    Parameter           Value
    Billing Mode        Pay-per-use
    Region              Europe-Dublin
    Project             Default
    AZ                  AZ 1 (If not available, select another AZ.)
    Instance Name       kafka-dli-dws
    Enterprise Project  default
    Specifications      Default
    Version             2.7
    CPU Architecture    x86
    Broker Flavor       kafka.2u4g.cluster.small (For reference only. Select the smallest flavor.)
    Brokers             3
    VPC                 Select a created VPC. If no VPC is available, create one.
    Security Group      Select a created security group. If no security group is available, create one.
    Other parameters    Retain the default value.

    Figure 2 Creating a Kafka instance

  4. Click Buy and complete the payment.

    Wait until the creation is successful.

  5. In the Kafka instance list, click the name of the created Kafka instance. The Basic Information page is displayed.
  6. Choose Topics on the left and click Create Topic.

    Set Topic Name to topic-demo and retain the default values for other parameters.

    Figure 3 Creating a topic

  7. Click OK.

    The topic topic-demo is created and displayed in the topic list.

  8. Choose Consumer Groups on the left and click Create Consumer Group.
  9. Enter kafka01 for Consumer Group Name and click OK.

Step 2: Creating a Data Warehouse Cluster and Target Table

  1. Create a data warehouse cluster by referring to Creating a Cluster. To ensure network connectivity, the region and VPC of the data warehouse cluster must be the same as those of the Kafka instance. In this practice, the region is Europe-Dublin, and the VPC must be the same as that created for Kafka.
  2. On the Clusters page of the GaussDB(DWS) console, click Login on the right of the cluster.

    This practice uses version 8.1.3.x as an example. 8.1.2 and earlier versions do not support this login mode. For those versions, use Data Studio to connect to the cluster. For details, see Using Data Studio to Connect to a Cluster.

  3. The login username is dbadmin, the database name is gaussdb, and the password is the password of user dbadmin set during data warehouse cluster creation. Select Remember Password, enable Collect Metadata Periodically and Show Executed SQL Statements, and click Log In.

    Figure 4 Logging In to GaussDB(DWS)

  4. Click the database name gaussdb and click SQL Window in the upper right corner to access the SQL editor.
  5. Copy the following SQL statement. In the SQL window, click Execute SQL to create the target table user_dws.

    CREATE TABLE user_dws (
      id int,
      name varchar(50),
      age int,
      PRIMARY KEY (id)
    );

Step 3: Creating a DLI Queue

  1. Log in to the Huawei Cloud management console and choose Analytics > Data Lake Insight from the service list. The DLI management console is displayed.
  2. Choose Resources > Queue Management on the left.
  3. Click Buy Queue in the upper right corner, set the following parameters, and retain the default values for other parameters that are not described in the table.

    Table 3 DLI queue parameters

    Parameter           Value
    Billing Mode        Pay-per-use
    Region              Europe-Dublin
    Project             Default
    Name                dli_dws
    Type                For a general queue, select Dedicated Resource Mode.
    AZ Mode             Single-AZ deployment
    Specifications      16 CUs
    Enterprise Project  default
    Advanced Settings   Custom
    CIDR Block          172.16.0.0/18. It must be in a different network segment from Kafka and GaussDB(DWS). For example, if Kafka and GaussDB(DWS) are in the 192.168.x.x network segment, select 172.16.x.x for DLI.

    Figure 5 Creating a DLI queue

  4. Click Buy.
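
The CIDR Block requirement in Table 3 can be checked before buying the queue. The following is a minimal sketch that uses Python's standard ipaddress module. The value 192.168.0.0/16 is an assumed example for the VPC of Kafka and GaussDB(DWS), taken from the 192.168.x.x example in the table; replace it with the actual CIDR block of your VPC. The function name overlaps is hypothetical.

```python
import ipaddress


def overlaps(cidr_a, cidr_b):
    """Return True if the two CIDR blocks share at least one address."""
    return ipaddress.ip_network(cidr_a).overlaps(ipaddress.ip_network(cidr_b))


dli_queue_cidr = "172.16.0.0/18"  # CIDR block of the DLI queue in this practice
vpc_cidr = "192.168.0.0/16"       # assumed CIDR block of the Kafka/GaussDB(DWS) VPC

# The queue CIDR block is acceptable only if this prints False.
print(overlaps(dli_queue_cidr, vpc_cidr))
```

If the check reports an overlap, choose a different CIDR block for the DLI queue.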

Step 4: Creating an Enhanced Datasource Connection for Kafka and GaussDB(DWS)

  1. In the security group of Kafka, allow the network segment where the DLI queue is located.

    1. Return to the Kafka console and click the Kafka instance name to go to the Basic Information page. View the value of Instance Address (Private Network) in connection information and record the address for future use.
      Figure 6 Kafka private network address
    2. Click the security group name.
      Figure 7 Kafka security group
    3. Choose Inbound Rules > Add Rule, as shown in the following figure. Add the network segment of the DLI queue. In this example, the network segment is 172.16.0.0/18. Ensure that the network segment is the same as that entered during Step 3: Creating a DLI Queue.
      Figure 8 Adding rules to the Kafka security group
    4. Click OK.

  2. Return to the DLI management console, click Datasource Connections on the left, select Enhanced, and click Create.
  3. Set the following parameters. Retain the default values for other parameters that are not described in the table.

    Table 4 Connection from DLI to Kafka

    Parameter         Value
    Connection Name   dli_kafka
    Resource Pool     Select the created DLI queue dli_dws.
    VPC               Select the VPC of Kafka.
    Subnet            Select the subnet where Kafka is located.
    Other parameters  Retain the default value.

    Figure 9 Creating an enhanced connection

  4. Click OK.

    The Kafka connection is successfully created.

  5. Choose Resources > Queue Management on the left, and choose More > Test Address Connectivity on the right of dli_dws.
  6. In the address box, enter the private IP address and port number of the Kafka instance obtained in 1.a. (There are three Kafka addresses. Enter one.)

    Figure 10 Testing Kafka connectivity

  7. Click Test to verify that DLI is successfully connected to Kafka.
  8. Log in to the GaussDB(DWS) management console, choose Clusters on the left, and click the cluster name to go to the details page.
  9. As shown in the following figure, record the private domain name and private IP address of the data warehouse cluster for future use.

    Figure 11 Private domain name and IP address

  10. Click the security group name.

    Figure 12 GaussDB(DWS) security group

  11. Choose Inbound Rules > Add Rule, as shown in the following figure. Add the network segment of the DLI queue. In this example, the network segment is 172.16.0.0/18. Ensure that the network segment is the same as that entered during Step 3: Creating a DLI Queue.

    Figure 13 Adding a rule to the GaussDB(DWS) security group

  12. Click OK.
  13. Switch to the DLI console, choose Resources > Queue Management on the left, and click More > Test Address Connectivity on the right of dli_dws.
  14. In the address box, enter the private IP address and port number of the GaussDB(DWS) cluster obtained in 9.

    Figure 14 Testing GaussDB(DWS) connectivity

  15. Click Test to verify that DLI is successfully connected to GaussDB(DWS).
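
The console test above is the method used in this practice. As a supplementary sketch, the same kind of reachability check can be expressed as a plain TCP connection attempt. Note the assumption: this code checks reachability only from the host that runs it (for example, an ECS in the same VPC), not from the DLI queue, so it does not replace Test Address Connectivity. The function name can_connect is hypothetical.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable networks.
        return False
```

For example, can_connect("192.168.0.136", 9092) would probe one of the Kafka addresses used as an example in Step 7; the port of the GaussDB(DWS) cluster is the one that appears in its JDBC URL (8000 in this practice).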

Step 5: Preparing the dws-connector-flink Tool for Interconnecting GaussDB(DWS) with Flink

dws-connector-flink is a tool for interconnecting GaussDB(DWS) with Flink based on DWS JDBC APIs. During DLI job configuration, this tool and its dependencies are stored in the Flink class loading directory to improve the ability of Flink jobs to import data to GaussDB(DWS).

  1. Go to https://mvnrepository.com/artifact/com.huaweicloud.dws using a browser.
  2. In the software list, select the latest version of GaussDB(DWS) Connectors Flink. In this practice, select DWS Connector Flink 2 12 1 12.

  3. Click the 1.0.4 branch. (In actual scenarios, click the latest branch.)

  4. Click View ALL.

  5. Click dws-connector-flink_2.12_1.12-1.0.4-jar-with-dependencies.jar to download it to the local host.

  6. Create an OBS bucket. In this practice, set the bucket name to obs-flink-dws and upload the file to the OBS bucket. Ensure that the bucket is in the same region as DLI, which in this practice is Europe-Dublin.

    Figure 15 Uploading the JAR package to the OBS bucket

Step 6: Creating and Editing a DLI Flink Job

  1. Return to the DLI management console, choose Job Management > Flink Jobs on the left, and click Create Job in the upper right corner.
  2. Set Type to Flink OpenSource SQL and Name to kafka-dws.

    Figure 16 Creating a job

  3. Click OK.

    The page for editing the job is displayed.

  4. Set the following parameters on the right of the page. Retain the default values for other parameters that are not described in the table.

    Table 5 Flink job parameters

    Parameter             Value
    Queue                 dli_dws
    Flink Version         1.12
    UDF Jar               Select the JAR file in the OBS bucket created in Step 5: Preparing the dws-connector-flink Tool for Interconnecting GaussDB(DWS) with Flink.
    OBS Bucket            Select the bucket created in Step 5: Preparing the dws-connector-flink Tool for Interconnecting GaussDB(DWS) with Flink.
    Enable Checkpointing  Check the box.
    Other parameters      Retain the default value.

    Figure 17 Editing a job

  5. Copy the following SQL code to the SQL code window on the left.

    Obtain the private IP address and port number of the Kafka instance from 1.a in Step 4, and obtain the private domain name of the GaussDB(DWS) cluster from 9 in Step 4.

    -- Source table: reads JSON messages from the Kafka topic topic-demo.
    CREATE TABLE user_kafka (
      id string,
      name string,
      age int
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'topic-demo',
      'properties.bootstrap.servers' = 'Private IP address and port number of the Kafka instance',
      'properties.group.id' = 'kafka01',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );

    -- Sink table: writes to public.user_dws in GaussDB(DWS), keyed by id.
    CREATE TABLE user_dws (
      id string,
      name string,
      age int,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'dws',
      'url' = 'jdbc:postgresql://GaussDB(DWS) private network domain name:8000/gaussdb',
      'tableName' = 'public.user_dws',
      'username' = 'dbadmin',
      'password' = 'Password of database user dbadmin'
    );

    -- Continuously copy every Kafka record to GaussDB(DWS).
    INSERT INTO user_dws SELECT * FROM user_kafka;

  6. Click Check Semantics and wait until the verification is successful.

    If the verification fails, check whether the SQL input has syntax errors.

    Figure 18 SQL statement of a job

  7. Click Save.
  8. Return to the DLI console home page and choose Job Management > Flink Jobs on the left.
  9. Click Start on the right of the job name kafka-dws and click Start Now.

    Wait for about 1 minute and refresh the page. If the status is Running, the job has started successfully.

    Figure 19 Job execution status

Step 7: Creating and Modifying Messages on the Kafka Client

  1. Create an ECS by referring to the ECS document. Ensure that the region and VPC of the ECS are the same as those of Kafka.
  2. Install JDK.

    1. Log in to the ECS, go to the /usr/local directory, and download the JDK package.

      cd /usr/local
      wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz

    2. Decompress the downloaded JDK package.

      tar -zxvf jdk-17_linux-x64_bin.tar.gz

    3. Run the following command to open the /etc/profile file:

      vim /etc/profile

    4. Press i to enter editing mode and add the following content to the end of the /etc/profile file. Set JAVA_HOME to the directory extracted in the previous step (jdk-17.0.7 in this example).

      export JAVA_HOME=/usr/local/jdk-17.0.7 # JDK installation directory
      export JRE_HOME=${JAVA_HOME}/jre
      export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:${JAVA_HOME}/test:${JAVA_HOME}/lib/gsjdbc4.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar:$CLASSPATH
      export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin
      export PATH=$PATH:${JAVA_PATH}

    5. Press Esc and enter :wq! to save the settings and exit.
    6. Run the following command for the environment variables to take effect:

      source /etc/profile

    7. Run the following command. If the Java version information is displayed, the JDK is successfully installed:

      java -version

  3. Install the Kafka client.

    1. Go to the /opt directory and run the following command to obtain the Kafka client software package.

      cd /opt
      wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz

    2. Decompress the downloaded software package.

      tar -zxf kafka_2.12-2.7.2.tgz

    3. Go to the Kafka client directory.

      cd /opt/kafka_2.12-2.7.2/bin

  4. Run the following command to connect to Kafka. {connection address} indicates the private network connection address of Kafka. For details about how to obtain the address, see 1.a in Step 4. {Topic name} indicates the name of the Kafka topic created in 6 of Step 1.

    ./kafka-console-producer.sh --broker-list {connection address} --topic {Topic name}

    The following is an example:

    ./kafka-console-producer.sh --broker-list 192.168.0.136:9092,192.168.0.214:9092,192.168.0.217:9092 --topic topic-demo

    If > is displayed and no other error message is displayed, the connection is successful.

  5. In the window of the connected Kafka client, copy the following content (one line at a time) based on the data planned in the Scenario Description and press Enter to produce messages:

    {"id":"1","name":"lily","age":"16"}
    {"id":"2","name":"lucy","age":"17"}
    {"id":"3","name":"lilei","age":"15"}

  6. Return to the GaussDB(DWS) console, choose Clusters on the left, and click Log In on the right of the GaussDB(DWS) cluster. The SQL page is displayed.
  7. Run the following SQL statement. The query result shows that the data has been written to the database in real time.

    SELECT * FROM user_dws ORDER BY id;

  8. Go back to the Kafka client window on the ECS, copy the following content (one line at a time), and press Enter to produce messages.

    {"id":"2","name":"jim","age":"17"}
    {"id":"3","name":"tom","age":"15"}

  9. Go back to the opened SQL window of GaussDB(DWS) and run the following SQL statement. The names of the users whose IDs are 2 and 3 have been changed to jim and tom.

    SELECT * FROM user_dws ORDER BY id;

    The result matches the scenario description. This is the end of this practice.
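
As a supplementary sketch, the messages used in Step 7 can be generated programmatically so that each record is a compact, single-line JSON string that can be pasted into the console producer as one message. The helper name to_message is hypothetical. The field names and the use of string values follow the sample messages in this practice.

```python
import json


def to_message(user_id, name, age):
    """Serialize one user record as compact single-line JSON with string values."""
    record = {"id": str(user_id), "name": name, "age": str(age)}
    return json.dumps(record, separators=(",", ":"))


initial_rows = [(1, "lily", 16), (2, "lucy", 17), (3, "lilei", 15)]
updated_rows = [(2, "jim", 17), (3, "tom", 15)]

initial_messages = [to_message(*row) for row in initial_rows]
updated_messages = [to_message(*row) for row in updated_rows]

# One message per line, in the order they are produced in Step 7.
for message in initial_messages + updated_messages:
    print(message)
```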