Using DLI Flink SQL to Analyze e-Commerce Business Data in Real Time
Application Scenarios
Online shopping is very popular for its convenience and flexibility. E-commerce platforms can be accessed in a variety of ways, such as visiting websites, using shopping apps, and accessing mini programs. A large volume of statistics, such as real-time access volume, number of orders, and number of visitors, needs to be collected and analyzed on each e-commerce platform every day. This data needs to be displayed in an intuitive way and updated in real time so that managers can learn about data changes in a timely manner and adjust marketing policies accordingly. How can we collect statistics on these metrics efficiently and quickly?
Assume that the order information of each offering is written to Kafka in real time. The information includes the order ID, order channel (website or app), order creation time, order amount, actual payment amount after discount, payment time, user ID, username, and region ID. We need to collect statistics on this information by sales channel in real time, store the statistics in a database, and display the statistics on screens.
Solution Overview
The following figure gives you an overview of using DLI Flink to analyze and process real-time e-commerce business data and sales data from all channels.
Process
To analyze real-time e-commerce data with DLI Flink, perform the following steps:
Step 1: Creating Resources. Create the resources required by the job under your account, including VPC, DMS, DLI, and RDS.
Step 2: Obtaining the DMS Connection Address and Creating a Topic. Obtain the connection address of the DMS Kafka instance and create a DMS topic.
Step 3: Creating an RDS Database Table. Obtain the private IP address of the RDS DB instance and log in to the instance to create an RDS database and MySQL table.
Step 4: Creating an Enhanced Datasource Connection. Create an enhanced datasource connection for the queue and test the connectivity between the queue and the RDS instance and the queue and the DMS instance, respectively.
Step 5: Creating and Submitting a Flink Job. Create a DLI Flink OpenSource SQL job and run it.
Step 6: Querying the Result. Query the Flink job results and display the results on a screen in DLV.
Solution Advantages
- Cross-source analysis: You can perform association analysis on sales summary data of each channel stored in OBS. There is no need for data migration.
- SQL only: DLI has interconnected with multiple data sources. You can create tables using SQL statements to complete data source mapping.
Resource Planning and Costs
Resource | Description | Cost
---|---|---
OBS | You need to create an OBS bucket and upload data to OBS for data analysis using DLI. | You will be charged for the OBS resources you use. The actual fee depends on the size of the stored files, the number of user access requests, and the traffic volume. Estimate the fee based on your service requirements.
DLI | Before creating a SQL job, you need to purchase a queue. When using queue resources, you are billed based on the CU hours (CUH) used by the queue. | For example, if you purchase a pay-per-use queue, you are billed by the hour based on the number of CUHs the queue uses; 58 minutes of usage is rounded up to one hour. CUH pay-per-use billing = Unit price x Number of CUs x Number of hours.
VPC | You can customize subnets, security groups, and network ACLs, and assign EIPs and bandwidth. | The VPC service is free of charge. EIPs are required if your resources need to access the Internet. EIP supports two billing modes: pay-per-use and yearly/monthly. For details, see VPC Billing.
DMS Kafka | Kafka provides premium instances with computing, storage, and exclusive bandwidth resources. | Kafka supports two billing modes: pay-per-use and yearly/monthly. Billing items include Kafka instances and Kafka disk storage space. For details, see DMS for Kafka Billing.
RDS MySQL | RDS for MySQL provides online cloud database services. | You are billed for RDS DB instances, database storage, and backup storage (optional). For details, see RDS Billing.
DLV | DLV adapts to a wide range of on-premises and cloud data sources, and provides diverse visualization components for you to quickly customize your data screens. | If you use the DLV service, you will be charged for the purchased yearly/monthly DLV package.
Example Data
- Order details wide table
Field | Data Type | Description
---|---|---
order_id | string | Order ID
order_channel | string | Order channel (website or app)
order_time | string | Order creation time
pay_amount | double | Order amount
real_pay | double | Actual amount paid
pay_time | string | Payment time
user_id | string | User ID
user_name | string | Username
area_id | string | Region ID
- Result table: real-time statistics of the total sales amount in each channel
Field | Data Type | Description
---|---|---
begin_time | varchar(32) | Start time for collecting statistics on metrics
channel_code | varchar(32) | Channel code
channel_name | varchar(32) | Channel name
cur_gmv | double | Gross merchandise value (GMV) of the day
cur_order_user_count | bigint | Number of users who paid during the day
cur_order_count | bigint | Number of orders paid during the day
last_pay_time | varchar(32) | Latest settlement time
flink_current_time | varchar(32) | Flink data processing time
Step 1: Creating Resources
Create VPC, DMS, RDS, DLI, and DLV resources listed in Table 2.
Resource | Description | Instructions
---|---|---
VPC | A VPC manages network resources on the cloud. | 
DMS Kafka | In this example, a DMS for Kafka instance is the data source. | 
RDS MySQL | In this example, an RDS for MySQL instance provides the cloud database service. | 
DLI | DLI provides real-time data analysis. Create a general-purpose queue that uses dedicated resources in yearly/monthly or pay-per-use billing mode. Otherwise, an enhanced datasource connection cannot be created. | 
DLV | DLV displays the result data processed by the DLI queue in real time. | 
Step 2: Obtaining the DMS Connection Address and Creating a Topic
- Hover the mouse over the Service List icon and choose Distributed Message Service under Application. The DMS console is displayed. On the DMS for Kafka page, locate the Kafka instance you have created.
Figure 2 Kafka instances
- Click the instance name. On the displayed instance details page, obtain the Instance Address (Private Network) in the Connection pane.
Figure 3 Connection address
- Create a topic and name it trade_order_detail_info.
Figure 4 Creating a topic
Configure the required topic parameters as follows:
- Partitions: Set it to 1.
- Replicas: Set it to 1.
- Aging Time: Set it to 72 hours.
- Synchronous Flushing: Disable this function.
Step 3: Creating an RDS Database Table
- Log in to the console, hover your mouse over the Service List icon, and choose Relational Database Service under Databases. The RDS console is displayed. On the Instances page, locate the created DB instance and view its floating IP address.
Figure 5 Viewing the floating IP address
- Click More > Log In in the Operation column. On the displayed page, enter the username and password for logging in to the instance and click Test Connection. After Connection is successful is displayed, click Log In.
Figure 6 Logging in to an Instance
- Click Create Database. In the displayed dialog box, enter database name dli-demo. Then, click OK.
Figure 7 Creating a database
- Choose SQL Operation > SQL Query and run the following SQL statement to create a MySQL table for testing (Example Data describes the fields). An optional schema check query is provided after this step.
DROP TABLE `dli-demo`.`trade_channel_collect`;
CREATE TABLE `dli-demo`.`trade_channel_collect` (
    `begin_time` VARCHAR(32) NOT NULL,
    `channel_code` VARCHAR(32) NOT NULL,
    `channel_name` VARCHAR(32) NULL,
    `cur_gmv` DOUBLE UNSIGNED NULL,
    `cur_order_user_count` BIGINT UNSIGNED NULL,
    `cur_order_count` BIGINT UNSIGNED NULL,
    `last_pay_time` VARCHAR(32) NULL,
    `flink_current_time` VARCHAR(32) NULL,
    PRIMARY KEY (`begin_time`, `channel_code`)
) ENGINE = InnoDB
  DEFAULT CHARACTER SET = utf8mb4
  COLLATE = utf8mb4_general_ci
  COMMENT = 'Real-time statistics on the total sales amount of each channel';
Figure 8 Creating a table
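Optionally, you can run a quick check (a minimal example, assuming the table was created exactly as above) to confirm the schema before moving on:
DESC `dli-demo`.`trade_channel_collect`;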
Step 4: Creating an Enhanced Datasource Connection
- On the management console, hover the mouse over the Service List icon and choose Analytics > Data Lake Insight. The DLI management console is displayed. Choose Resources > Queue Management to view the created DLI queue.
Figure 9 Queue list
- In the navigation pane of the DLI management console, choose Global Configuration > Service Authorization. On the displayed page, select VPC Administrator, and click Update to grant the DLI user the permission to access VPC resources. The permission is used to create a VPC peering connection.
Figure 10 Updating agency permissions
- Choose Datasource Connections. On the displayed Enhanced tab, click Create. Configure the following parameters, and click OK.
- Connection Name: Enter a name.
- Resource Pool: Select the general-purpose queue you have created.
- VPC: Select the VPC where the Kafka and MySQL instances are.
- Subnet: Select the subnet where the Kafka and MySQL instances are.
Figure 11 Creating an enhanced datasource
The status of the created datasource connection is Active in the Enhanced tab.
Click the name of the datasource connection. On the details page, the connection status is ACTIVE.
Figure 12 Connection status
Figure 13 Details
- Test whether the queue can connect to RDS for MySQL and DMS for Kafka instances, respectively.
- On the Queue Management page, locate the target queue. In the Operation column, click More > Test Address Connectivity.
Figure 14 Testing address connectivity
- Enter the connection address of the DMS for Kafka instance and the private IP address of the RDS for MySQL instance to test the connectivity.
If the test is successful, the DLI queue can connect to the Kafka and MySQL instances.
Figure 15 Testing address connectivity
If the test fails, modify the security group rules of the VPC where the Kafka and MySQL instances are to allow DLI queue access on ports 9092 and 3306. You can obtain the network segment of the queue on its details page.
Figure 16 VPC security group rules
Step 5: Creating and Submitting a Flink Job
- In the navigation pane on the left, choose Job Management > Flink Jobs. Click Create Job.
- Type: Select Flink OpenSource SQL.
- Name: Enter a name.
Figure 17 Creating a Flink Job
- Click OK. The job editing page is displayed. The following is a sample SQL job. You need to modify the parameter values marked xxxx based on your RDS and DMS instance information.
--********************************************************************--
-- Data source: trade_order_detail_info (order details wide table)
--********************************************************************--
create table trade_order_detail (
    order_id string,      -- Order ID
    order_channel string, -- Channel
    order_time string,    -- Order creation time
    pay_amount double,    -- Order amount
    real_pay double,      -- Actual payment amount
    pay_time string,      -- Payment time
    user_id string,       -- User ID
    user_name string,     -- Username
    area_id string        -- Region ID
) with (
    "connector.type" = "kafka",
    "connector.version" = "0.10",
    "connector.properties.bootstrap.servers" = "xxxx:9092,xxxx:9092,xxxx:9092", -- Kafka connection address
    "connector.properties.group.id" = "trade_order",   -- Kafka group ID
    "connector.topic" = "trade_order_detail_info",     -- Kafka topic
    "format.type" = "json",
    "connector.startup-mode" = "latest-offset"
);

--********************************************************************--
-- Result table: trade_channel_collect (real-time statistics on the total sales amount of each channel)
--********************************************************************--
create table trade_channel_collect(
    begin_time string,           -- Start time of statistics collection
    channel_code string,         -- Channel ID
    channel_name string,         -- Channel name
    cur_gmv double,              -- GMV
    cur_order_user_count bigint, -- Number of payers
    cur_order_count bigint,      -- Number of orders paid on the day
    last_pay_time string,        -- Latest settlement time
    flink_current_time string,   -- Flink data processing time
    primary key (begin_time, channel_code) not enforced
) with (
    "connector.type" = "jdbc",
    "connector.url" = "jdbc:mysql://xxxx:3306/xxxx", -- MySQL connection address, in JDBC format
    "connector.table" = "xxxx",                      -- MySQL table name
    "connector.driver" = "com.mysql.jdbc.Driver",
    'pwd_auth_name'= 'xxxxx', -- Name of the password-type datasource authentication created on DLI. If datasource authentication is used, you do not need to set the username and password for the job.
    "connector.write.flush.max-rows" = "1000",
    "connector.write.flush.interval" = "1s"
);

--********************************************************************--
-- Temporary intermediate table
--********************************************************************--
create view tmp_order_detail
as
select *
    , case when t.order_channel not in ("webShop", "appShop", "miniAppShop") then "other"
           else t.order_channel end as channel_code -- Redefine channels. Only four enumeration values are available: webShop, appShop, miniAppShop, and other.
    , case when t.order_channel = "webShop" then _UTF16"Website"
           when t.order_channel = "appShop" then _UTF16"Shopping App"
           when t.order_channel = "miniAppShop" then _UTF16"Mini Program"
           else _UTF16"Other" end as channel_name -- Channel name
from (
    select *
        , row_number() over(partition by order_id order by order_time desc) as rn -- Remove duplicate order data
        , concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time
        , concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time
    from trade_order_detail
    where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") -- Obtain the data of the current day. To accelerate running, 2021-03-25 12:03:00 is used in place of cast(LOCALTIMESTAMP as string).
        and real_pay is not null
) t
where t.rn = 1;

-- Collect data statistics by channel.
insert into trade_channel_collect
select
      begin_time                                             -- Start time of statistics collection
    , channel_code
    , channel_name
    , cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv  -- GMV
    , count(distinct user_id) as cur_order_user_count        -- Number of payers
    , count(1) as cur_order_count                            -- Number of orders paid on the day
    , max(pay_time) as last_pay_time                         -- Latest settlement time
    , cast(LOCALTIMESTAMP as string) as flink_current_time   -- Current time of the Flink task
from tmp_order_detail
where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00")
group by begin_time, channel_code, channel_name;
Job logic
- Create a Kafka source table to read consumption data from a specified Kafka topic.
- Create a result table to write result data into MySQL through JDBC.
- Implement the processing logic to collect statistics on each metric.
To simplify the final processing logic, create a view to preprocess the data.
- Use an over window with a filter condition to remove duplicate order data (the top-N method). In addition, the built-in functions concat and substr are used to set 00:00:00 of the day as the start time and 23:59:59 of the same day as the end time, and to collect statistics on orders paid after 00:00:00 on that day. (To facilitate data simulation, 2021-03-25 12:03:00 is used in place of cast(LOCALTIMESTAMP as string); a sketch with the real-time timestamp restored follows this list.)
- Based on the channel of each order, a built-in CASE expression is used to derive channel_code and channel_name, so the view exposes the source table fields together with begin_time, end_time, channel_code, and channel_name.
- Collect statistics on the required metrics, filter the data as required, and write the results to the result table.
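For reference, the following is a minimal sketch of the deduplication subquery with the hard-coded timestamp swapped back to cast(LOCALTIMESTAMP as string), assuming the trade_order_detail source table defined above. With live data, the statistics window then always covers the current day:
select *
    , row_number() over(partition by order_id order by order_time desc) as rn          -- Keep only the latest record per order ID
    , concat(substr(cast(LOCALTIMESTAMP as string), 1, 10), " 00:00:00") as begin_time -- Start of the current day
    , concat(substr(cast(LOCALTIMESTAMP as string), 1, 10), " 23:59:59") as end_time   -- End of the current day
from trade_order_detail
where pay_time >= concat(substr(cast(LOCALTIMESTAMP as string), 1, 10), " 00:00:00")
    and real_pay is not null;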
- Select the created general-purpose queue and submit the job.
- Wait until the job status changes to Running. Click the job name to view the details.
Figure 19 Job status
- Use the Kafka client to send data to a specified topic to simulate real-time data streaming.
For details, see Connecting to an Instance Without SASL.
Figure 20 Real-time data streaming
- Run the following command:
sh kafka_2.11-2.3.0/bin/kafka-console-producer.sh --broker-list <Kafka connection address> --topic <topic name>
Example data is as follows:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202103252020200001", "order_channel":"webShop", "order_time":"2021-03-24 20:20:20", "pay_amount":"600.00", "real_pay":"480.00", "pay_time":"2021-03-25 00:00:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103260808080001", "order_channel":"webShop", "order_time":"2021-03-25 08:08:08", "pay_amount":"300.00", "real_pay":"240.00", "pay_time":"2021-03-25 08:10:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103261313130001", "order_channel":"webShop", "order_time":"2021-03-25 13:13:13", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-25 16:16:16", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103270606060001", "order_channel":"appShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"50.50", "real_pay":"50.50", "pay_time":"2021-03-25 06:07:00", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103270606060002", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"66.60", "real_pay":"66.60", "pay_time":"2021-03-25 06:07:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103270606060003", "order_channel":"miniAppShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"88.80", "real_pay":"88.80", "pay_time":"2021-03-25 06:07:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202103270606060004", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"99.90", "real_pay":"99.90", "pay_time":"2021-03-25 06:07:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
- In the navigation pane on the left, choose Job Management > Flink Jobs, and click the job you submitted. On the job details page, view the number of processed data records.
Figure 21 Job details
Step 6: Querying the Result
- Log in to the MySQL instance by referring to Step 3 and run the following SQL statement to query the result data processed by the Flink job (an additional check query is provided after the figure):
SELECT * FROM `dli-demo`.`trade_channel_collect`;
Figure 22 Querying results
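You can also run a more targeted query (a minimal example against the same result table) to confirm that the statistics keep updating, for example by checking the latest settlement and processing times:
SELECT channel_name, cur_order_count, last_pay_time, flink_current_time
FROM `dli-demo`.`trade_channel_collect`
ORDER BY last_pay_time DESC;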
- Log in to the DLV console, configure a DLV screen, and run SQL statements to query data in the RDS for MySQL instance to display the data on the screen. An example query is provided after the figure.
For details, see Editing Screens.
Figure 23 DLV screen
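For example, a DLV chart widget could be backed by a query like the following against the RDS for MySQL result table (table and field names taken from the earlier steps; adjust them to your own screen design) to show GMV by channel:
SELECT channel_name, cur_gmv
FROM `dli-demo`.`trade_channel_collect`
ORDER BY cur_gmv DESC;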