

Using a Flink Job of DLI to Synchronize MySQL Data to a GaussDB(DWS) Cluster in Real Time

Updated on 2024-10-29 GMT+08:00

This practice demonstrates how to use a Flink job of DLI to synchronize MySQL data to GaussDB(DWS) in real time.

For details, see What Is Data Lake Insight?

This exercise takes approximately 60 minutes and uses several cloud services: Virtual Private Cloud (VPC) and subnet, Relational Database Service (RDS), Data Lake Insight (DLI), Object Storage Service (OBS), and GaussDB(DWS). The exercise is outlined as follows.

  1. Preparations
  2. Step 1: Preparing MySQL Data
  3. Step 2: Creating a GaussDB(DWS) Cluster
  4. Step 3: Creating a DLI Queue
  5. Step 4: Creating an Enhanced Datasource Connection
  6. Step 5: Creating a DLI Flink Job
  7. Step 6: Verifying Data Synchronization
  8. More Information

Preparations

  • You have registered a Huawei ID and enabled Huawei Cloud services. The account must not be in arrears or frozen.
  • You have created a VPC and subnet. For details, see Creating a VPC.

Step 1: Preparing MySQL Data

  1. Buy an RDS instance and set the parameters listed in Table 1 (retain the default values for other parameters). For details, see Relational Database Service.

    Table 1 RDS parameters

    Parameter

    Value

    Billing Mode

    Pay-per-use

    Region

    CN-Hong Kong

    DB Instance Name

    rds-demo

    DB Engine

    MySQL

    DB Engine Version

    5.7 or later

    Database Port

    3306

  2. Connect to the RDS instance and create a database named mys_data.

    CREATE DATABASE mys_data;
    

  3. Switch to the new database mys_data and run the following command to create the mys_order table:

    CREATE TABLE mys_data.mys_order
         ( order_id      VARCHAR(12),
           order_channel VARCHAR(32),
           order_time    DATETIME,
           cust_code     VARCHAR(6),
           pay_amount    DOUBLE,
           real_pay      DOUBLE,
           PRIMARY KEY (order_id) );
    

  4. Insert data into the table.

    INSERT INTO mys_data.mys_order VALUES ('202306270001', 'webShop', TIMESTAMP('2023-06-27 10:00:00'), 'CUST1', 1000, 1000);
    INSERT INTO mys_data.mys_order VALUES ('202306270002', 'webShop', TIMESTAMP('2023-06-27 11:00:00'), 'CUST2', 5000, 5000);
    

  5. Check whether the data is inserted.

    SELECT * FROM mys_data.mys_order;
    

Step 2: Creating a GaussDB(DWS) Cluster

  1. Create a cluster by referring to Creating a Cluster. To ensure network connectivity, select the same region as the RDS instance (CN-Hong Kong in this practice) and the same VPC as that created for RDS.
  2. Log in to the GaussDB(DWS) console, choose Dedicated Clusters > Clusters, locate the row that contains the target cluster, and click Login in the Operation column. The login information is as follows:

    • Cluster: the created GaussDB(DWS) cluster.
    • Database: gaussdb
    • Data source name: dws-demo-01
    • Username: dbadmin
    • Password: password set when the GaussDB(DWS) cluster is created

  3. Select Remember Password, click Test Connection, and wait until the connection is successful.
  4. Copy the following SQL statements. In the SQL window, click Execute SQL to create a schema named dws_data.

    CREATE SCHEMA dws_data;
    

  5. Create the dws_order table in the new schema.

    CREATE TABLE dws_data.dws_order
            ( order_id      VARCHAR(12),
              order_channel VARCHAR(32),
              order_time    TIMESTAMP,
              cust_code     VARCHAR(6),
              pay_amount    DOUBLE PRECISION,
              real_pay      DOUBLE PRECISION );
    

  6. Query data. The table is currently empty.

    SELECT * FROM dws_data.dws_order;
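The DDL above mirrors the MySQL table from Step 1, with MySQL types mapped to GaussDB(DWS) equivalents (DATETIME to TIMESTAMP, DOUBLE to DOUBLE PRECISION). As a sketch, that mapping can be expressed as a small lookup; the helper and its mapping table are illustrative, not part of any official migration tool:

```python
# Illustrative mapping of the MySQL column types used in this practice
# to their GaussDB(DWS) counterparts. Only the types appearing in the
# mys_order table are covered; extend as needed.
MYSQL_TO_DWS = {
    "DATETIME": "TIMESTAMP",
    "DOUBLE": "DOUBLE PRECISION",
    "VARCHAR": "VARCHAR",  # lengths carry over unchanged
}

def map_type(mysql_type: str) -> str:
    """Translate a MySQL type name (e.g. 'VARCHAR(12)') to GaussDB(DWS)."""
    base, sep, rest = mysql_type.upper().partition("(")
    mapped = MYSQL_TO_DWS.get(base, base)  # pass unknown types through
    return mapped + sep + rest

print(map_type("DATETIME"))     # TIMESTAMP
print(map_type("DOUBLE"))       # DOUBLE PRECISION
print(map_type("VARCHAR(12)"))  # VARCHAR(12)
```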
    

Step 3: Creating a DLI Queue

  1. Log in to the Huawei Cloud console and choose Analytics > Data Lake Insight from the service list. The DLI console is displayed.
  2. In the navigation pane on the left, choose Resources > Resource Pool.
  3. Click Buy Resource Pool in the upper right corner, set the following parameters, and retain the default values for other parameters that are not described in the table.

    Table 2 DLI elastic resource pool

    Parameter

    Value

    Billing Mode

    Pay-per-use

    Region

    CN-Hong Kong

    Name

    dli_dws

    Specifications

    Standard

    CIDR Block

    172.16.0.0/18, which must be in a different network segment from MySQL and GaussDB(DWS). For example, if MySQL and GaussDB(DWS) are in the 192.168.x.x network segment, select 172.16.x.x for DLI.

  4. Click Buy and click Submit.

    After the resource pool is created, go to the next step.

  5. On the elastic resource pool page, locate the row that contains the created resource pool, click Add Queue in the Operation column, and set the following parameters. Retain the default values for other parameters that are not described in the table.

    Table 3 Adding a queue

    Parameter

    Value

    Name

    dli_dws

    Type

    General purpose queue

  6. Click Next and click OK. The queue is created.
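The CIDR requirement in Table 2 (the DLI pool must not overlap the MySQL or GaussDB(DWS) network segments) can be verified up front with Python's standard ipaddress module; a minimal sketch, with the 192.168.x.x segment assumed for RDS and GaussDB(DWS):

```python
import ipaddress

def overlaps(cidr_a: str, cidr_b: str) -> bool:
    """Return True if the two CIDR blocks share any addresses."""
    return ipaddress.ip_network(cidr_a).overlaps(ipaddress.ip_network(cidr_b))

# DLI pool vs. the VPC segment assumed for RDS/GaussDB(DWS) in this practice
print(overlaps("172.16.0.0/18", "192.168.0.0/16"))  # False: safe to use
print(overlaps("192.168.0.0/18", "192.168.0.0/16"))  # True: pick another block
```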

Step 4: Creating an Enhanced Datasource Connection

  1. In the security group of RDS, allow the network segment where the DLI queue is located.

    1. In the navigation pane on the left, choose Resources > Queue Management and record the network segment of dli_dws.
      Figure 1 DLI queue network segment
    2. Go to the RDS console, choose Instance Management in the navigation pane, and click the name of the created RDS instance.
    3. Record the value of Private IP Address in the Connection Information area, which will be used in the subsequent connectivity test.
    4. Click Manage next to the security group in Connection Information.
      Figure 2 RDS security group

    5. In the security group list that is displayed, click the security group name to go to the security group configuration page.
    6. Choose Inbound Rules > Add Rule, as shown in the following figure. Add the network segment of the DLI queue. In this example, the network segment is 172.16.0.0/18. Ensure that the network segment is the same as that entered in Step 3: Creating a DLI Queue.
      Figure 3 Adding a rule to the RDS security group

    7. Click OK.

  2. Return to the DLI console, click Datasource Connections on the left, select Enhanced, and click Create.
  3. Set the following parameters. Retain the default values for other parameters that are not described in the table.

    Table 4 Connection from DLI to RDS

    Parameter

    Value

    Connection Name

    dli_rds

    Resource Pool

    Select the created DLI elastic resource pool.

    VPC

    Select the VPC where RDS is located.

    Subnet

    Select the subnet where RDS is located.

    Other parameters

    Retain the default values.

    Figure 4 Creating a datasource connection

  4. Click OK. Wait until the RDS connection is created.
  5. Test the connectivity between DLI and RDS.

    1. Choose Resources > Queue Management on the left, and choose More > Test Address Connectivity on the right of dli_dws.
    2. Enter the private IP address of RDS recorded in 1.c and port 3306 in the address box.
    3. Click Test to verify that DLI is successfully connected to RDS.
      Figure 5 Testing the connection between RDS and DLI

  6. Test the connectivity between DLI and GaussDB(DWS).

    1. Log in to the GaussDB(DWS) console, choose Dedicated Clusters > Clusters on the left, and click the cluster name to go to the details page.
    2. As shown in the following figure, record the private IP address and port number of the GaussDB(DWS) cluster for future use.
      Figure 6 GaussDB(DWS) internal IP address
    3. Click the security group name.
      Figure 7 GaussDB(DWS) security group
    4. Choose Inbound Rules > Add Rule, as shown in the following figure. Add the network segment of the DLI queue. In this example, the network segment is 172.16.0.0/18. Ensure that the network segment is the same as that entered in 4.

      Figure 8 Adding a rule to the GaussDB(DWS) security group
    5. Click OK.
    6. Switch to the DLI console, choose Resources > Queue Management on the left, and click More > Test Address Connectivity on the right of dli_dws.
    7. In the address box, enter the private IP address and port number of the GaussDB(DWS) cluster.
    8. Click Test to verify that DLI is successfully connected to GaussDB(DWS).
      Figure 9 Testing GaussDB(DWS) connectivity
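The console's Test Address Connectivity function performs a simple TCP reachability check. If you also want to test from an ECS inside the same VPC, the same check can be scripted; a sketch, where the hosts and ports are placeholders for the private IP addresses recorded above:

```python
import socket

def is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Placeholders: substitute the private IPs recorded in the steps above.
# print(is_reachable("192.168.0.10", 3306))  # RDS for MySQL
# print(is_reachable("192.168.0.20", 8000))  # GaussDB(DWS)
```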

Step 5: Creating a DLI Flink Job

  1. Log in to the OBS console and create an OBS bucket to store Flink jobs. For details, see the OBS User Guide.

    Set key parameters as follows and retain the default values for other parameters.

    • Region: CN-Hong Kong
    • Bucket Name: dli-obs01 (if the name is already taken, try another suffix, for example, dli-obs02 or dli-obs03)
    • Bucket Policy: Private

  2. Return to the DLI console, choose Job Management > Flink Jobs on the left, and click Create Job in the upper right corner.
  3. Set Type to Flink OpenSource SQL and Name to rds-dws.

    Figure 10 Creating a job

  4. Click OK. The page for editing the job is displayed.
  5. Set the following key parameters on the right of the page. Retain the default values for other parameters that are not described.

    • Queue: Select dli_dws obtained in 4.
    • Flink Version: Select 1.15 or later. (The versions actually available are subject to the console.)
    • OBS Bucket: Select the bucket created in 1 and click Authorize.
    • (Optional) Select Save Job Log.

  6. Copy the following SQL code to the SQL code window on the left.

    Obtain the private IP address of the RDS DB instance as described in 1.c and that of the GaussDB(DWS) cluster as described in 6.b, then replace the placeholder values with the actual passwords of RDS user root and GaussDB(DWS) user dbadmin.
    CREATE TABLE
      mys_order (
        order_id STRING,
        order_channel STRING,
        order_time TIMESTAMP,
        cust_code STRING,
        pay_amount DOUBLE,
        real_pay DOUBLE,
        PRIMARY KEY (order_id) NOT ENFORCED
      )
    WITH
      (
        'connector' = 'mysql-cdc',
        'hostname' = 'Private IP address of the RDS DB instance',
        'port' = '3306',
        'username' = 'root',
        'password' = 'Password of user root of the RDS DB instance',
        'database-name' = 'mys_data',
        'table-name' = 'mys_order'
      );
    
    CREATE TABLE
      dws_order (
        order_id STRING,
        order_channel STRING,
        order_time TIMESTAMP,
        cust_code STRING,
        pay_amount DOUBLE,
        real_pay DOUBLE,
        PRIMARY KEY (order_id) NOT ENFORCED
      )
    WITH
      (
        'connector' = 'gaussdb',
        'driver' = 'com.huawei.gauss200.jdbc.Driver',
        'url' = 'jdbc:gaussdb://GaussDB(DWS) cluster private IP address:8000/gaussdb',
        'table-name' = 'dws_data.dws_order',
        'username' = 'dbadmin',
        'password' = 'Password of GaussDB(DWS) user dbadmin',
        'write.mode' = 'insert'
      );
    
    INSERT INTO
      dws_order
    SELECT
      *
    FROM
      mys_order;
    

  7. Click Format and click Save.

    NOTICE:

    Click Format to format the SQL code; otherwise, invisible null characters introduced during copy and paste may cause job execution failures.

    Figure 11 Flink job parameters
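The invisible characters that the NOTICE warns about (NUL bytes, zero-width spaces, BOMs, and non-breaking spaces picked up during copy and paste) can also be stripped before pasting; a sketch:

```python
import re

# Characters that commonly sneak into SQL during copy and paste:
# NUL, zero-width space/joiners, and the byte-order mark.
INVISIBLE = re.compile("[\u0000\u200b\u200c\u200d\ufeff]")

def clean_sql(sql: str) -> str:
    """Drop invisible characters and normalize NBSP to a regular space."""
    return INVISIBLE.sub("", sql).replace("\u00a0", " ")

print(clean_sql("SELECT\u200b *\u00a0FROM dws_order;"))  # SELECT * FROM dws_order;
```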

  8. Return to the DLI console home page and choose Job Management > Flink Jobs on the left.
  9. Click Start on the right of the job name rds-dws and click Start Now.

    Wait about one minute and refresh the page. If the status is Running, the job has started successfully.

    Figure 12 Running succeeded

Step 6: Verifying Data Synchronization

  1. Go back to the SQL window of the GaussDB(DWS) database. If the connection times out, perform the following operations to log in again:

    1. Go to the GaussDB(DWS) console.
    2. In the navigation pane on the left, choose Dedicated Clusters > Clusters, and click Log In on the right of dws-demo.

  2. Check whether two rows of data in the MySQL table have been synchronized to GaussDB(DWS).

    SELECT * FROM dws_data.dws_order;
    
    Figure 13 Query result

  3. Switch to the RDS for MySQL page and run the following statements to insert three new data records:

    INSERT INTO mys_data.mys_order VALUES ('202403090003', 'webShop', TIMESTAMP('2024-03-09 13:00:00'), 'CUST1', 2000, 2000);
    INSERT INTO mys_data.mys_order VALUES ('202403090004', 'webShop', TIMESTAMP('2024-03-09 14:00:00'), 'CUST2', 3000, 3000);
    INSERT INTO mys_data.mys_order VALUES ('202403100004', 'webShop', TIMESTAMP('2024-03-10 10:00:00'), 'CUST3', 6000, 6000);
    
    Figure 14 New MySQL data

  4. Go back to the SQL window of GaussDB(DWS) and run the following SQL statement again. The returned result shows that the MySQL data has been synchronized to GaussDB(DWS) in real time.

    SELECT * FROM dws_data.dws_order;
    
    Figure 15 Real-time data synchronization
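Beyond eyeballing the query results, the MySQL and GaussDB(DWS) result sets can be compared order-insensitively, since neither SELECT has an ORDER BY clause. A minimal helper (rows are tuples as a database client library would return them; the function name is illustrative):

```python
def result_sets_match(mysql_rows, dws_rows):
    """True if both result sets hold the same rows, ignoring row order."""
    return sorted(mysql_rows) == sorted(dws_rows)

source = [("202306270001", "webShop", "2023-06-27 10:00:00", "CUST1", 1000.0, 1000.0),
          ("202306270002", "webShop", "2023-06-27 11:00:00", "CUST2", 5000.0, 5000.0)]
target = list(reversed(source))  # same rows, different order
print(result_sets_match(source, target))  # True
```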

More Information

Storing authentication information for a data source directly in the job script for Flink cross-source development can result in password exposure. To enhance security, use DLI's datasource authentication function instead of specifying MySQL and GaussDB(DWS) usernames and passwords directly in job scripts.

NOTE:

Currently, only Flink 1.12 supports this function. Check the official documentation for changes.

  1. Log in to the DLI console, click Datasource Connections, and click Datasource Authentication.
  2. Click Create.
  3. Create the password authentication for the root user of the MySQL database.

    1. Set the following parameters:
      • Type: Password
      • Authentication Certificate: mysql_pwd_auth
      • Username: root
      • Password: password of user root
      Figure 16 MySQL password authentication
    2. Click OK.

  4. Create password authentication for the dbadmin user of GaussDB(DWS).

    1. Set the following parameters:
      • Type: Password
      • Authentication Certificate: dws_pwd_auth
      • Username: dbadmin
      • Password: password of user dbadmin
      Figure 17 GaussDB(DWS) password authentication
    2. Click OK.

  5. On the DLI console, choose Job Management > Flink Jobs. Locate the row that contains the job created in Step 5: Creating a DLI Flink Job, and choose More > Stop to stop the job.
  6. After the job is stopped, click the job name to open the job for editing.
  7. Replace the SQL code with the following script.

    Replace the placeholders with the private IP addresses of RDS and GaussDB(DWS).
    CREATE TABLE mys_order (
      order_id STRING,
      order_channel STRING,
      order_time TIMESTAMP,
      cust_code STRING,
      pay_amount DOUBLE,
      real_pay DOUBLE,
      PRIMARY KEY (order_id) NOT ENFORCED ) 
    WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'Private IP address of RDS',
      'port' = '3306',
      'pwd_auth_name' = 'mysql_pwd_auth',
      'database-name' = 'mys_data',
      'table-name' = 'mys_order' );
     
    CREATE TABLE dws_order (
        order_id STRING,
        order_channel STRING,
        order_time TIMESTAMP,
        cust_code STRING,
        pay_amount DOUBLE,
        real_pay DOUBLE, 
        PRIMARY KEY (order_id) NOT ENFORCED )
    WITH (
        'connector' = 'gaussdb',
        'driver' = 'com.huawei.gauss200.jdbc.Driver',
        'url' = 'jdbc:gaussdb://GaussDB(DWS) private IP address:8000/gaussdb',
        'table-name' = 'dws_data.dws_order',
        'pwd_auth_name' = 'dws_pwd_auth',
        'write.mode' = 'insert' );
     
    INSERT INTO dws_order SELECT * FROM mys_order;
    

  8. Click Format and click Save.
  9. Restart the job and verify data synchronization by referring to Step 6: Verifying Data Synchronization.
