Using a Flink Job of DLI to Synchronize MySQL Data to a GaussDB(DWS) Cluster in Real Time
This practice demonstrates how to use a Flink job of DLI to synchronize MySQL data to GaussDB(DWS) in real time.
For details about DLI, see What Is Data Lake Insight?
This exercise takes about 60 minutes and uses the following cloud services: Virtual Private Cloud (VPC) and subnet, Relational Database Service (RDS), Data Lake Insight (DLI), Object Storage Service (OBS), and GaussDB(DWS). The following is an outline of the exercise.
- Preparations
- Step 1: Preparing MySQL Data
- Step 2: Creating a GaussDB(DWS) Cluster
- Step 3: Creating a DLI Queue
- Step 4: Creating an Enhanced Datasource Connection
- Step 5: Creating a DLI Flink Job
- Step 6: Verifying Data Synchronization
- More Information
Preparations
- You have registered a Huawei ID and enabled Huawei Cloud services. The account cannot be in arrears or frozen.
- You have created a VPC and subnet. For details, see Creating a VPC.
Step 1: Preparing MySQL Data
- Buy an RDS instance and set the parameters listed in Table 1 (retain the default values for other parameters). For details, see Relational Database Service.
- Connect to the RDS instance and create a database named mys_data.
CREATE DATABASE mys_data;
- Switch to the new database mys_data and run the following command to create the mys_order table:
CREATE TABLE mys_data.mys_order (
    order_id      VARCHAR(12),
    order_channel VARCHAR(32),
    order_time    DATETIME,
    cust_code     VARCHAR(6),
    pay_amount    DOUBLE,
    real_pay      DOUBLE,
    PRIMARY KEY (order_id)
);
- Insert data into the table.
INSERT INTO mys_data.mys_order VALUES
    ('202306270001', 'webShop', TIMESTAMP('2023-06-27 10:00:00'), 'CUST1', 1000, 1000);
INSERT INTO mys_data.mys_order VALUES
    ('202306270002', 'webShop', TIMESTAMP('2023-06-27 11:00:00'), 'CUST2', 5000, 5000);
- Check whether the data is inserted.
SELECT * FROM mys_data.mys_order;
Step 2: Creating a GaussDB(DWS) Cluster
- Create a cluster. For details, see Creating a Cluster. To ensure network connectivity, the GaussDB(DWS) cluster must be in the same region and VPC as the RDS instance. This practice uses the CN-Hong Kong region.
- Log in to the GaussDB(DWS) console, choose Dedicated Clusters > Clusters, locate the row that contains the target cluster, and click Login in the Operation column. The login information is as follows:
- Cluster: the created GaussDB(DWS) cluster.
- Database: gaussdb
- Data source name: dws-demo-01
- Username: dbadmin
- Password: password set when the GaussDB(DWS) cluster is created
- Select Remember Password, click Test Connection, and wait until the connection is successful.
- Copy the following SQL statement to the SQL window and click Execute SQL to create a schema named dws_data.
CREATE SCHEMA dws_data;
- Create the dws_order table in the new schema.
CREATE TABLE dws_data.dws_order (
    order_id      VARCHAR(12),
    order_channel VARCHAR(32),
    order_time    TIMESTAMP,
    cust_code     VARCHAR(6),
    pay_amount    DOUBLE PRECISION,
    real_pay      DOUBLE PRECISION
);
- Query data. The current table is empty.
SELECT * FROM dws_data.dws_order;
Step 3: Creating a DLI Queue
- Log in to the Huawei Cloud console and choose Analytics > Data Lake Insight from the service list. The DLI console is displayed.
- In the navigation pane on the left, choose Resources > Resource Pool.
- Click Buy Resource Pool in the upper right corner, set the following parameters, and retain the default values for other parameters that are not described in the table.
Table 2 DLI elastic resource pool
Parameter | Value
Billing Mode | Pay-per-use
Region | CN-Hong Kong
Name | dli_dws
Specifications | Standard
CIDR Block | 172.16.0.0/18, which must be in a different network segment from MySQL and GaussDB(DWS). For example, if MySQL and GaussDB(DWS) are in the 192.168.x.x network segment, select 172.16.x.x for DLI.
- Click Buy and click Submit.
After the resource pool is created, go to the next step.
- On the elastic resource pool page, locate the row that contains the created resource pool, click Add Queue in the Operation column, and set the following parameters. Retain the default values for other parameters that are not described in the table.
Table 3 Adding a queue
Parameter | Value
Name | dli_dws
Type | General purpose queue
- Click Next and click OK. The queue is created.
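The requirement that the DLI CIDR block sits in a different network segment from MySQL and GaussDB(DWS) can be checked before you buy the resource pool. The following is a minimal Python sketch that uses the standard ipaddress module. The value 192.168.0.0/16 is only an assumed example of the VPC range used by RDS and GaussDB(DWS); replace it with the range of your own VPC.

```python
import ipaddress


def overlaps(cidr_a: str, cidr_b: str) -> bool:
    """Return True if the two CIDR blocks share at least one address."""
    net_a = ipaddress.ip_network(cidr_a, strict=True)
    net_b = ipaddress.ip_network(cidr_b, strict=True)
    return net_a.overlaps(net_b)


dli_cidr = "172.16.0.0/18"   # CIDR block planned for the DLI elastic resource pool
vpc_cidr = "192.168.0.0/16"  # assumed VPC range of RDS and GaussDB(DWS)

if overlaps(dli_cidr, vpc_cidr):
    print(f"{dli_cidr} overlaps {vpc_cidr}: choose another block for DLI")
else:
    print(f"{dli_cidr} does not overlap {vpc_cidr}")
```

With strict=True, ip_network raises ValueError if a block has host bits set (for example 172.16.0.1/18), which catches mistyped CIDR values early.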
Step 4: Creating an Enhanced Datasource Connection
- In the security group of RDS, add an inbound rule that allows access from the network segment where the DLI queue is located.
- In the navigation pane on the left, choose Resources > Queue Management and record the network segment of dli_dws.
Figure 1 DLI queue network segment
- Go to the RDS console, choose Instance Management in the navigation pane, and click the name of the created RDS instance.
- Record the value of Private IP Address in the Connection Information area, which will be used in the subsequent connectivity test.
- Click Manage next to the security group in Connection Information.
Figure 2 RDS security group
- In the security group list that is displayed, click the security group name to go to the security group configuration page.
- Choose Inbound Rules > Add Rule, as shown in the following figure. Add the network segment of the DLI queue. In this example, the network segment is 172.16.0.0/18. Ensure that the network segment is the same as that entered in Step 3: Creating a DLI Queue.
Figure 3 Adding a rule to the RDS security group
- Click OK.
- Return to the DLI console, click Datasource Connections on the left, select Enhanced, and click Create.
- Set the following parameters. Retain the default values for other parameters that are not described in the table.
Table 4 Connection from DLI to RDS
Parameter | Value
Connection Name | dli_rds
Resource Pool | Select the created DLI elastic resource pool.
VPC | Select the VPC where RDS is located.
Subnet | Select the subnet where RDS is located.
Other parameters | Retain the default values.
Figure 4 Creating a datasource connection
- Click OK. Wait until the RDS connection is created.
- Test the connectivity between DLI and RDS.
- Choose Resources > Queue Management on the left, and choose More > Test Address Connectivity on the right of dli_dws.
- In the address box, enter the private IP address of the RDS instance (recorded earlier from the Connection Information area) and port 3306.
- Click Test to verify that DLI is successfully connected to RDS.
Figure 5 Testing the connection between RDS and DLI
- Test the connectivity between DLI and GaussDB(DWS).
- Log in to the GaussDB(DWS) console, choose Dedicated Clusters > Clusters on the left, and click the cluster name to go to the details page.
- As shown in the following figure, record the private IP address and port number of the GaussDB(DWS) cluster for future use.
Figure 6 GaussDB(DWS) internal IP address
- Click the security group name.
Figure 7 GaussDB(DWS) security group
- Choose Inbound Rules > Add Rule, as shown in the following figure. Add the network segment of the DLI queue. In this example, the network segment is 172.16.0.0/18. Ensure that the network segment is the same as that entered in Step 3: Creating a DLI Queue.
Figure 8 Adding a rule to the GaussDB(DWS) security group
- Click OK.
- Switch to the DLI console, choose Resources > Queue Management on the left, and click More > Test Address Connectivity on the right of dli_dws.
- In the address box, enter the private IP address and port number of the GaussDB(DWS) cluster.
- Click Test to verify that DLI is successfully connected to GaussDB(DWS).
Figure 9 Testing GaussDB(DWS) connectivity
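The console test above checks whether a TCP connection can be opened to an address and port. If you also want to confirm from another host that the databases are listening, the same kind of check can be sketched in Python. This is only an illustration under an assumption: it must run on a host that has a route to the private addresses, such as an ECS in the same VPC, and it does not prove that the DLI queue itself can connect. The function name can_connect is hypothetical.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable networks.
        return False
```

For example, call can_connect with the private IP address of the RDS instance and port 3306, and then with the private IP address of the GaussDB(DWS) cluster and its port (8000 in the JDBC URL used in this practice). A False result usually points to a missing inbound rule in the security group or a wrong address.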
Step 5: Creating a DLI Flink Job
- Log in to the OBS console and create an OBS bucket to store Flink jobs. For details, see the OBS User Guide.
Set key parameters as follows and retain the default values for other parameters.
- Region: CN-Hong Kong
- Bucket Name: dli-obs01 (Bucket names must be unique. If the name is already in use, increment the numeric suffix, for example, dli-obs02 or dli-obs03.)
- Bucket Policy: Private
- Return to the DLI console, choose Job Management > Flink Jobs on the left, and click Create Job in the upper right corner.
- Set Type to Flink OpenSource SQL and Name to rds-dws.
Figure 10 Creating a job
- Click OK. The page for editing the job is displayed.
- Set the following key parameters on the right of the page. Retain the default values for other parameters that are not described.
- Copy the following SQL code to the SQL code window on the left.
To obtain the private IP addresses of the RDS instance and the GaussDB(DWS) cluster, see Step 4: Creating an Enhanced Datasource Connection. Replace the password placeholders with the password of user root of the RDS database and the password of user dbadmin of GaussDB(DWS).
CREATE TABLE mys_order (
    order_id      STRING,
    order_channel STRING,
    order_time    TIMESTAMP,
    cust_code     STRING,
    pay_amount    DOUBLE,
    real_pay      DOUBLE,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'Private IP address of the RDS DB instance',
    'port' = '3306',
    'username' = 'root',
    'password' = 'Password of user root of the RDS DB instance',
    'database-name' = 'mys_data',
    'table-name' = 'mys_order'
);

CREATE TABLE dws_order (
    order_id      STRING,
    order_channel STRING,
    order_time    TIMESTAMP,
    cust_code     STRING,
    pay_amount    DOUBLE,
    real_pay      DOUBLE,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'gaussdb',
    'driver' = 'com.huawei.gauss200.jdbc.Driver',
    'url' = 'jdbc:gaussdb://GaussDB(DWS) cluster private IP address:8000/gaussdb',
    'table-name' = 'dws_data.dws_order',
    'username' = 'dbadmin',
    'password' = 'Password of GaussDB(DWS) user dbadmin',
    'write.mode' = 'insert'
);

INSERT INTO dws_order SELECT * FROM mys_order;
- Click Format and click Save.
Formatting is required before you save the job. Otherwise, null characters introduced when the code is copied and pasted may cause the job to fail.
Figure 11 Flink job parameters
- Return to the DLI console home page and choose Job Management > Flink Jobs on the left.
- Click Start on the right of the job name rds-dws and click Start Now.
Wait for about 1 minute and refresh the page. If the status is Running, the job has started successfully.
Figure 12 Running succeeded
Step 6: Verifying Data Synchronization
- Go back to the SQL window of the GaussDB(DWS) database. If the connection times out, perform the following operations to log in again:
- Go to the GaussDB(DWS) console.
- In the navigation pane on the left, choose Dedicated Clusters > Clusters, and click Log In on the right of dws-demo.
- Check whether the two rows of data in the MySQL table have been synchronized to GaussDB(DWS).
SELECT * FROM dws_data.dws_order;
Figure 13 Query result
- Switch to the RDS for MySQL page and run the following statements to insert three new data records:
INSERT INTO mys_data.mys_order VALUES
    ('202403090003', 'webShop', TIMESTAMP('2024-03-09 13:00:00'), 'CUST1', 2000, 2000);
INSERT INTO mys_data.mys_order VALUES
    ('202403090004', 'webShop', TIMESTAMP('2024-03-09 14:00:00'), 'CUST2', 3000, 3000);
INSERT INTO mys_data.mys_order VALUES
    ('202403100004', 'webShop', TIMESTAMP('2024-03-10 10:00:00'), 'CUST3', 6000, 6000);
Figure 14 New MySQL data
- Go back to the SQL window of GaussDB(DWS) and run the following SQL statement again. The returned result shows that the MySQL data has been synchronized to GaussDB(DWS) in real time.
SELECT * FROM dws_data.dws_order;
Figure 15 Real-time data synchronization
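Comparing the two query results by eye works for five rows but not for larger tables. The following Python sketch compares the order_id values of the source and target tables. It assumes that the two ID lists have already been fetched with any SQL client (for example, with SELECT order_id FROM mys_data.mys_order and SELECT order_id FROM dws_data.dws_order); the function name diff_order_ids is hypothetical, and the lists below simply reuse the IDs inserted in this practice.

```python
from typing import Iterable, Set, Tuple


def diff_order_ids(
    source_ids: Iterable[str], target_ids: Iterable[str]
) -> Tuple[Set[str], Set[str]]:
    """Return (IDs missing in the target, IDs present only in the target)."""
    source, target = set(source_ids), set(target_ids)
    return source - target, target - source


# order_id values from MySQL (source) and GaussDB(DWS) (target)
mysql_ids = ["202306270001", "202306270002", "202403090003",
             "202403090004", "202403100004"]
dws_ids = ["202306270001", "202306270002", "202403090003",
           "202403090004", "202403100004"]

missing, unexpected = diff_order_ids(mysql_ids, dws_ids)
if not missing and not unexpected:
    print("All order IDs are synchronized")
else:
    print("Missing in GaussDB(DWS):", sorted(missing))
    print("Only in GaussDB(DWS):", sorted(unexpected))
```

Because the comparison uses sets, it reports missing or extra IDs but does not detect duplicate rows in the target table.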
More Information
Storing authentication information for a data source directly in the job script for Flink cross-source development can result in password exposure. To enhance security, use DLI's datasource authentication function instead of specifying MySQL and GaussDB(DWS) usernames and passwords directly in job scripts.
Currently, only Flink 1.12 supports this function. Check the official documentation for updates.
- Log in to the DLI console, click Datasource Connections, and click Datasource Authentication.
- Click Create.
- Create password authentication for the root user of the MySQL database.
- Set the following parameters:
- Type: Password
- Authentication Certificate: mysql_pwd_auth
- Username: root
- Password: password of user root
Figure 16 MySQL password authentication
- Click OK.
- Create password authentication for the dbadmin user of GaussDB(DWS).
- Set the following parameters:
- Type: Password
- Authentication Certificate: dws_pwd_auth
- Username: dbadmin
- Password: password of user dbadmin
Figure 17 GaussDB(DWS) password authentication
- Click OK.
- On the DLI console, choose Job Management > Flink Jobs. Locate the row that contains the job created in Step 5: Creating a DLI Flink Job, and choose More > Stop to stop the job.
- After the job is stopped, open the job editing page.
- Replace the SQL script with the following one, in which pwd_auth_name replaces the username and password settings. Replace the placeholders with the private IP addresses of RDS and GaussDB(DWS).
CREATE TABLE mys_order (
    order_id      STRING,
    order_channel STRING,
    order_time    TIMESTAMP,
    cust_code     STRING,
    pay_amount    DOUBLE,
    real_pay      DOUBLE,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'Private IP address of RDS',
    'port' = '3306',
    'pwd_auth_name' = 'mysql_pwd_auth',
    'database-name' = 'mys_data',
    'table-name' = 'mys_order'
);

CREATE TABLE dws_order (
    order_id      STRING,
    order_channel STRING,
    order_time    TIMESTAMP,
    cust_code     STRING,
    pay_amount    DOUBLE,
    real_pay      DOUBLE,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'gaussdb',
    'driver' = 'com.huawei.gauss200.jdbc.Driver',
    'url' = 'jdbc:gaussdb://GaussDB(DWS) private IP address:8000/gaussdb',
    'table-name' = 'dws_data.dws_order',
    'pwd_auth_name' = 'dws_pwd_auth',
    'write.mode' = 'insert'
);

INSERT INTO dws_order SELECT * FROM mys_order;
- Click Format and click Save.
- Restart the job and verify data synchronization by referring to Step 6: Verifying Data Synchronization.