Updated on 2024-05-29 GMT+08:00

Using DLI Flink SQL to Analyze e-Commerce Business Data in Real Time

Application Scenarios

Online shopping is popular for its convenience and flexibility. E-commerce platforms can be accessed in several ways, such as through websites, shopping apps, and mini-programs. Every day, each platform needs to collect and analyze a large volume of statistics, such as real-time access volume, number of orders, and number of visitors. This data needs to be displayed intuitively and updated in real time so that managers can follow changes promptly and adjust marketing policies accordingly. How can these metrics be collected efficiently and quickly?

Assume the order information of each offering is written into Kafka in real time. The information includes the order ID, channel (websites or apps), order creation time, amount, actual payment amount after discount, payment time, user ID, username, and region ID. We need to collect statistics on such information based on metrics of each sales channel in real time, store the statistics in a database, and display the statistics on screens.

Solution Overview

The following figure shows how DLI Flink is used to analyze and process real-time e-commerce business data and sales data across all channels.

Figure 1 Solution overview

Process

To analyze real-time e-commerce data with DLI Flink, perform the following steps:

Step 1: Creating Resources. Under your account, create the resources required for the job, including VPC, DMS, DLI, and RDS.

Step 2: Obtaining the DMS Connection Address and Creating a Topic. Obtain the connection address of the DMS Kafka instance and create a DMS topic.

Step 3: Creating an RDS Database Table. Obtain the private IP address of the RDS DB instance and log in to the instance to create an RDS database and MySQL table.

Step 4: Creating an Enhanced Datasource Connection. Create an enhanced datasource connection for the queue, then test the connectivity from the queue to the RDS instance and to the DMS instance.

Step 5: Creating and Submitting a Flink Job. Create a DLI Flink OpenSource SQL job and run it.

Step 6: Querying the Result. Query the Flink job results and display the results on a screen in DLV.

Solution Advantages

  • Cross-source analysis: You can perform association analysis on sales summary data of each channel stored in OBS. There is no need for data migration.
  • SQL only: DLI connects to multiple data sources. You can create tables using SQL statements to complete data source mapping.

Resource Planning and Costs

Table 1 Resource planning and costs

OBS

  Description: You need to create an OBS bucket and upload data to OBS for data analysis using DLI.

  Cost: You will be charged for using the following OBS resources:

  • Storage Fee for storing static website files in OBS.
  • Request Fee for accessing static website files stored in OBS.
  • Traffic Fee for using a custom domain name to access OBS over the public network.

  The actual fee depends on the size of the stored file, the number of user access requests, and the traffic volume. Estimate the fee based on your service requirements.

DLI

  Description: Before creating a SQL job, you need to purchase a queue. When using queue resources, you are billed based on the CUH of the queue.

  Cost: For example, if you purchase a pay-per-use queue, you will be billed based on the number of CUHs used by the queue.

  Usage is billed by the hour, and partial hours are rounded up. For example, 58 minutes of usage is billed as one hour. CUH pay-per-use billing = Unit price x Number of CUs x Number of hours.

VPC

  Description: You can customize subnets, security groups, and network ACLs, and assign EIPs and bandwidths.

  Cost: The VPC service is free of charge.

  EIPs are required if your resources need to access the Internet. EIP supports two billing modes: pay-per-use and yearly/monthly.

  For more, see VPC Billing Description.

DMS Kafka

  Description: Kafka provides premium instances with computing, storage, and exclusive bandwidth resources.

  Cost: Kafka supports two billing modes: pay-per-use and yearly/monthly. Billing items include Kafka instances and Kafka disk storage space.

  For details, see DMS for Kafka Billing Description.

RDS MySQL

  Description: RDS for MySQL provides online cloud database services.

  Cost: You are billed for RDS DB instances, database storage, and backup storage (optional).

  For details, see RDS Billing Description.

DLV

  Description: DLV adapts to a wide range of on-premises and cloud data sources, and provides diverse visualized components for you to quickly customize your data screens.

  Cost: If you use the DLV service, you will be charged for the purchased yearly/monthly DLV package.
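
The DLI pay-per-use formula in Table 1 can be sketched in a few lines of Python. This is only an illustration: the function name and the unit price of 0.5 are hypothetical, and the sketch assumes that any partial hour is rounded up to a full hour, as the 58-minute example suggests. Check the DLI pricing page for actual prices and rounding rules.

```python
import math


def dli_pay_per_use_cost(unit_price_per_cuh: float, cus: int, minutes_used: float) -> float:
    """Cost = unit price x number of CUs x number of billed hours.

    Assumption: usage is billed by the hour and partial hours are rounded up.
    """
    billed_hours = math.ceil(minutes_used / 60)
    return unit_price_per_cuh * cus * billed_hours


# Hypothetical unit price of 0.5 per CUH: a 16-CU queue used for
# 58 minutes is billed as one full hour.
print(dli_pay_per_use_cost(0.5, 16, 58))
```

Under the same assumption, 61 minutes of usage on the same queue would be billed as two hours.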

Example Data

  • Order details wide table

    Field            Data Type    Description
    order_id         string       Order ID
    order_channel    string       Order channel (websites or apps)
    order_time       string       Order creation time
    pay_amount       double       Order amount
    real_pay         double       Actual amount paid
    pay_time         string       Payment time
    user_id          string       User ID
    user_name        string       Username
    area_id          string       Region ID

  • Result table: real-time statistics of the total sales amount in each channel

    Field                   Data Type      Description
    begin_time              varchar(32)    Start time for collecting statistics on metrics
    channel_code            varchar(32)    Channel code
    channel_name            varchar(32)    Channel name
    cur_gmv                 double         Gross merchandise value (GMV) of the day
    cur_order_user_count    bigint         Number of users who settled a payment on the day
    cur_order_count         bigint         Number of orders paid on the day
    last_pay_time           varchar(32)    Latest settlement time
    flink_current_time      varchar(32)    Flink data processing time
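
To make the order details schema concrete, the following Python sketch models one order record and converts it to and from a single-line JSON message of the kind written to Kafka. The class and function names are hypothetical and not part of DLI or DMS. The sketch assumes that amounts may arrive as strings such as "100.00" (as in the example data used later in this document) and converts them to floating-point numbers.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class OrderDetail:
    """One row of the order details wide table (illustrative only)."""
    order_id: str
    order_channel: str
    order_time: str
    pay_amount: float
    real_pay: float
    pay_time: str
    user_id: str
    user_name: str
    area_id: str


def to_kafka_message(order: OrderDetail) -> str:
    """Serialize one order as a single-line JSON message."""
    return json.dumps(asdict(order))


def from_kafka_message(message: str) -> OrderDetail:
    """Parse a JSON message; amounts sent as strings are converted to float."""
    raw = json.loads(message)
    raw["pay_amount"] = float(raw["pay_amount"])
    raw["real_pay"] = float(raw["real_pay"])
    return OrderDetail(**raw)


sample = (
    '{"order_id":"202103241000000001", "order_channel":"webShop", '
    '"order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", '
    '"real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", '
    '"user_id":"0001", "user_name":"Alice", "area_id":"330106"}'
)
order = from_kafka_message(sample)
print(order.order_channel, order.real_pay)
```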

Step 1: Creating Resources

Create VPC, DMS, RDS, DLI, and DLV resources listed in Table 2.

Table 2 Cloud resources required

VPC

  Description: A VPC manages network resources on the cloud. The network planning is described as follows:

  • The VPCs specified for the Kafka and MySQL instances must be the same.
  • The VPC network segment where the Kafka and MySQL instances belong cannot conflict with that of the DLI queue.

  Instructions: Creating a VPC and Subnet

DMS Kafka

  Description: In this example, the DMS for Kafka instance is the data source.

  Instructions: Getting Started with DMS for Kafka

RDS MySQL

  Description: In this example, an RDS for MySQL instance provides the cloud database service.

  Instructions: Getting Started with RDS for MySQL

DLI

  Description: DLI provides real-time data analysis. Create a general-purpose queue that uses dedicated resources in yearly/monthly or pay-per-use billing mode. Otherwise, an enhanced datasource connection cannot be created.

  Instructions: Creating a Queue

DLV

  Description: DLV displays the result data processed by the DLI queue in real time.

  Instructions: Creating Screens

Step 2: Obtaining the DMS Connection Address and Creating a Topic

  1. Hover the mouse over the service list icon and choose Distributed Message Service in Application. The DMS console is displayed. On the DMS for Kafka page, locate the Kafka instance you have created.
    Figure 2 Kafka instances
  2. Click the instance name. The instance details page is displayed. Obtain the Instance Address (Private Network) in the Connection pane.
    Figure 3 Connection address
  3. Create a topic and name it trade_order_detail_info.
    Figure 4 Creating a topic

    Configure the required topic parameters as follows:

    • Partitions: Set it to 1.
    • Replicas: Set it to 1.
    • Aging Time: Set it to 72 hours.
    • Synchronous Flushing: Disable this function.

Step 3: Creating an RDS Database Table

  1. Log in to the console, hover your mouse over the service list icon and choose Relational Database Service in Databases. The RDS console is displayed. On the Instances page, locate the created DB instance and view its floating IP address.
    Figure 5 Viewing the floating IP address
  2. Click More > Log In in the Operation column. On the displayed page, enter the username and password for logging in to the instance and click Test Connection. After the message "Connection is successful" is displayed, click Log In.
    Figure 6 Logging in to an Instance
  3. Click Create Database. In the displayed dialog box, enter the database name dli-demo. Then, click OK.
    Figure 7 Creating a database
  4. Choose SQL Operation > SQL Query and run the following SQL statements to create a MySQL table for testing (Example Data describes the fields):
    DROP TABLE IF EXISTS `dli-demo`.`trade_channel_collect`;
    CREATE TABLE `dli-demo`.`trade_channel_collect` (
    	`begin_time` VARCHAR(32) NOT NULL,
    	`channel_code` VARCHAR(32) NOT NULL,
    	`channel_name` VARCHAR(32) NULL,
    	`cur_gmv` DOUBLE UNSIGNED NULL,
    	`cur_order_user_count` BIGINT UNSIGNED NULL,
    	`cur_order_count` BIGINT UNSIGNED NULL,
    	`last_pay_time` VARCHAR(32) NULL,
    	`flink_current_time` VARCHAR(32) NULL,
    	PRIMARY KEY (`begin_time`, `channel_code`)
    )	ENGINE = InnoDB
    	DEFAULT CHARACTER SET = utf8mb4
    	COLLATE = utf8mb4_general_ci
    	COMMENT = 'Real-time statistics on the total sales amount of each channel';
    Figure 8 Creating a table
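
The composite primary key (begin_time, channel_code) means the table holds one row per channel per day, so a newer result for the same key is expected to overwrite the older one rather than add a row. The following Python sketch illustrates that behavior. It is an assumption-laden stand-in: it uses an in-memory SQLite database and INSERT OR REPLACE instead of the RDS for MySQL instance, keeps only four of the columns, and the upsert helper is hypothetical.

```python
import sqlite3

# In-memory SQLite database used as a stand-in for the RDS for MySQL instance.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE trade_channel_collect (
        begin_time      TEXT NOT NULL,
        channel_code    TEXT NOT NULL,
        cur_gmv         REAL,
        cur_order_count INTEGER,
        PRIMARY KEY (begin_time, channel_code)
    )
    """
)


def upsert(begin_time, channel_code, cur_gmv, cur_order_count):
    """Insert a row, or replace the existing row that has the same primary key."""
    conn.execute(
        "INSERT OR REPLACE INTO trade_channel_collect VALUES (?, ?, ?, ?)",
        (begin_time, channel_code, cur_gmv, cur_order_count),
    )


upsert("2021-03-25 00:00:00", "webShop", 480.0, 1)
upsert("2021-03-25 00:00:00", "webShop", 720.0, 2)  # same key: replaces the row
upsert("2021-03-25 00:00:00", "appShop", 50.5, 1)   # new key: adds a row

rows = conn.execute(
    "SELECT channel_code, cur_gmv, cur_order_count "
    "FROM trade_channel_collect ORDER BY channel_code"
).fetchall()
print(rows)
```

After the three writes, the table contains two rows, one for appShop and one for webShop, and the webShop row carries the latest values.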

Step 4: Creating an Enhanced Datasource Connection

  1. On the management console, hover the mouse over the service list icon and choose Analytics > Data Lake Insight. The DLI management console is displayed. Choose Resources > Queue Management to view the DLI queue you created.
    Figure 9 Queue list
  2. In the navigation pane of the DLI management console, choose Global Configuration > Service Authorization. On the displayed page, select VPC Administrator, and click Update to grant the DLI user the permission to access VPC resources. The permission is used to create a VPC peering connection.
    Figure 10 Updating agency permissions
  3. Choose Datasource Connections. On the displayed Enhanced tab, click Create. Configure the following parameters, and click OK.
    • Connection Name: Enter a name.
    • Resource Pool: Select the general-purpose queue you have created.
    • VPC: Select the VPC where the Kafka and MySQL instances are.
    • Subnet: Select the subnet where the Kafka and MySQL instances are.
    Figure 11 Creating an enhanced datasource

    The status of the created datasource connection is Active in the Enhanced tab.

    Click the name of the datasource connection. On the details page, the connection status is ACTIVE.

    Figure 12 Connection status
    Figure 13 Details
  4. Test whether the queue can connect to the RDS for MySQL instance and to the DMS for Kafka instance.
    1. On the Queue Management page, locate the target queue. In the Operation column, click More > Test Address Connectivity.
      Figure 14 Testing address connectivity
    2. Enter the connection address of the DMS for Kafka instance and the private IP address of the RDS for MySQL instance to test the connectivity.

      If the test is successful, the DLI queue can connect to the Kafka and MySQL instances.

      Figure 15 Testing address connectivity

      If the test fails, modify the security group rules of the VPC where the Kafka and MySQL instances are to allow DLI queue access on ports 9092 and 3306. You can obtain the network segment of the queue on its details page.

      Figure 16 VPC security group rules
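
When troubleshooting security group rules, it can help to confirm that the Kafka and MySQL ports accept TCP connections at all. The following Python sketch is a generic reachability check, not a DLI feature: the can_connect function is hypothetical, and it only reflects connectivity from the host where it runs (for example, a server in the same VPC). It does not replace Test Address Connectivity on the console, which tests from the queue itself.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


# Example calls (addresses are placeholders, as elsewhere in this document):
#   can_connect("xxxx", 9092)  # DMS for Kafka connection address
#   can_connect("xxxx", 3306)  # RDS for MySQL floating IP address
```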

Step 5: Creating and Submitting a Flink Job

  1. In the navigation pane on the left, choose Job Management > Flink Jobs. Click Create Job.
    • Type: Select Flink OpenSource SQL.
    • Name: Enter a name.
    Figure 17 Creating a Flink Job
  2. Click OK. The job editing page is displayed. The following is a sample SQL script. Modify the parameter values marked xxxx based on your RDS and DMS instance information.
    --********************************************************************--
    -- Data source: trade_order_detail_info (order details wide table)
    --********************************************************************--
    create table trade_order_detail (
      order_id string,      -- Order ID
      order_channel string,   -- Channel
      order_time string,      -- Order creation time
      pay_amount double,     -- Order amount
      real_pay double,     -- Actual payment amount
      pay_time string,      -- Payment time
      user_id string,      -- User ID
      user_name string,      -- Username
      area_id string      -- Region ID
    ) with (
      "connector.type" = "kafka",
      "connector.version" = "0.10",
      "connector.properties.bootstrap.servers" = "xxxx:9092,xxxx:9092,xxxx:9092", -- Kafka connection address
      "connector.properties.group.id" = "trade_order",   -- Kafka groupID
      "connector.topic" = "trade_order_detail_info",     -- Kafka topic
      "format.type" = "json",
      "connector.startup-mode" = "latest-offset"
    );
    
    --********************************************************************--
    -- Result table: trade_channel_collect (real-time statistics on the total sales amount of each channel)
    --********************************************************************--
    create table trade_channel_collect(
      begin_time string,       --Start time of statistics collection
      channel_code string,      -- Channel ID
      channel_name string,      -- Channel name
      cur_gmv double,         -- GMV
      cur_order_user_count bigint, -- Number of payers
      cur_order_count bigint,    -- Number of orders paid on the day
      last_pay_time string,     -- Latest settlement time
      flink_current_time string,
      primary key (begin_time, channel_code) not enforced
    ) with (
      "connector.type" = "jdbc",
      "connector.url" = "jdbc:mysql://xxxx:3306/xxxx",     -- MySQL connection address, in JDBC format
      "connector.table" = "xxxx",            -- MySQL table name
      "connector.driver" = "com.mysql.jdbc.Driver",
      'pwd_auth_name'= 'xxxxx', -- Name of the datasource authentication of the password type created on DLI. If datasource authentication is used, you do not need to set the username and password for the job.
      "connector.write.flush.max-rows" = "1000",
      "connector.write.flush.interval" = "1s"
    );
    
    --********************************************************************--
    -- Temporary intermediate table
    --********************************************************************--
    create view tmp_order_detail
    as
    select *
        , case when t.order_channel not in ("webShop", "appShop", "miniAppShop") then "other"
               else t.order_channel end as channel_code -- Redefine channels. Only four enumeration values are available: webShop, appShop, miniAppShop, and other.
        , case when t.order_channel = "webShop" then _UTF16"Website"
               when t.order_channel = "appShop" then _UTF16"Shopping App"
               when t.order_channel = "miniAppShop" then _UTF16"Miniprogram"
               else _UTF16"Other" end as channel_name -- Channel name
    from (
        select *
            , row_number() over(partition by order_id order by order_time desc ) as rn -- Remove duplicate order data
            , concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time
            , concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time
        from trade_order_detail
        where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") -- Obtain the data of the current day. To simplify data simulation, 2021-03-25 12:03:00 is used in place of cast(LOCALTIMESTAMP as string).
        and real_pay is not null
    ) t
    where t.rn = 1;
    
    -- Collect data statistics by channel.
    insert into trade_channel_collect
    select
          begin_time  --Start time of statistics collection
        , channel_code
        , channel_name
        , cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv -- GMV
        , count(distinct user_id) as cur_order_user_count -- Number of payers
        , count(1) as cur_order_count -- Number of orders paid on the day
        , max(pay_time) as last_pay_time -- Latest settlement time
        , cast(LOCALTIMESTAMP as string) as flink_current_time -- Current time of the Flink task
    from tmp_order_detail
    where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00")
    group by begin_time, channel_code, channel_name;

    Job logic

    1. Create a Kafka source table to read consumption data from a specified Kafka topic.
    2. Create a result table to write result data into MySQL through JDBC.
    3. Implement the processing logic to collect statistics on each metric.

      To simplify the final processing logic, create a view to preprocess the data.

      1. Use an over window (row_number) and a filter to remove duplicate orders, keeping the latest record for each order_id (the top-N method with N = 1). Use the built-in functions concat and substr to set 00:00:00 of the current day as the start time and 23:59:59 of the same day as the end time, and keep only orders paid at or after 00:00:00 on that day. (To simplify data simulation, the fixed timestamp 2021-03-25 12:03:00 is used in place of cast(LOCALTIMESTAMP as string).)
      2. Use a CASE expression on the order channel to derive channel_code and channel_name. The view then exposes all fields of the source table plus begin_time, end_time, channel_code, and channel_name.
    4. Collect statistics on the required metrics, filter the data as required, and write the results to the result table.
  3. Select the created general-purpose queue and submit the job.
    Figure 18 Flink OpenSource SQL Job
  4. Wait until the job status changes to Running. Click the job name to view the details.
    Figure 19 Job status
  5. Use the Kafka client to send data to a specified topic to simulate real-time data streaming.

    For details, see Connecting to an Instance Without SASL.

    Figure 20 Real-time data streaming
  6. Run the following command. Replace xxxx:9092 with the Kafka connection address obtained in Step 2. trade_order_detail_info is the topic created in Step 2.
    sh kafka_2.11-2.3.0/bin/kafka-console-producer.sh --broker-list xxxx:9092 --topic trade_order_detail_info

    Example data is as follows:

    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
    {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
    {"order_id":"202103252020200001", "order_channel":"webShop", "order_time":"2021-03-24 20:20:20", "pay_amount":"600.00", "real_pay":"480.00", "pay_time":"2021-03-25 00:00:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
    {"order_id":"202103260808080001", "order_channel":"webShop", "order_time":"2021-03-25 08:08:08", "pay_amount":"300.00", "real_pay":"240.00", "pay_time":"2021-03-25 08:10:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
    {"order_id":"202103261313130001", "order_channel":"webShop", "order_time":"2021-03-25 13:13:13", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-25 16:16:16", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
    {"order_id":"202103270606060001", "order_channel":"appShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"50.50", "real_pay":"50.50", "pay_time":"2021-03-25 06:07:00", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    {"order_id":"202103270606060002", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"66.60", "real_pay":"66.60", "pay_time":"2021-03-25 06:07:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
    {"order_id":"202103270606060003", "order_channel":"miniAppShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"88.80", "real_pay":"88.80", "pay_time":"2021-03-25 06:07:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
    {"order_id":"202103270606060004", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"99.90", "real_pay":"99.90", "pay_time":"2021-03-25 06:07:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
  7. In the navigation pane on the left, choose Job Management > Flink Jobs, and click the job submitted in step 3. On the job details page, view the number of processed data records.
    Figure 21 Job details
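
The job logic described in this step can be cross-checked offline before looking at the MySQL results. The following Python sketch is not how Flink executes the job. It is a batch re-implementation of the same filter, deduplication, channel mapping, and aggregation, applied to the example data above, with only the fields the job uses. The aggregate function and the constant names are hypothetical.

```python
from collections import defaultdict

STAT_TIME = "2021-03-25 12:03:00"          # fixed timestamp used by the job
BEGIN_TIME = STAT_TIME[:10] + " 00:00:00"  # concat(substr(..., 1, 10), " 00:00:00")
KNOWN_CHANNELS = {"webShop", "appShop", "miniAppShop"}

FIELDS = ("order_id", "order_channel", "order_time", "real_pay", "pay_time", "user_id")
ROWS = [
    ("202103241000000001", "webShop", "2021-03-24 10:00:00", "100.00", "2021-03-24 10:02:03", "0001"),
    ("202103241606060001", "appShop", "2021-03-24 16:06:06", "180.00", "2021-03-24 16:10:06", "0001"),
    ("202103251202020001", "miniAppShop", "2021-03-25 12:02:02", "60.00", "2021-03-25 12:03:00", "0002"),
    ("202103251505050001", "qqShop", "2021-03-25 15:05:05", "400.00", "2021-03-25 15:10:00", "0003"),
    ("202103252020200001", "webShop", "2021-03-24 20:20:20", "480.00", "2021-03-25 00:00:00", "0004"),
    ("202103260808080001", "webShop", "2021-03-25 08:08:08", "240.00", "2021-03-25 08:10:00", "0004"),
    ("202103261313130001", "webShop", "2021-03-25 13:13:13", "100.00", "2021-03-25 16:16:16", "0004"),
    ("202103270606060001", "appShop", "2021-03-25 06:06:06", "50.50", "2021-03-25 06:07:00", "0001"),
    ("202103270606060002", "webShop", "2021-03-25 06:06:06", "66.60", "2021-03-25 06:07:00", "0002"),
    ("202103270606060003", "miniAppShop", "2021-03-25 06:06:06", "88.80", "2021-03-25 06:07:00", "0003"),
    ("202103270606060004", "webShop", "2021-03-25 06:06:06", "99.90", "2021-03-25 06:07:00", "0004"),
]
ORDERS = [dict(zip(FIELDS, row)) for row in ROWS]


def aggregate(orders):
    """Return {channel_code: (gmv, payer_count, order_count, last_pay_time)}."""
    # Keep orders paid at or after 00:00:00 of the day, with a non-null real_pay,
    # and keep only the record with the latest order_time for each order_id.
    latest = {}
    for order in orders:
        if order.get("real_pay") is None or order["pay_time"] < BEGIN_TIME:
            continue
        prev = latest.get(order["order_id"])
        if prev is None or order["order_time"] > prev["order_time"]:
            latest[order["order_id"]] = order

    stats = defaultdict(lambda: {"gmv": 0.0, "users": set(), "orders": 0, "last": ""})
    for order in latest.values():
        # Channels outside the three known values are grouped as "other".
        code = order["order_channel"] if order["order_channel"] in KNOWN_CHANNELS else "other"
        entry = stats[code]
        entry["gmv"] += float(order["real_pay"])
        entry["users"].add(order["user_id"])
        entry["orders"] += 1
        entry["last"] = max(entry["last"], order["pay_time"])

    return {
        code: (round(e["gmv"], 2), len(e["users"]), e["orders"], e["last"])
        for code, e in stats.items()
    }


for code, values in sorted(aggregate(ORDERS).items()):
    print(code, values)
```

For the example data, the sketch produces four channel rows: webShop (GMV 986.5, 2 payers, 5 orders), miniAppShop (GMV 148.8, 2 payers, 2 orders), appShop (GMV 50.5, 1 payer, 1 order), and other (GMV 400.0, 1 payer, 1 order). The two orders paid on 2021-03-24 are excluded by the start-time filter.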

Step 6: Querying the Result

  1. Log in to the MySQL instance as described in step 2 of Step 3: Creating an RDS Database Table, and run the following SQL statement to query the result data processed by the Flink job:
    SELECT * FROM `dli-demo`.`trade_channel_collect`;
    Figure 22 Querying results
  2. Log in to the DLV console, configure a DLV screen, and run SQL statements to query data in the RDS for MySQL instance to display the data on the screen.

    For details, see Editing Screens.

    Figure 23 DLV screen