Using DLI to Submit a Flink OpenSource SQL Job to Query RDS for MySQL Data
Scenario
DLI Flink jobs can use other cloud services as data sources and sink streams for real-time compute.
This example describes how to create and submit a Flink OpenSource SQL job that uses Kafka as the source stream and RDS as the sink stream.
Procedure
You need to create a Flink OpenSource SQL job that has a source stream and a sink stream. The source stream reads data from Kafka, and the sink stream writes data to RDS for MySQL. The table below shows the process.
Complete the preparations in Preparations before performing the following operations.
Procedure | Description
---|---
Step 1: Prepare a Source Stream | In this example, a Kafka instance is created as the data source.
Step 2: Prepare a Sink Stream | In this example, an RDS for MySQL DB instance is created as the data destination.
Step 3: Create an OBS Bucket to Store Output Data | Create an OBS bucket to store checkpoints, job logs, and debugging test data for the Flink OpenSource SQL job.
Step 4: Create an Elastic Resource Pool and Add Queues to the Pool | Create compute resources required for submitting the Flink OpenSource SQL job.
Step 5: Create an Enhanced Datasource Connection Between DLI and Kafka | Create an enhanced datasource connection to connect the DLI elastic resource pool and the Kafka instance.
Step 6: Create an Enhanced Datasource Connection Between DLI and RDS | Create an enhanced datasource connection to connect the DLI elastic resource pool and the RDS for MySQL DB instance.
Step 7: Use DEW to Manage Access Credentials and Create an Agency That Allows DLI to Access DEW | In cross-source analysis scenarios, use DEW to manage access credentials of data sources and create an agency that allows DLI to access DEW.
Step 8: Create a Flink OpenSource SQL Job | Once you have prepared a source stream and a sink stream, you can create a Flink OpenSource SQL job to analyze the data.
Preparations
- Register a Huawei ID and enable Huawei Cloud services. Make sure your account is not in arrears or frozen.
- Configure an agency for DLI.
To use DLI, you need to access services such as Object Storage Service (OBS), Virtual Private Cloud (VPC), and Simple Message Notification (SMN). If it is your first time using DLI, you will need to configure an agency to allow access to these dependent services.
- Log in to the DLI management console using your account. In the navigation pane on the left, choose Global Configuration > Service Authorization.
- On the agency settings page, select the agency permissions under Basic Usage, Datasource, and O&M and click Update.
- View and understand the notes for updating the agency and click OK. The DLI agency permissions are updated.
Figure 1 Configuring an agency for DLI
- Once configured, you can check the agency dli_management_agency in the agency list on the IAM console.
Step 1: Prepare a Source Stream
In this example, Kafka is the source stream.
Enable DMS for Kafka so that Kafka data can be imported to DLI. For details, see Buying an Instance.
- Create the dependent Kafka resources.
Before creating a Kafka instance, ensure the availability of resources, including a virtual private cloud (VPC), subnet, security group, and security group rules.
- For how to create a VPC and subnet, see Creating a VPC and Subnet. For how to create and use a subnet in an existing VPC, see Creating a Subnet for the VPC.
- The created VPC and the Kafka instance you will create must be in the same region.
- Retain the default settings unless otherwise specified.
- For how to create a security group, see Creating a Security Group. For how to add rules to a security group, see Creating a Subnet for the VPC.
For details, see Kafka Network Connection Conditions.
- Create a Kafka premium instance as the job source stream.
- Log in to the DMS for Kafka management console.
- Select a region in the upper left corner.
- On the DMS for Kafka page, click Buy Instance in the upper right corner and set related parameters. The required instance information is as follows:
- Region: Select the region where DLI is located.
- Project: Keep the default value.
- AZ: Keep the default value.
- Instance Name: kafka-dliflink
- Specifications: Default
- Enterprise Project: default
- Version: Keep the default value.
- CPU Architecture: Keep the default value.
- Broker Flavor: Select a flavor as needed.
- Brokers: Retain the default value.
- Storage Space: Keep the default value.
- Capacity Threshold Policy: Keep the default value.
- VPC and Subnet: Select the VPC and subnet created in 1.
- Security Group: Select the security group created in 1.
- Manager Username: Enter dliflink (used to log in to the instance management page).
- Password: **** (Keep the password secure. The system cannot retrieve your password.)
- Confirm Password: ****
- More Settings: Do not configure this parameter.
- Click Buy. The confirmation page is displayed.
- Confirm that the instance information is correct, read and agree to the HUAWEI CLOUD Customer Agreement, and click Submit. It takes about 10 to 15 minutes to create an instance.
- Create a Kafka topic.
- Click the name of the created Kafka instance. The basic information page of the instance is displayed.
- Choose Topics in the navigation pane on the left. On the displayed page, click Create Topic. Configure the following parameters:
- Topic Name: For this example, enter testkafkatopic.
- Partitions: Set the value to 1.
- Replicas: Set the value to 1.
Retain the default values for other parameters.
Step 2: Prepare a Sink Stream
To use RDS as the sink stream, create an RDS for MySQL DB instance. For details, see "RDS for MySQL Getting Started" in the Relational Database Service Getting Started.
- Log in to the RDS management console.
- Select a region in the upper left corner.
- Click Buy DB Instance in the upper right corner of the page and set related parameters. Retain the default values for other parameters.
- Billing Mode: Select Pay-per-use.
- Region: Select the region where DLI is located.
- DB Instance Name: Enter rds-dliflink.
- DB Engine: Select MySQL.
- DB Engine Version: Select 8.0.
- DB Instance Type: Select Primary/Standby.
- Storage Type: Select Cloud SSD. This option may already be selected by default.
- Primary AZ: Select a custom AZ.
- Standby AZ: Select a custom AZ.
- Time Zone: Keep the default value.
- Instance Class: Select a class as needed. For this example, choose 2 vCPUs | 8 GB.
- Storage Space (GB): Set it to 40.
- VPC: Select the VPC and subnet created in 1.
- Database Port: Enter 3306.
- Security Group: Select the security group created in 1.
- Administrator Password: **** (Keep the password secure. The system cannot retrieve your password.)
- Confirm Password: ****
- Parameter Template: Choose Default-MySQL-8.0.
- Read Replica: Select Skip.
- Click Next and confirm the specifications.
- Click Submit. The RDS DB instance is created.
- Log in to the MySQL database and create table orders in database flink.
Log in to the MySQL instance and click the flink database. On the displayed page, click SQL Window. Enter the following table creation statement in the SQL editing pane to create a table.
CREATE TABLE `flink`.`orders` (
  `order_id` VARCHAR(32) NOT NULL,
  `order_channel` VARCHAR(32) NULL,
  `order_time` VARCHAR(32) NULL,
  `pay_amount` DOUBLE UNSIGNED NOT NULL,
  `real_pay` DOUBLE UNSIGNED NULL,
  `pay_time` VARCHAR(32) NULL,
  `user_id` VARCHAR(32) NULL,
  `user_name` VARCHAR(32) NULL,
  `area_id` VARCHAR(32) NULL,
  PRIMARY KEY (`order_id`)
) ENGINE = InnoDB
  DEFAULT CHARACTER SET = utf8mb4
  COLLATE = utf8mb4_general_ci;
Step 3: Create an OBS Bucket to Store Output Data
In this example, you need to enable OBS for job JobSample so that the DLI Flink job can use checkpointing, save job logs, and store debugging test data.
For how to create a bucket, see Creating a Bucket.
- In the navigation pane on the OBS management console, choose Object Storage.
- In the upper right corner of the page, click Create Bucket and set bucket parameters.
- Region: Select the region where DLI is located.
- Bucket Name: Enter a bucket name. For this example, enter obstest.
- Default Storage Class: Standard
- Bucket Policy: Private
- Default Encryption: Do not enable
- Direct Reading: Do not enable
- Enterprise Project: default
- Click Create Now.
Step 4: Create an Elastic Resource Pool and Add Queues to the Pool
To create a Flink OpenSource SQL job, you must use your own queue as the existing default queue cannot be used. In this example, create an elastic resource pool named dli_resource_pool and a queue named dli_queue_01.
- Log in to the DLI management console.
- In the navigation pane on the left, choose Resources > Resource Pool.
- On the displayed page, click Buy Resource Pool in the upper right corner.
- On the displayed page, set the parameters.
- In this example, we will buy the resource pool in the CN East-Shanghai2 region. Table 2 describes the parameters.
Table 2 Parameters
Parameter | Description | Example Value
---|---|---
Region | Select a region where you want to buy the elastic resource pool. | CN East-Shanghai2
Project | Project uniquely preset by the system for each region | Default
Name | Name of the elastic resource pool | dli_resource_pool
Specifications | Specifications of the elastic resource pool | Standard
CU Range | The maximum and minimum CUs allowed for the elastic resource pool | 64-64
CIDR Block | CIDR block the elastic resource pool belongs to. If you use an enhanced datasource connection, this CIDR block cannot overlap that of the data source. Once set, this CIDR block cannot be changed. | 172.16.0.0/19
Enterprise Project | Select an enterprise project for the elastic resource pool. | default
- Click Buy.
- Click Submit.
- In the elastic resource pool list, locate the pool you just created and click Add Queue in the Operation column.
- Set the basic parameters listed below.
Table 3 Basic parameters for adding a queue
Parameter | Description | Example Value
---|---|---
Name | Name of the queue to add | dli_queue_01
Type | Type of the queue. To execute SQL jobs, select For SQL. To execute Flink or Spark jobs, select For general purpose. | _
Engine | SQL queue engine. The options include Spark and Trino. | _
Enterprise Project | Select an enterprise project. | default
- Click Next and configure scaling policies for the queue.
Click Create to add a scaling policy with varying priority, period, minimum CUs, and maximum CUs.
Figure 2 shows the scaling policy configured in this example.
Table 4 Scaling policy parameters
Parameter | Description | Example Value
---|---|---
Priority | Priority of the scaling policy in the current elastic resource pool. A larger value indicates a higher priority. In this example, only one scaling policy is configured, so its priority is set to 1 by default. | 1
Period | The first scaling policy is the default policy, and its Period parameter configuration cannot be deleted or modified. The period for the scaling policy is from 00 to 24. | 00–24
Min CU | Minimum number of CUs allowed by the scaling policy | 16
Max CU | Maximum number of CUs allowed by the scaling policy | 64
- Click OK.
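The example values in Table 4 can be sanity-checked before you enter them. The following is a minimal sketch, not DLI's own validation: the `ScalingPolicy` class and `check_policy` function are hypothetical names, and the two rules it applies (minimum not greater than maximum, and policy maximum not above the pool's upper CU bound) are assumptions inferred from the example values rather than documented constraints.

```python
from dataclasses import dataclass


@dataclass
class ScalingPolicy:
    """One row of scaling policy settings (hypothetical helper type)."""
    priority: int
    period: str
    min_cu: int
    max_cu: int


def check_policy(policy: ScalingPolicy, pool_max_cu: int) -> list:
    """Return a list of human-readable problems; an empty list means no issue found."""
    problems = []
    # Assumed rule: the minimum CU count cannot exceed the maximum CU count.
    if policy.min_cu > policy.max_cu:
        problems.append("Min CU is greater than Max CU")
    # Assumed rule: a queue cannot scale beyond the pool's upper CU bound.
    if policy.max_cu > pool_max_cu:
        problems.append("Max CU exceeds the pool's maximum CUs")
    return problems


# Values from this example: pool CU range 64-64, policy 16 to 64 CUs.
example = ScalingPolicy(priority=1, period="00-24", min_cu=16, max_cu=64)
print(check_policy(example, pool_max_cu=64))
```

An empty list means the values are consistent under these assumed rules; the console remains the authority on what is actually accepted.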
Step 5: Create an Enhanced Datasource Connection Between DLI and Kafka
You need to create an enhanced datasource connection for the Flink OpenSource SQL job. For details, see "Datasource Connections" > "Creating an Enhanced Datasource Connection" in the Data Lake Insight User Guide.
- Enhanced datasource connections support only pay-per-use queues.
- The CIDR block of the DLI queue bound with a datasource connection cannot overlap with the CIDR block of the data source.
- Datasource connections cannot be created for the default queue.
- To access a table across data sources, you need to use a queue bound to a datasource connection.
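The non-overlap requirement can be checked before you create the connection. The following is a minimal sketch using Python's standard `ipaddress` module. The function name `cidrs_overlap` is hypothetical, `172.16.0.0/19` is the example pool CIDR block from Step 4, and the two data source CIDR blocks are made-up values for illustration.

```python
import ipaddress


def cidrs_overlap(queue_cidr: str, source_cidr: str) -> bool:
    """Return True if the two CIDR blocks share at least one address."""
    queue_net = ipaddress.ip_network(queue_cidr)
    source_net = ipaddress.ip_network(source_cidr)
    return queue_net.overlaps(source_net)


# Pool CIDR block from Step 4 against a hypothetical data source subnet.
print(cidrs_overlap("172.16.0.0/19", "192.168.0.0/24"))
# A hypothetical subnet that falls inside the pool CIDR block.
print(cidrs_overlap("172.16.0.0/19", "172.16.8.0/24"))
```

If the function returns True for your values, choose a different CIDR block for the elastic resource pool, because the pool's CIDR block cannot be changed after it is set.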
- Create a Kafka security group rule to allow access from the CIDR block of the DLI queue.
- On the Kafka management console, click an instance name on the DMS for Kafka page. Basic information of the Kafka instance is displayed.
- In the Connection pane, obtain the Instance Address (Private Network). In the Network pane, obtain the VPC and subnet of the instance.
- Click the security group name in the Network pane. On the displayed page, click the Inbound Rules tab and add a rule to allow access from the DLI queue.
For example, if the CIDR block of the queue is 10.0.0.0/16, set Priority to 1, Action to Allow, Protocol to TCP, Type to IPv4, Source to 10.0.0.0/16, and click OK.
- Create an enhanced datasource connection to Kafka.
- Log in to the DLI management console. In the navigation pane on the left, choose Datasource Connections. On the displayed page, click Create in the Enhanced tab.
- In the displayed dialog box, set the following parameters:
- Connection Name: Name of the enhanced datasource connection. For this example, enter dli_kafka.
- Resource Pool: Select the elastic resource pool created in Step 4: Create an Elastic Resource Pool and Add Queues to the Pool.
- VPC: Select the VPC of the Kafka instance.
- Subnet: Select the subnet of the Kafka instance.
- Set other parameters as you need.
Click OK. Click the name of the created datasource connection to view its status. You can perform subsequent steps only after the connection status changes to Active.
- Choose Resources > Queue Management and locate the queue created in Step 4: Create an Elastic Resource Pool and Add Queues to the Pool. In the Operation column, click More and select Test Address Connectivity.
- In the displayed dialog box, enter Kafka instance address (private network):port in the Address box and click Test to check whether the instance is reachable. Note that multiple addresses must be tested separately.
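Because each address must be tested separately, it can help to split a comma-separated broker list into individual entries first. The following is a minimal sketch using only the Python standard library; `split_addresses` and `is_reachable` are hypothetical helper names. Note that `is_reachable` probes from the machine where the script runs, not from the DLI queue, so it is not a substitute for Test Address Connectivity on the console.

```python
import socket


def split_addresses(address_list: str) -> list:
    """Split 'host1:port1,host2:port2' into [(host1, port1), (host2, port2)]."""
    pairs = []
    for item in address_list.split(","):
        # rpartition splits on the last colon, separating host from port.
        host, _, port = item.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Example broker list; replace with your Kafka instance addresses.
for host, port in split_addresses("192.168.0.237:9092,192.168.0.252:9092"):
    print(f"{host}:{port}")
```

Each printed line is one value to enter in the Address box. Calling `is_reachable(host, port)` for each pair is only meaningful from a host inside the same network as the Kafka instance.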
Step 6: Create an Enhanced Datasource Connection Between DLI and RDS
- Create an RDS security group rule to allow access from the CIDR block of the DLI queue.
If the RDS DB instance and Kafka instance are in the same security group of the same VPC, skip this step. Access from the DLI queue was already allowed when you added the Kafka security group rule in Step 5.
- On the RDS console, click the name of the target RDS for MySQL DB instance on the Instances page. Basic information of the instance is displayed.
- In the Connection Information pane, obtain the floating IP address, database port, VPC, and subnet.
- Click the security group name. On the displayed page, click the Inbound Rules tab and add a rule to allow access from the DLI queue. For example, if the CIDR block of the queue is 10.0.0.0/16, set Priority to 1, Action to Allow, Protocol to TCP, Type to IPv4, Source to 10.0.0.0/16, and click OK.
- Create an enhanced datasource connection to RDS.
If the RDS DB instance and Kafka instance are in the same VPC and subnet, skip this step. The enhanced datasource connection created in Step 5 already connects to that subnet.
If the two instances are in different VPCs or subnets, perform the following steps to create an enhanced datasource connection:
- Log in to the DLI management console. In the navigation pane on the left, choose Datasource Connections. On the displayed page, click Create in the Enhanced tab.
- In the displayed dialog box, set the following parameters:
- Connection Name: Name of the enhanced datasource connection. For this example, enter dli_rds.
- Resource Pool: Select the elastic resource pool created in Step 4: Create an Elastic Resource Pool and Add Queues to the Pool.
- VPC: Select the VPC of the RDS DB instance.
- Subnet: Select the subnet of the RDS DB instance.
- Set other parameters as you need.
Click OK. Click the name of the created datasource connection to view its status. You can perform subsequent steps only after the connection status changes to Active.
- Choose Resources > Queue Management and locate the queue created in Step 4: Create an Elastic Resource Pool and Add Queues to the Pool. In the Operation column, click More and select Test Address Connectivity.
- In the displayed dialog box, enter Floating IP address:Database port of the RDS for MySQL DB instance in the Address box and click Test to check if the instance is reachable.
Step 7: Use DEW to Manage Access Credentials and Create an Agency That Allows DLI to Access DEW
In cross-source analysis scenarios, you need to set attributes such as the username and password in the connector. However, information such as usernames and passwords is highly sensitive and needs to be encrypted to ensure user data privacy. Flink 1.15 allows for the use of DEW to manage credentials. Before running a job, create a custom agency and configure agency information within the job.
Data Encryption Workshop (DEW) and Cloud Secret Management Service (CSMS) jointly form a secure, reliable, and easy-to-use privacy data encryption and decryption solution. This example describes how a Flink OpenSource SQL job uses DEW to manage RDS access credentials.
- Create an agency for DLI to access DEW and complete authorization.
- Create a shared secret in DEW.
- Log in to the DEW management console.
- In the navigation pane on the left, choose Cloud Secret Management Service > Secrets.
- Click Create Secret. On the displayed page, configure basic secret information.
- Secret Name: Enter a secret name. In this example, the name is secretInfo.
- Secret Value: Enter the username and password for logging in to the RDS for MySQL DB instance.
- The key in the first line is MySQLUsername, and the value is the username for logging in to the DB instance.
- The key in the second line is MySQLPassword, and the value is the password for logging in to the DB instance.
Figure 3 Secret Value
- Set other parameters as required and click OK.
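The secret configured above is a pair of key-value entries. The following is a minimal sketch of that structure, assuming the key-value secret is represented as a JSON object (an assumption, not confirmed by this guide). The helper names `build_secret_value` and `missing_keys` are hypothetical, and the credentials shown are placeholders.

```python
import json

# Keys used for the secret value in this example.
REQUIRED_KEYS = ("MySQLUsername", "MySQLPassword")


def build_secret_value(username: str, password: str) -> str:
    """Return the two key-value entries as a JSON object string (assumed format)."""
    return json.dumps({"MySQLUsername": username, "MySQLPassword": password})


def missing_keys(secret_value: str) -> list:
    """Return the required keys that are absent from the secret value."""
    data = json.loads(secret_value)
    return [key for key in REQUIRED_KEYS if key not in data]


# Placeholder credentials; never hard-code real ones in scripts.
secret = build_secret_value("<db-username>", "<db-password>")
print(missing_keys(secret))
```

A non-empty result would point to a key that was misspelled or omitted when the secret was created.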
Step 8: Create a Flink OpenSource SQL Job
After the source and sink streams are prepared, you can create a Flink OpenSource SQL job.
- In the left navigation pane of the DLI management console, choose the Flink Jobs menu item. The Flink Jobs page is displayed.
- In the upper right corner of the Flink Jobs page, click Create Job. Set the following parameters:
- Type: Flink OpenSource SQL
- Name: JobSample
- Description: Leave it blank.
- Template Name: Do not select any template.
- Tags: Leave it blank.
- Click OK to enter the editing page.
- Set job running parameters. The mandatory parameters are as follows:
- Queue: dli_queue_01
- Flink Version: Select 1.12.
- Save Job Log: Enable this function.
- OBS Bucket: Select an OBS bucket for storing job logs and grant access permissions of the OBS bucket as prompted.
- Enable Checkpointing: Enable this function.
You do not need to set other parameters.
- Click Save.
- Edit the Flink OpenSource SQL job.
In the SQL statement editing area, enter query and analysis statements as you need. The example statements are as follows. Note that the parameter values described in the comments must be changed to match your environment.
CREATE TABLE kafkaSource (
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  -- Topic to read from. To find the topic name, log in to the Kafka console, click the name of the created Kafka instance, and open the Topic Management page.
  'topic' = 'testkafkatopic',
  -- Replace with the private network addresses and port numbers of your Kafka instance.
  'properties.bootstrap.servers' = '192.168.0.237:9092,192.168.0.252:9092,192.168.0.137:9092',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE jdbcSink (
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'jdbc',
  -- flink is the database that contains the orders table (see Step 2). Replace the IP address and port number with those of your RDS for MySQL DB instance.
  'url' = 'jdbc:mysql://172.16.0.116:3306/flink',
  'table-name' = 'orders',
  -- Name of the password-type datasource authentication created on DLI. If datasource authentication is used, you do not need to set the username and password for the job.
  'pwd_auth_name' = 'xxxxx',
  'sink.buffer-flush.max-rows' = '1'
);

INSERT INTO jdbcSink SELECT * FROM kafkaSource;
- Click Check Semantics.
- Click Start. On the displayed Start Flink Job page, confirm the job specifications and the price, and click Start Now to start the job.
After the job is started, the system automatically switches to the Flink Jobs page, and the created job is displayed in the job list. You can view the job status in the Status column. Once the job has been submitted successfully, its status changes to show that it is running.
If the status shows that the job failed to be submitted or failed to run, hover over the status icon to view the error details, which you can copy. Rectify the fault based on the error information and resubmit the job.
- Connect to the Kafka cluster and send the following test data to the Kafka topic:
For details about how to produce and consume Kafka messages, see Connecting to an Instance Without SASL.
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
- Run the following SQL statement in the MySQL database to view data in the table:
select * from orders;
The following is an example of the execution result copied from the MySQL database:
202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106
202103241606060001,appShop,2021-03-24 16:06:06,200.0,180.0,2021-03-24 16:10:06,0001,Alice,330106
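The test messages and the rows you should expect to see can be generated from one definition. The following is a minimal sketch using only the Python standard library; `to_message` and `expected_row` are hypothetical helper names. The amount formatting in `expected_row` simply mirrors the sample result above and is not a statement about how Flink or MySQL formats numbers in general.

```python
import json

# Field order shared by the Kafka source table and the orders table.
FIELDS = (
    "order_id", "order_channel", "order_time", "pay_amount", "real_pay",
    "pay_time", "user_id", "user_name", "area_id",
)

ORDERS = [
    {"order_id": "202103241000000001", "order_channel": "webShop",
     "order_time": "2021-03-24 10:00:00", "pay_amount": "100.00",
     "real_pay": "100.00", "pay_time": "2021-03-24 10:02:03",
     "user_id": "0001", "user_name": "Alice", "area_id": "330106"},
    {"order_id": "202103241606060001", "order_channel": "appShop",
     "order_time": "2021-03-24 16:06:06", "pay_amount": "200.00",
     "real_pay": "180.00", "pay_time": "2021-03-24 16:10:06",
     "user_id": "0001", "user_name": "Alice", "area_id": "330106"},
]


def to_message(order: dict) -> str:
    """Serialize one order as a single-line JSON message with all schema fields."""
    missing = [field for field in FIELDS if field not in order]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    return json.dumps({field: order[field] for field in FIELDS})


def expected_row(order: dict) -> str:
    """Build the comma-separated row expected in the orders table for one order."""
    values = []
    for field in FIELDS:
        value = order[field]
        if field in ("pay_amount", "real_pay"):
            # Amounts are stored as doubles, so "100.00" is shown as 100.0.
            value = str(float(value))
        values.append(value)
    return ",".join(values)


# One JSON message per line, ready to be sent to the topic by a Kafka producer.
for order in ORDERS:
    print(to_message(order))
```

Comparing `expected_row(order)` for each order with the output of the `select` statement gives a quick check that every message reached the sink table.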