Updated on 2024-09-02 GMT+08:00

Using DLI Flink Jobs to Synchronize MySQL Data to GaussDB(DWS) in Real Time

This practice demonstrates how to use a Flink job of DLI to synchronize MySQL data to GaussDB(DWS) in real time.

For details about DLI, see What Is Data Lake Insight?

This practice takes about 60 minutes and uses the following cloud services: Virtual Private Cloud (VPC) and Subnet, Relational Database Service (RDS), Data Lake Insight (DLI), Object Storage Service (OBS), and GaussDB(DWS). The following is an outline of the practice.

  1. Preparations
  2. Step 1: Preparing MySQL Data
  3. Step 2: Creating a GaussDB(DWS) Cluster
  4. Step 3: Creating a DLI Queue
  5. Step 4: Creating an Enhanced Datasource Connection
  6. Step 5: Creating a DLI Flink Job
  7. Step 6: Verifying Data Synchronization
  8. More Information

Preparations

  • You have registered a Huawei ID and enabled Huawei Cloud services. The account cannot be in arrears or frozen.
  • You have created a VPC and subnet. For details, see Creating a VPC.

Step 1: Preparing MySQL Data

  1. Buy an RDS instance and set the parameters listed in Table 1 (retain the default values for other parameters). For details, see Relational Database Service.

    Table 1 RDS parameters

    Parameter         | Value
    ------------------|--------------
    Billing Mode      | Pay-per-use
    Region            | CN-Hong Kong
    DB Instance Name  | rds-demo
    DB Engine         | MySQL
    DB Engine Version | 5.7 or later
    Database Port     | 3306

  2. Connect to the RDS instance and create a database named mys_data.

    CREATE DATABASE mys_data;
    

  3. Switch to the new database mys_data and run the following statement to create the mys_order table:

    CREATE TABLE mys_data.mys_order
         ( order_id      VARCHAR(12),
           order_channel VARCHAR(32),
           order_time    DATETIME,
           cust_code     VARCHAR(6),
           pay_amount    DOUBLE,
           real_pay      DOUBLE,
           PRIMARY KEY (order_id) );
    

  4. Insert data into the table.

    INSERT INTO mys_data.mys_order VALUES ('202306270001', 'webShop', TIMESTAMP('2023-06-27 10:00:00'), 'CUST1', 1000, 1000);
    INSERT INTO mys_data.mys_order VALUES ('202306270002', 'webShop', TIMESTAMP('2023-06-27 11:00:00'), 'CUST2', 5000, 5000);
    

  5. Check whether the data is inserted.

    SELECT * FROM mys_data.mys_order;
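
The Flink job created later in this practice uses the mysql-cdc connector, which reads changes from the MySQL binary log. As an optional sanity check that is not part of the original procedure, the following statements can be run on the RDS instance to confirm that binary logging is enabled and row-based. The expected values in the comments reflect the general requirements of the open-source mysql-cdc connector; whether the DLI connector has exactly the same requirements is an assumption here.

```sql
-- Binary logging must be enabled for change data capture (expected: ON).
SHOW VARIABLES LIKE 'log_bin';

-- CDC connectors generally require row-based logging (expected: ROW).
SHOW VARIABLES LIKE 'binlog_format';

-- Full row images expose every column of a changed row (expected: FULL).
SHOW VARIABLES LIKE 'binlog_row_image';
```

If any value differs, review the parameter settings of the RDS instance before starting the Flink job.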
    

Step 2: Creating a GaussDB(DWS) Cluster

  1. Create a GaussDB(DWS) cluster. For details, see Creating a Cluster. To ensure network connectivity, select the same region and VPC as those of the RDS instance. In this practice, select CN-Hong Kong and the VPC used by the RDS instance.
  2. Log in to the GaussDB(DWS) console, choose Clusters > Dedicated Clusters, locate the row that contains the target cluster, and click Login in the Operation column. The login information is as follows:

    • Cluster: the created GaussDB(DWS) cluster.
    • Database: gaussdb
    • Data source name: dws-demo-01
    • Username: dbadmin
    • Password: password set when the GaussDB(DWS) cluster is created

  3. Select Remember Password, click Test Connection, and wait until the connection is successful.
  4. Copy the following SQL statements. In the SQL window, click Execute SQL to create a schema named dws_data.

    CREATE SCHEMA dws_data;
    

  5. Create the dws_order table in the new schema.

    CREATE TABLE dws_data.dws_order
            ( order_id      VARCHAR(12),
              order_channel VARCHAR(32),
              order_time    TIMESTAMP,
              cust_code     VARCHAR(6),
              pay_amount    DOUBLE PRECISION,
              real_pay      DOUBLE PRECISION );
    

  6. Query data. The current table is empty.

    SELECT * FROM dws_data.dws_order;
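
Because the Flink job maps columns between the MySQL table and this table, it can help to confirm the column names and types of the target table before continuing. The following is a minimal sketch, assuming the standard information_schema views are available in your GaussDB(DWS) database.

```sql
-- List the columns of dws_data.dws_order in definition order.
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'dws_data'
  AND table_name = 'dws_order'
ORDER BY ordinal_position;
```

The result should list the six columns defined in the CREATE TABLE statement for dws_data.dws_order.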
    

Step 3: Creating a DLI Queue

  1. Log in to the Huawei Cloud console and choose Analytics > Data Lake Insight from the service list. The DLI console is displayed.
  2. In the navigation pane on the left, choose Resources > Resource Pool.
  3. Click Buy Resource Pool in the upper right corner, set the following parameters, and retain the default values for other parameters that are not described in the table.

    Table 2 DLI elastic resource pool

    Parameter      | Value
    ---------------|--------------
    Billing Mode   | Pay-per-use
    Region         | CN-Hong Kong
    Name           | dli_dws
    Specifications | Standard
    CIDR Block     | 172.16.0.0/18, which must be in a different network segment from MySQL and GaussDB(DWS). For example, if MySQL and GaussDB(DWS) are in the 192.168.x.x network segment, select 172.16.x.x for DLI.

  4. Click Buy and click Submit.

    After the resource pool is created, go to the next step.

  5. On the elastic resource pool page, locate the row that contains the created resource pool, click Add Queue in the Operation column, and set the following parameters. Retain the default values for other parameters that are not described in the table.

    Table 3 Adding a queue

    Parameter | Value
    ----------|----------------------
    Name      | dli_dws
    Type      | General purpose queue

  6. Click Next and click OK. The queue is created.

Step 4: Creating an Enhanced Datasource Connection

  1. In the security group of RDS, add an inbound rule that allows access from the network segment where the DLI queue is located.

    1. In the navigation pane on the left, choose Resources > Queue Management and record the network segment of dli_dws.
      Figure 1 DLI queue network segment
    2. Go to the RDS console, choose Instance Management in the navigation pane, and click the name of the created RDS instance.
    3. Record the value of Private IP Address in the Connection Information area, which will be used in the subsequent connectivity test.
    4. Click Manage next to the security group in Connection Information.
      Figure 2 RDS security group

    5. In the security group list that is displayed, click the security group name to go to the security group configuration page.
    6. Choose Inbound Rules > Add Rule, as shown in the following figure. Add the network segment of the DLI queue. In this example, the network segment is 172.16.0.0/18. Ensure that the network segment is the same as that entered in Step 3: Creating a DLI Queue.
      Figure 3 Adding a rule to the RDS security group

    7. Click OK.

  2. Return to the DLI console, click Datasource Connections on the left, select Enhanced, and click Create.
  3. Set the following parameters. Retain the default values for other parameters that are not described in the table.

    Table 4 Connection from DLI to RDS

    Parameter        | Value
    -----------------|---------------------------------------------
    Connection Name  | dli_rds
    Resource Pool    | Select the created DLI elastic resource pool.
    VPC              | Select the VPC where RDS is located.
    Subnet           | Select the subnet where RDS is located.
    Other parameters | Retain the default values.

    Figure 4 Creating a datasource connection

  4. Click OK. Wait until the RDS connection is created.
  5. Test the connectivity between DLI and RDS.

    1. Choose Resources > Queue Management on the left, and choose More > Test Address Connectivity on the right of dli_dws.
    2. Enter the private IP address of RDS recorded in 1.c and port 3306 in the address box.
    3. Click Test to verify that DLI is successfully connected to RDS.
      Figure 5 Testing the connection between RDS and DLI

  6. Test the connectivity between DLI and GaussDB(DWS).

    1. Log in to the GaussDB(DWS) console, choose Clusters > Dedicated Clusters on the left, and click the cluster name to go to the details page.
    2. As shown in the following figure, record the private IP address and port number of the GaussDB(DWS) cluster for future use.
      Figure 6 GaussDB(DWS) internal IP address
    3. Click the security group name.
      Figure 7 GaussDB(DWS) security group
    4. Choose Inbound Rules > Add Rule, as shown in the following figure. Add the network segment of the DLI queue. In this example, the network segment is 172.16.0.0/18. Ensure that the network segment is the same as that entered in Step 3: Creating a DLI Queue.

      Figure 8 Adding a rule to the GaussDB(DWS) security group
    5. Click OK.
    6. Switch to the DLI console, choose Resources > Queue Management on the left, and click More > Test Address Connectivity on the right of dli_dws.
    7. In the address box, enter the private IP address and port number of the GaussDB(DWS) cluster.
    8. Click Test to verify that DLI is successfully connected to GaussDB(DWS).
      Figure 9 Testing GaussDB(DWS) connectivity

Step 5: Creating a DLI Flink Job

  1. Log in to the OBS console and create an OBS bucket to store Flink jobs. For details, see the OBS User Guide.

    Set key parameters as follows and retain the default values for other parameters.

    • Region: CN-Hong Kong
    • Bucket Name: dli-obs01 (If the name is already in use, try dli-obs02, dli-obs03, and so on.)
    • Bucket Policy: Private

  2. Return to the DLI console, choose Job Management > Flink Jobs on the left, and click Create Job in the upper right corner.
  3. Set Type to Flink OpenSource SQL and Name to rds-dws.

    Figure 10 Creating a job

  4. Click OK. The page for editing the job is displayed.
  5. Set the following key parameters on the right of the page. Retain the default values for other parameters that are not described.

    • Queue: Select dli_dws created in Step 3: Creating a DLI Queue.
    • Flink Version: Select version 1.15 or later. (The actual version is subject to the GUI.)
    • OBS Bucket: Select the bucket created in 1 and click Authorize.
    • (Optional) Select Save Job Log.

  6. Copy the following SQL code to the SQL code window on the left.

    For how to obtain the private IP address of the RDS DB instance, see 1.c in Step 4: Creating an Enhanced Datasource Connection. For how to obtain the private IP address of the GaussDB(DWS) cluster, see 6.b in the same step. Replace the password placeholders with the actual passwords of user root of the RDS database and user dbadmin of GaussDB(DWS).

    CREATE TABLE
      mys_order (
        order_id STRING,
        order_channel STRING,
        order_time TIMESTAMP,
        cust_code STRING,
        pay_amount DOUBLE,
        real_pay DOUBLE,
        PRIMARY KEY (order_id) NOT ENFORCED
      )
    WITH
      (
        'connector' = 'mysql-cdc',
        'hostname' = 'Private IP address of the RDS DB instance',
        'port' = '3306',
        'username' = 'root',
        'password' = 'Password of user root of the RDS DB instance',
        'database-name' = 'mys_data',
        'table-name' = 'mys_order'
      );
    
    CREATE TABLE
      dws_order (
        order_id STRING,
        order_channel STRING,
        order_time TIMESTAMP,
        cust_code STRING,
        pay_amount DOUBLE,
        real_pay DOUBLE,
        PRIMARY KEY (order_id) NOT ENFORCED
      )
    WITH
      (
        'connector' = 'gaussdb',
        'driver' = 'com.huawei.gauss200.jdbc.Driver',
        'url' = 'jdbc:gaussdb://GaussDB(DWS) cluster private IP address:8000/gaussdb',
        'table-name' = 'dws_data.dws_order',
        'username' = 'dbadmin',
        'password' = 'Password of GaussDB(DWS) user dbadmin',
        'write.mode' = 'insert'
      );
    
    INSERT INTO
      dws_order
    SELECT
      *
    FROM
      mys_order;
    

  7. Click Format and then click Save.

    Be sure to click Format before saving. Otherwise, invisible characters introduced by copying and pasting the code may cause the job to fail.

    Figure 11 Flink job parameters

  8. Return to the DLI console home page and choose Job Management > Flink Jobs on the left.
  9. Click Start on the right of the job name rds-dws and click Start Now.

    Wait for about 1 minute and refresh the page. If the status is Running, the job has started successfully.

    Figure 12 Running succeeded
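
The final statement of the job script uses SELECT *, which relies on the source and sink tables declaring their columns in the same order. As an optional variant (a sketch, not the form used in this practice), the columns can be listed explicitly so that the mapping stays correct if either table definition is later reordered.

```sql
-- Same synchronization as in the job script, with the column mapping spelled out.
INSERT INTO
  dws_order
SELECT
  order_id,
  order_channel,
  order_time,
  cust_code,
  pay_amount,
  real_pay
FROM
  mys_order;
```

This statement would replace the INSERT INTO ... SELECT * statement at the end of the script. The two CREATE TABLE statements stay unchanged.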

Step 6: Verifying Data Synchronization

  1. Go back to the SQL window of the GaussDB(DWS) database. If the connection times out, perform the following operations to log in again:

    1. Go to the GaussDB(DWS) console.
    2. In the navigation pane on the left, choose Clusters > Dedicated Clusters, and click Log In on the right of dws-demo.

  2. Check whether the two rows of data in the MySQL table have been synchronized to GaussDB(DWS).

    SELECT * FROM dws_data.dws_order;
    
    Figure 13 Query result

  3. Switch to the RDS for MySQL page and run the following statements to insert three new data records:

    INSERT INTO mys_data.mys_order VALUES ('202403090003', 'webShop', TIMESTAMP('2024-03-09 13:00:00'), 'CUST1', 2000, 2000);
    INSERT INTO mys_data.mys_order VALUES ('202403090004', 'webShop', TIMESTAMP('2024-03-09 14:00:00'), 'CUST2', 3000, 3000);
    INSERT INTO mys_data.mys_order VALUES ('202403100004', 'webShop', TIMESTAMP('2024-03-10 10:00:00'), 'CUST3', 6000, 6000);
    
    Figure 14 New MySQL data

  4. Go back to the SQL window of GaussDB(DWS) and run the following SQL statement again. The returned result shows that the MySQL data has been synchronized to GaussDB(DWS) in real time.

    SELECT * FROM dws_data.dws_order;
    
    Figure 15 Real-time data synchronization
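
A quick way to compare the two sides is to run the same aggregate on each database. This is a sketch rather than part of the original procedure. It assumes that the tables contain only the five rows inserted in this practice.

```sql
-- Run on RDS for MySQL.
SELECT COUNT(*) AS row_count, SUM(pay_amount) AS total_pay
FROM mys_data.mys_order;

-- Run on GaussDB(DWS). The result should match the MySQL result.
SELECT COUNT(*) AS row_count, SUM(pay_amount) AS total_pay
FROM dws_data.dws_order;
```

With the five rows from this practice, both queries should report a row count of 5 and a total of 17000. A mismatch that persists after a short wait suggests that the Flink job is not running or cannot reach one of the databases.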

More Information

Storing authentication information for a data source directly in the job script for Flink cross-source development can result in password exposure. To enhance security, use DLI's datasource authentication function instead of specifying MySQL and GaussDB(DWS) usernames and passwords directly in job scripts.

Currently, only Flink 1.12 supports this function. Check the official documentation for updates.

  1. Log in to the DLI console, click Datasource Connections, and click Datasource Authentication.
  2. Click Create.
  3. Create the password authentication for the root user of the MySQL database.

    1. Set the following parameters:
      • Type: Password
      • Authentication Certificate: mysql_pwd_auth
      • Username: root
      • Password: password of user root
      Figure 16 MySQL password authentication
    2. Click OK.

  4. Create password authentication for the dbadmin user of GaussDB(DWS).

    1. Set the following parameters:
      • Type: Password
      • Authentication Certificate: dws_pwd_auth
      • Username: dbadmin
      • Password: password of user dbadmin
      Figure 17 GaussDB(DWS) password authentication
    2. Click OK.

  5. On the DLI console, choose Job Management > Flink Jobs. Locate the row that contains the job created in Step 5: Creating a DLI Flink Job, and choose More > Stop to stop the job.
  6. After the job is stopped, click the job name to open the job editing page.
  7. Replace the SQL script with the latest one.

    Replace the placeholders with the private IP addresses of RDS and GaussDB(DWS).

    CREATE TABLE mys_order (
      order_id STRING,
      order_channel STRING,
      order_time TIMESTAMP,
      cust_code STRING,
      pay_amount DOUBLE,
      real_pay DOUBLE,
      PRIMARY KEY (order_id) NOT ENFORCED ) 
    WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'Private IP address of RDS',
      'port' = '3306',
      'pwd_auth_name' = 'mysql_pwd_auth',
      'database-name' = 'mys_data',
      'table-name' = 'mys_order' );
     
    CREATE TABLE dws_order (
        order_id STRING,
        order_channel STRING,
        order_time TIMESTAMP,
        cust_code STRING,
        pay_amount DOUBLE,
        real_pay DOUBLE, 
        PRIMARY KEY (order_id) NOT ENFORCED )
    WITH (
        'connector' = 'gaussdb',
        'driver' = 'com.huawei.gauss200.jdbc.Driver',
        'url' = 'jdbc:gaussdb://GaussDB(DWS) private IP address:8000/gaussdb',
        'table-name' = 'dws_data.dws_order',
        'pwd_auth_name' = 'dws_pwd_auth',
        'write.mode' = 'insert' );
     
    INSERT INTO dws_order SELECT * FROM mys_order;
    

  8. Click Format and click Save.
  9. Restart the job and verify data synchronization by referring to Step 6: Verifying Data Synchronization.