Updated on 2024-08-16 GMT+08:00

Using DLI to Submit a Flink OpenSource SQL Job to Query RDS for MySQL Data

Scenario

DLI Flink jobs can use other cloud services as source streams and sink streams for real-time computing.

This example describes how to create and submit a Flink OpenSource SQL job that uses Kafka as the source stream and RDS as the sink stream.

Procedure

You need to create a Flink OpenSource SQL job that has a source stream and a sink stream. The source stream reads data from Kafka, and the sink stream writes data to RDS for MySQL. Table 1 shows the process.

Complete the preparations in Preparations before performing the following operations.

Table 1 Procedure for using DLI to submit a Flink OpenSource SQL job to query RDS for MySQL data

  • Step 1: Prepare a Source Stream
    In this example, a Kafka instance is created as the data source.
  • Step 2: Prepare a Sink Stream
    In this example, an RDS for MySQL DB instance is created as the data destination.
  • Step 3: Create an OBS Bucket to Store Output Data
    Create an OBS bucket to store checkpoints, job logs, and debugging test data for the Flink OpenSource SQL job.
  • Step 4: Create an Elastic Resource Pool and Add Queues to the Pool
    Create compute resources required for submitting the Flink OpenSource SQL job.
  • Step 5: Create an Enhanced Datasource Connection Between DLI and Kafka
    Create an enhanced datasource connection to connect the DLI elastic resource pool and the Kafka instance.
  • Step 6: Create an Enhanced Datasource Connection Between DLI and RDS
    Create an enhanced datasource connection to connect the DLI elastic resource pool and the RDS for MySQL DB instance.
  • Step 7: Use DEW to Manage Access Credentials and Create an Agency That Allows DLI to Access DEW
    In cross-source analysis scenarios, use DEW to manage access credentials of data sources and create an agency that allows DLI to access DEW.
  • Step 8: Create a Flink OpenSource SQL Job
    Once you have prepared a source stream and a sink stream, you can create a Flink OpenSource SQL job to analyze the data.

Preparations

  • Register a Huawei ID and enable Huawei Cloud services. Make sure your account is not in arrears or frozen.
  • Configure an agency for DLI.
    To use DLI, you need to access services such as Object Storage Service (OBS), Virtual Private Cloud (VPC), and Simple Message Notification (SMN). If it is your first time using DLI, you will need to configure an agency to allow access to these dependent services.
    1. Log in to the DLI management console using your account. In the navigation pane on the left, choose Global Configuration > Service Authorization.
    2. On the agency settings page, select the agency permissions under Basic Usage, Datasource, and O&M and click Update.
    3. Check and understand the notes for updating the agency, and click OK. The DLI agency permissions are updated.
      Figure 1 Configuring an agency for DLI
    4. Once configured, you can check the agency dli_management_agency in the agency list on the IAM console.

Step 1: Prepare a Source Stream

In this example, Kafka is the source stream.

For more information about Flink OpenSource SQL job data, refer to Preparing Flink Job Data.

Create a DMS for Kafka instance to provide source data for DLI. For details, see "Buying a Kafka Instance" in the Distributed Message Service Kafka User Guide.

  1. Create the dependent Kafka resources.
    Before creating a Kafka instance, ensure the availability of resources, including a virtual private cloud (VPC), subnet, security group, and security group rules.
    • For details about how to create a VPC and subnet, see "Creating a VPC and Subnet" in the Virtual Private Cloud User Guide. For details about how to create and use a subnet in an existing VPC, see "Creating a Subnet for the VPC" in the Virtual Private Cloud User Guide.
      • The created VPC and the Kafka instance you will create must be in the same region.
      • Retain the default settings unless otherwise specified.
    • For details about how to create a security group, see "Creating a Security Group" in the Virtual Private Cloud User Guide. For details about how to add rules to a security group, see the section on adding security group rules in the Virtual Private Cloud User Guide.

    For more information, see the Distributed Message Service for Kafka User Guide.

  2. Create a Kafka premium instance as the job source stream.
    1. Log in to the DMS for Kafka management console.
    2. Select a region in the upper left corner.
    3. On the DMS for Kafka page, click Buy Instance in the upper right corner and set related parameters. The required instance information is as follows:
      • Region: Select the region where DLI is located.
      • Project: Keep the default value.
      • AZ: Keep the default value.
      • Instance Name: kafka-dliflink
      • Specifications: Default
      • Enterprise Project: default
      • Version: Keep the default value.
      • CPU Architecture: Keep the default value.
      • Broker Flavor: Select a flavor as needed.
      • Brokers: Retain the default value.
      • Storage Space: Keep the default value.
      • Capacity Threshold Policy: Keep the default value.
      • VPC and Subnet: Select the VPC and subnet created in step 1.
      • Security Group: Select the security group created in step 1.
      • Manager Username: Enter dliflink (used to log in to the instance management page).
      • Password: **** (Keep the password secure. The system cannot retrieve your password.)
      • Confirm Password: ****
      • More Settings: Do not configure this parameter.
    4. Click Buy. The confirmation page is displayed.
    5. Confirm that the instance information is correct, read and agree to the HUAWEI CLOUD Customer Agreement, and click Submit. It takes about 10 to 15 minutes to create an instance.
  3. Create a Kafka topic.
    1. Click the name of the created Kafka instance. The basic information page of the instance is displayed.
    2. Choose Topics in the navigation pane on the left. On the displayed page, click Create Topic. Configure the following parameters:
      • Topic Name: For this example, enter testkafkatopic.
      • Partitions: Set the value to 1.
      • Replicas: Set the value to 1.

      Retain the default values for other parameters.

Step 2: Prepare a Sink Stream

To use RDS as the sink stream, create an RDS for MySQL DB instance. For details, see "Getting Started with RDS for MySQL" in Getting Started with Relational Database Service.

  1. Log in to the RDS management console.
  2. Select a region in the upper left corner.
  3. Click Buy DB Instance in the upper right corner of the page and set related parameters. Retain the default values for other parameters.
    • Billing Mode: Select Pay-per-use.
    • Region: Select the region where DLI is located.
    • DB Instance Name: Enter rds-dliflink.
    • DB Engine: Select MySQL.
    • DB Engine Version: Select 8.0.
    • DB Instance Type: Select Primary/Standby.
    • Storage Type: Cloud SSD may be selected by default.
    • Primary AZ: Select a custom AZ.
    • Standby AZ: Select a custom AZ.
    • Time Zone: Select your current time zone.
    • Instance Class: Select a class as needed. For this example, choose 2 vCPUs | 8 GB.
    • Storage Space (GB): Set it to 40.
    • VPC: Select the VPC and subnet created in step 1 of Step 1: Prepare a Source Stream.
    • Database Port: Enter 3306.
    • Security Group: Select the security group created in step 1 of Step 1: Prepare a Source Stream.
    • Administrator Password: **** (Keep the password secure. The system cannot retrieve your password.)
    • Confirm Password: ****
    • Parameter Template: Choose Default-MySQL-8.0.
  4. Click Next and confirm the specifications.
  5. Click Submit. The RDS DB instance is created.
  6. Log in to the MySQL database and create the orders table in the flink database.
    Log in to the MySQL instance and click the flink database. If the flink database does not exist yet, create it first. On the displayed page, click SQL Window. Enter the following table creation statement in the SQL editing pane to create the table.
    CREATE TABLE `flink`.`orders` (
    	`order_id` VARCHAR(32) NOT NULL,
    	`order_channel` VARCHAR(32) NULL,
    	`order_time` VARCHAR(32) NULL,
    	`pay_amount` DOUBLE UNSIGNED NOT NULL,
    	`real_pay` DOUBLE UNSIGNED NULL,
    	`pay_time` VARCHAR(32) NULL,
    	`user_id` VARCHAR(32) NULL,
    	`user_name` VARCHAR(32) NULL,
    	`area_id` VARCHAR(32) NULL,
    	PRIMARY KEY (`order_id`)
    )	ENGINE = InnoDB
    	DEFAULT CHARACTER SET = utf8mb4
    	COLLATE = utf8mb4_general_ci;
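
The column definitions above constrain what the sink can accept. The following Python sketch is illustrative only and is not part of the DLI or RDS tooling: it copies the NOT NULL, VARCHAR(32), and UNSIGNED rules from the statement above into a hypothetical check_order_row helper so that a test record can be checked locally before it is sent.

```python
# Column rules copied from the CREATE TABLE statement above:
# name -> (kind, nullable). Every VARCHAR column is limited to 32 characters.
ORDERS_COLUMNS = {
    "order_id": ("varchar", False),
    "order_channel": ("varchar", True),
    "order_time": ("varchar", True),
    "pay_amount": ("double_unsigned", False),
    "real_pay": ("double_unsigned", True),
    "pay_time": ("varchar", True),
    "user_id": ("varchar", True),
    "user_name": ("varchar", True),
    "area_id": ("varchar", True),
}
VARCHAR_LIMIT = 32


def check_order_row(row):
    """Return a list of conflicts between a row (dict) and the column rules."""
    problems = []
    for name, (kind, nullable) in ORDERS_COLUMNS.items():
        value = row.get(name)
        if value is None:
            if not nullable:
                problems.append(f"{name} must not be NULL")
            continue
        if kind == "varchar" and len(str(value)) > VARCHAR_LIMIT:
            problems.append(f"{name} exceeds {VARCHAR_LIMIT} characters")
        if kind == "double_unsigned" and float(value) < 0:
            problems.append(f"{name} must not be negative")
    return problems


if __name__ == "__main__":
    print(check_order_row({"order_id": "202103241000000001", "pay_amount": 100.0}))
```

An empty list means the record is consistent with the table definition. How MySQL reacts to a conflicting row depends on the SQL mode of the instance, so treat the helper as a pre-check rather than a guarantee.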

Step 3: Create an OBS Bucket to Store Output Data

In this example, you need an OBS bucket so that the DLI Flink job JobSample can store checkpoints, job logs, and debugging test data.

For details about how to create a bucket, see "Creating a Bucket" in the Object Storage Service Console Operation Guide.

  1. In the navigation pane on the OBS management console, choose Object Storage.
  2. In the upper right corner of the page, click Create Bucket and set bucket parameters.
    • Region: Select the region where DLI is located.
    • Bucket Name: Enter a bucket name. For this example, enter obstest.
    • Default Storage Class: Standard
    • Bucket Policy: Private
    • Default Encryption: Do not enable
    • Direct Reading: Do not enable
    • Enterprise Project: default
  3. Click Create Now.

Step 4: Create an Elastic Resource Pool and Add Queues to the Pool

To create a Flink OpenSource SQL job, you must use your own queue as the existing default queue cannot be used. In this example, create an elastic resource pool named dli_resource_pool and a queue named dli_queue_01.

  1. Log in to the DLI management console.
  2. In the navigation pane on the left, choose Resources > Resource Pool.
  3. On the displayed page, click Buy Resource Pool in the upper right corner.
  4. On the displayed page, set the parameters.
  5. In this example, we will buy the resource pool in the CN East-Shanghai2 region. Table 2 describes the parameters.
    Table 2 Parameters

    • Region: Select a region where you want to buy the elastic resource pool. Example value: CN East-Shanghai2
    • Project: Project uniquely preset by the system for each region. Example value: Default
    • Name: Name of the elastic resource pool. Example value: dli_resource_pool
    • Specifications: Specifications of the elastic resource pool. Example value: Standard
    • CU Range: The maximum and minimum CUs allowed for the elastic resource pool. Example value: 64-64
    • CIDR Block: CIDR block the elastic resource pool belongs to. If you use an enhanced datasource connection, this CIDR block cannot overlap that of the data source. Once set, this CIDR block cannot be changed. Example value: 172.16.0.0/19
    • Enterprise Project: Select an enterprise project for the elastic resource pool. Example value: default


  6. Click Buy.
  7. Click Submit.
  8. In the elastic resource pool list, locate the pool you just created and click Add Queue in the Operation column.
  9. Set the basic parameters listed below.
    Table 3 Basic parameters for adding a queue

    • Name: Name of the queue to add. Example value: dli_queue_01
    • Type: Type of the queue. To execute SQL jobs, select For SQL. To execute Flink or Spark jobs, select For general purpose. Example value: -
    • Engine: SQL queue engine. The options include Spark and Trino. Example value: -
    • Enterprise Project: Select an enterprise project. Example value: default


  10. Click Next and configure scaling policies for the queue.

    Click Create to add a scaling policy with varying priority, period, minimum CUs, and maximum CUs.

    Figure 2 shows the scaling policy configured in this example.
    Figure 2 Configuring a scaling policy when adding a queue
    Table 4 Scaling policy parameters

    • Priority: Priority of the scaling policy in the current elastic resource pool. A larger value indicates a higher priority. In this example, only one scaling policy is configured, so its priority is set to 1 by default. Example value: 1
    • Period: The first scaling policy is the default policy, and its Period parameter configuration cannot be deleted or modified. The period for the scaling policy is from 00 to 24. Example value: 00–24
    • Min CU: Minimum number of CUs allowed by the scaling policy. Example value: 16
    • Max CU: Maximum number of CUs allowed by the scaling policy. Example value: 64


  11. Click OK.
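
The CIDR block of the elastic resource pool cannot be changed after creation and must not overlap the CIDR block of the data source, so it is worth checking before you buy the pool. The following Python sketch uses the standard ipaddress module. The pool value 172.16.0.0/19 comes from Table 2. The data source subnets are hypothetical values chosen for illustration.

```python
import ipaddress


def cidr_overlaps(pool_cidr, source_cidr):
    """Return True if the two CIDR blocks share at least one address."""
    pool = ipaddress.ip_network(pool_cidr)
    source = ipaddress.ip_network(source_cidr)
    return pool.overlaps(source)


if __name__ == "__main__":
    pool = "172.16.0.0/19"  # example value from Table 2
    # Hypothetical data source subnets, not taken from this guide.
    for subnet in ("192.168.0.0/24", "172.16.16.0/20"):
        verdict = "overlaps" if cidr_overlaps(pool, subnet) else "does not overlap"
        print(f"{subnet} {verdict} {pool}")
```

If the check reports an overlap, choose a different CIDR block for the pool before clicking Buy.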

Step 5: Create an Enhanced Datasource Connection Between DLI and Kafka

You need to create an enhanced datasource connection for the Flink OpenSource SQL job. For details, see "Creating an Enhanced Datasource Connection".

  • Enhanced datasource connections support only pay-per-use queues.
  • The CIDR block of the DLI queue bound with a datasource connection cannot overlap with the CIDR block of the data source.
  • Datasource connections cannot be created for the default queue.
  • To access a table across data sources, you need to use a queue bound to a datasource connection.
  1. Create a Kafka security group rule to allow access from the CIDR block of the DLI queue.

    1. On the Kafka management console, click an instance name on the DMS for Kafka page. Basic information of the Kafka instance is displayed.
    2. In the Connection pane, obtain the Instance Address (Private Network). In the Network pane, obtain the VPC and subnet of the instance.
    3. Click the security group name in the Network pane. On the displayed page, click the Inbound Rules tab and add a rule to allow access from the DLI queue.

      For example, if the CIDR block of the queue is 10.0.0.0/16, set Protocol to TCP, Type to IPv4, Source to 10.0.0.0/16, and click OK.

  2. Create an enhanced datasource connection to Kafka.

    1. Log in to the DLI management console. In the navigation pane on the left, choose Datasource Connections. On the displayed page, click Create in the Enhanced tab.
    2. In the displayed dialog box, set the following parameters:
      • Connection Name: Name of the enhanced datasource connection. For this example, enter dli_kafka.
      • Resource Pool: Select the elastic resource pool created in Step 4: Create an Elastic Resource Pool and Add Queues to the Pool.
      • VPC: Select the VPC of the Kafka instance.
      • Subnet: Select the subnet of the Kafka instance.
      • Set other parameters as you need.

      Click OK. Click the name of the created datasource connection to view its status. You can perform subsequent steps only after the connection status changes to Active.

    3. Choose Resources > Queue Management and locate the queue created in Step 4: Create an Elastic Resource Pool and Add Queues to the Pool. In the Operation column, click More and select Test Address Connectivity.
    4. In the displayed dialog box, enter Kafka instance address (private network):port in the Address box and click Test to check whether the instance is reachable. Note that multiple addresses must be tested separately.
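
Because each Kafka address must be tested separately, it can help to split the comma-separated address list into individual host and port pairs first. The following Python sketch is illustrative only. The function names are hypothetical, and the optional is_reachable check opens a plain TCP connection from the machine that runs the script, so it only approximates the console test and is meaningful only when run from a host that can reach the instance's private network.

```python
import socket


def split_addresses(bootstrap_servers):
    """Split 'host1:port1,host2:port2' into a list of (host, port) pairs."""
    pairs = []
    for item in bootstrap_servers.split(","):
        host, _, port = item.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def is_reachable(host, port, timeout=3.0):
    """Try a TCP connection from the local machine; return True on success."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Sample addresses as used in the job script of this guide.
    servers = "192.168.0.237:9092,192.168.0.252:9092,192.168.0.137:9092"
    for host, port in split_addresses(servers):
        print(f"Test separately in the Address box: {host}:{port}")
```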

Step 6: Create an Enhanced Datasource Connection Between DLI and RDS

  1. Create an RDS security group rule to allow access from the CIDR block of the DLI queue.

    If the RDS DB instance and Kafka instance are in the same security group of the same VPC, skip this step. Access from the DLI queue has already been allowed in step 1 of Step 5: Create an Enhanced Datasource Connection Between DLI and Kafka.
    1. Go to the RDS console and click the name of the target RDS for MySQL DB instance on the Instances page. Basic information of the instance is displayed.
    2. In the Connection Information pane, obtain the floating IP address, database port, VPC, and subnet.
    3. Click the security group name. On the displayed page, click the Inbound Rules tab and add a rule to allow access from the DLI queue. For example, if the CIDR block of the queue is 10.0.0.0/16, set Priority to 1, Action to Allow, Protocol to TCP, Type to IPv4, Source to 10.0.0.0/16, and click OK.

  2. Create an enhanced datasource connection to RDS.

    If the RDS DB instance and Kafka instance are in the same VPC and subnet, skip this step. The enhanced datasource connection created in step 2 of Step 5: Create an Enhanced Datasource Connection Between DLI and Kafka has already connected the subnet.

    If the two instances are in different VPCs or subnets, perform the following steps to create an enhanced datasource connection:
    1. Log in to the DLI management console. In the navigation pane on the left, choose Datasource Connections. On the displayed page, click Create in the Enhanced tab.
    2. In the displayed dialog box, set the following parameters:
      • Connection Name: Name of the enhanced datasource connection. For this example, enter dli_rds.
      • Resource Pool: Select the elastic resource pool created in Step 4: Create an Elastic Resource Pool and Add Queues to the Pool.
      • VPC: Select the VPC of the RDS DB instance.
      • Subnet: Select the subnet of the RDS DB instance.
      • Set other parameters as you need.

      Click OK. Click the name of the created datasource connection to view its status. You can perform subsequent steps only after the connection status changes to Active.

    3. Choose Resources > Queue Management and locate the queue created in Step 4: Create an Elastic Resource Pool and Add Queues to the Pool. In the Operation column, click More and select Test Address Connectivity.
    4. In the displayed dialog box, enter Floating IP address:Database port of the RDS for MySQL DB instance in the Address box and click Test to check if the instance is reachable.
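
The floating IP address and database port obtained in this step are used twice: in the Address box of the connectivity test, and later in the JDBC URL of the sink table. The following Python sketch shows how the two strings relate. The helper name is hypothetical, the IP address and port are the sample values used in this guide, and flink is assumed to be the database that holds the orders table created in Step 2.

```python
def rds_endpoints(floating_ip, port, database):
    """Return (address for the connectivity test, JDBC URL for the sink)."""
    address = f"{floating_ip}:{port}"
    jdbc_url = f"jdbc:mysql://{address}/{database}"
    return address, jdbc_url


if __name__ == "__main__":
    # Sample values; replace them with those of your own DB instance.
    address, jdbc_url = rds_endpoints("172.16.0.116", 3306, "flink")
    print(address)
    print(jdbc_url)
```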

Step 7: Use DEW to Manage Access Credentials and Create an Agency That Allows DLI to Access DEW

In cross-source analysis scenarios, you need to set attributes such as the username and password in the connector. However, information such as usernames and passwords is highly sensitive and needs to be encrypted to ensure user data privacy. Flink 1.15 allows for the use of DEW to manage credentials. Before running a job, create a custom agency and configure agency information within the job.

Data Encryption Workshop (DEW) and Cloud Secret Management Service (CSMS) jointly form a secure, reliable, and easy-to-use privacy data encryption and decryption solution. This example describes how a Flink OpenSource SQL job uses DEW to manage RDS access credentials.

  1. Create an agency for DLI to access DEW and complete authorization.
  2. Create a shared secret in DEW.
  3. Log in to the DEW management console.
  4. In the navigation pane on the left, choose Cloud Secret Management Service > Secrets.
  5. Click Create Secret. On the displayed page, configure basic secret information.
    • Secret Name: Enter a secret name. In this example, the name is secretInfo.
    • Secret Value: Enter the username and password for logging in to the RDS for MySQL DB instance.
      • The key in the first line is MySQLUsername, and the value is the username for logging in to the DB instance.
      • The key in the second line is MySQLPassword, and the value is the password for logging in to the DB instance.
      Figure 3 Secret Value
  6. Set other parameters as required and click OK.
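
The secret holds two key-value pairs, MySQLUsername and MySQLPassword. The following Python sketch models that structure so you can confirm that both keys are present before relying on the secret. It assumes, for illustration only, that the secret value is serialized as a JSON object of key-value pairs. The helper names are hypothetical and the sketch does not call any DEW or CSMS API.

```python
import json

# Keys named in the Secret Value step above.
REQUIRED_KEYS = ("MySQLUsername", "MySQLPassword")


def build_secret_value(username, password):
    """Serialize the two credentials as a JSON object (assumed format)."""
    return json.dumps({"MySQLUsername": username, "MySQLPassword": password})


def missing_keys(secret_value):
    """Return the required keys that are absent from a secret value."""
    stored = json.loads(secret_value)
    return [key for key in REQUIRED_KEYS if key not in stored]


if __name__ == "__main__":
    # Placeholder credentials; never hard-code real ones.
    value = build_secret_value("example_user", "example_password")
    print(missing_keys(value))
```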

Step 8: Create a Flink OpenSource SQL Job

After the source and sink streams are prepared, you can create a Flink OpenSource SQL job.

  1. In the left navigation pane of the DLI management console, choose Job Management > Flink Jobs. The Flink Jobs page is displayed.
  2. In the upper right corner of the Flink Jobs page, click Create Job. Set the following parameters:
    • Type: Flink OpenSource SQL
    • Name: JobSample
    • Description: Leave it blank.
    • Template Name: Do not select any template.
    • Tags: Leave it blank.
  3. Click OK to enter the editing page.
  4. Set job running parameters. The mandatory parameters are as follows:
    • Queue: dli_queue_01
    • Flink Version: Select 1.12.
    • Save Job Log: Enable this function.
    • OBS Bucket: Select an OBS bucket for storing job logs and grant access permissions of the OBS bucket as prompted.
    • Enable Checkpointing: Enable this function.

    You do not need to set other parameters.

  5. Click Save.
  6. Edit the Flink OpenSource SQL job.
    In the SQL statement editing area, enter query and analysis statements as you need. The example statements are as follows. Change the parameter values as described in the comments.
    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'testkafkatopic', -- Kafka topic to read from. Log in to the Kafka console, click the name of the created Kafka instance, and view the topic name on the Topic Management page.
      'properties.bootstrap.servers' = '192.168.0.237:9092,192.168.0.252:9092,192.168.0.137:9092', -- Replace it with the internal network address and port number of Kafka.
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    CREATE TABLE jdbcSink (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://172.16.0.116:3306/flink', -- flink is the database that contains the orders table created in Step 2. Replace the IP address and port number with those of the RDS for MySQL instance.
      'table-name' = 'orders',
      'pwd_auth_name' = 'xxxxx', -- Name of the datasource authentication of the password type created on DLI. If datasource authentication is used, you do not need to set the username and password for the job.
      'sink.buffer-flush.max-rows' = '1'
    );
    
    insert into jdbcSink select * from kafkaSource;
  7. Click Check Semantics.
  8. Click Start. On the displayed Start Flink Job page, confirm the job specifications and the price, and click Start Now to start the job.

    After the job is started, the system automatically switches to the Flink Jobs page, and the created job is displayed in the job list. You can view the job status in the Status column. After a job is successfully submitted, Status of the job will change from Submitting to Running.

    If Status of a job is Submission failed or Running exception, the job fails to be submitted or fails to run. In this case, you can hover over the status icon to view the error details. You can click to copy these details. Rectify the fault based on the error information and resubmit the job.

  9. Connect to the Kafka cluster and send the following test data to the Kafka topic:

    For details about how to produce and consume Kafka messages, see Connecting to an Instance Without SASL.

    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 
    
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
  10. Run the following SQL statement in the MySQL database to view data in the table:
    select * from orders;
    The following is an example of the execution result copied from the MySQL database:
    202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106
    202103241606060001,appShop,2021-03-24 16:06:06,200.0,180.0,2021-03-24 16:10:06,0001,Alice,330106
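
The test messages carry pay_amount and real_pay as quoted strings, while the job declares both columns as double, which is why the query result shows 100.0 rather than 100.00. The following Python sketch is illustrative only. It builds one test message in the field order of kafkaSource and derives the comma-separated row you would expect to see, assuming the amounts are parsed into doubles as the sample result suggests. The helper names are hypothetical.

```python
import json

# Field order of the kafkaSource and jdbcSink tables.
FIELDS = [
    "order_id", "order_channel", "order_time", "pay_amount", "real_pay",
    "pay_time", "user_id", "user_name", "area_id",
]
DOUBLE_FIELDS = {"pay_amount", "real_pay"}


def to_message(order):
    """Serialize an order dict as one JSON test message."""
    return json.dumps({name: order[name] for name in FIELDS})


def expected_row(message):
    """Derive the comma-separated row expected in the orders table."""
    record = json.loads(message)
    cells = []
    for name in FIELDS:
        value = record[name]
        # Amounts arrive as strings and are stored as doubles.
        cells.append(str(float(value)) if name in DOUBLE_FIELDS else str(value))
    return ",".join(cells)


if __name__ == "__main__":
    order = {
        "order_id": "202103241000000001", "order_channel": "webShop",
        "order_time": "2021-03-24 10:00:00", "pay_amount": "100.00",
        "real_pay": "100.00", "pay_time": "2021-03-24 10:02:03",
        "user_id": "0001", "user_name": "Alice", "area_id": "330106",
    }
    message = to_message(order)
    print(message)
    print(expected_row(message))
```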