文档首页/ 数据湖探索 DLI/ 最佳实践/ 数据分析/ 使用DLI Flink SQL进行电商实时业务数据分析
更新时间:2024-07-23 GMT+08:00

使用DLI Flink SQL进行电商实时业务数据分析

应用场景

当前线上购物无疑是最火热的购物方式,而电商平台则又可以以多种方式接入,例如通过web方式访问、通过app的方式访问、通过微信小程序的方式访问等等。而电商平台则需要每天统计各平台的实时访问数据量、订单数、访问人数等等指标,从而能在显示大屏上实时展示相关数据,方便及时了解数据变化,有针对性地调整营销策略。而如何高效快捷地统计这些指标呢?

假设平台已经将每个商品的订单信息实时写入Kafka中,这些信息包括订单ID、订单生成的渠道(即web方式、app方式等)、订单时间、订单金额、折扣后实际支付金额、支付时间、用户ID、用户姓名、订单地区ID等信息。而我们需要做的,就是根据当前可以获取到的业务数据,实时统计每种渠道的相关指标,输出存储到数据库中,并进行大屏展示。

方案架构

使用DLI Flink完成电商业务实时数据的分析处理,获取各个渠道的销售汇总数据。

图1 方案简介

流程指导

使用DLI Flink进行电商实时业务数据分析的操作过程主要包括以下步骤:

步骤1:创建资源。在您的账户下创建作业需要的相关资源,涉及VPC、DMS、DLI、RDS。

步骤2:获取DMS连接地址并创建Topic。获取DMS Kafka实例连接地址并创建DMS Topic。

步骤3:创建RDS数据库表。获取RDS实例内网地址,登录RDS实例创建RDS数据库及MySQL表。

步骤4:创建DLI增强型跨源。创建DLI增强型跨源,并测试队列与RDS、DMS实例连通性。

步骤5:创建并提交Flink作业。创建DLI Flink OpenSource SQL作业并运行。

步骤6:查询结果。查询Flink作业结果,使用DLV进行大屏展示。

方案优势

  • 跨源分析:数据免搬迁,就可以关联分析存在OBS中的各个渠道的销售汇总数据。
  • 纯SQL操作:DLI已对接多个数据源,直接通过SQL建表就可以完成数据源的映射。

资源和成本规划

表1 资源和成本规划

资源

资源说明

成本说明

OBS

需要创建一个OBS桶将数据上传到对象存储服务OBS,为后面使用DLI完成数据分析做准备。

OBS的使用涉及以下几项费用:

  • 存储费用:静态网站文件存储在OBS中产生的存储费用
  • 请求费用:用户访问OBS中存储的静态网站文件时产生的请求费用
  • 流量费用:用户使用自定义域名通过公网访问OBS时产生的流量费用

实际产生的费用与存储的文件大小、用户访问所产生的请求次数和流量大小有关,请根据自己的业务进行预估。

DLI

在创建SQL作业前需购买队列,使用DLI的队列资源时,按照队列CU时进行计费。

如购买按需计费的队列,在使用队列资源时,按照队列CU时进行计费。

以小时为单位进行结算。不足一小时按一小时计费,小时数按整点计算。队列CU时按需计费的计算费用=单价*CU数*小时数。

VPC

VPC丰富的功能帮助您灵活管理云上网络,包括创建子网、设置安全组和网络ACL、管理路由表、申请弹性公网IP和带宽等。

VPC本身不收取费用。

但如有互联网访问需求,您需要购买弹性公网IP。弹性公网IP提供“包年/包月”和“按需计费”两种计费模式。

了解VPC计费说明

DMS Kafka

Kafka提供的消息队列服务,向用户提供计算、存储和带宽资源独占式的Kafka专享实例。

Kafka版支持按需和包周期两种付费模式。Kafka计费项包括Kafka实例和Kafka的磁盘存储空间。

了解Kafka计费说明

RDS MySQL

数据库 RDS for MySQL提供在线云数据库服务。

RDS对您选择的数据库实例、数据库存储和备份存储(可选)收费。

了解RDS计费说明

DLV

DLV适配云上云下多种数据源,提供丰富多样的可视化组件,快速定制数据大屏。

使用DLV服务的费用主要是DLV包年包月套餐的费用,您可以根据实际使用情况,选择合适的版本规格。

数据说明

  • 数据源表:电商业务订单详情宽表

    字段名

    字段类型

    说明

    order_id

    string

    订单ID

    order_channel

    string

    订单生成的渠道(即web方式、app方式等)

    order_time

    string

    订单时间

    pay_amount

    double

    订单金额

    real_pay

    double

    实际支付金额

    pay_time

    string

    支付时间

    user_id

    string

    用户ID

    user_name

    string

    用户姓名

    area_id

    string

    订单地区ID

  • 结果表:各渠道的销售总额实时统计表。

    字段名

    字段类型

    说明

    begin_time

    varchar(32)

    开始统计指标的时间

    channel_code

    varchar(32)

    渠道编号

    channel_name

    varchar(32)

    渠道名

    cur_gmv

    double

    当天GMV

    cur_order_user_count

    bigint

    当天付款人数

    cur_order_count

    bigint

    当天付款订单数

    last_pay_time

    varchar(32)

    最近结算时间

    flink_current_time

    varchar(32)

    Flink数据处理时间

步骤1:创建资源

表2所示,完成VPC、DMS、RDS、DLI、DLV资源的创建。

表2 创建资源

资源类型

说明

操作指导

VPC

VPC为资源提供云上的网络管理服务。

资源网络规划说明:

  • Kafka与MySQL实例指定的VPC需为同一VPC。
  • Kafka与MySQL实例所属VPC网段不得与创建的DLI队列网段冲突。

创建VPC和子网

DMS Kafka

本例中以DMS Kafka实例作为数据源。

DMS Kafka入门指引

RDS MySQL

本例中以使用RDS提供在线云数据库服务。

RDS MySQL快速入门

DLI

DLI提供实时业务数据分析。

创建DLI队列时请创建“包年包月”或者“按需-专属资源”模式的通用队列,否则无法创建增强型网络连接。

DLI 创建队列

DLV

DLV实时展现DLI队列处理后的结果数据。

DLV 创建大屏

步骤2:获取DMS连接地址并创建Topic

  1. 在控制台单击“服务列表”,选择“分布式消息服务DMS”,单击进入DMS服务控制台页面。在“Kafka专享版”页面找到您所创建的Kafka实例。
    图2 Kafka实例
  2. 进入实例详情页面。单击“基本信息”,获取“连接地址”。
    图3 获取连接地址
  3. 单击“Topic管理”,创建一个Topic:trade_order_detail_info。
    图4 创建Topic

    Topic配置如下:

    • 分区数:1
    • 副本数:1
    • 老化时间:72h
    • 同步落盘:否

步骤3:创建RDS数据库表

  1. 在控制台单击“服务列表”,选择“云数据库RDS”,单击进入RDS页面。在“实例管理页面”,找到您已经创建的RDS实例,获取其内网地址。
    图5 内网地址
  2. 单击所创建RDS实例的“登录”,跳转至“数据管理服务-DAS”。输入相关账户信息,单击“测试连接”。显示连接成功后,单击“登录”,进入“实例登录”页面。
    图6 实例登录
  3. 登录RDS实例后,单击“新建数据库”,创建名称为“dli-demo”的数据库。
    图7 创建数据库
  4. 单击“SQL操作”>“SQL查询”,执行如下SQL创建测试用MySQL表,表相关字段含义在•数据说明中有详细介绍。
    DROP TABLE `dli-demo`.`trade_channel_collect`;
    CREATE TABLE `dli-demo`.`trade_channel_collect` (
    	`begin_time` VARCHAR(32) NOT NULL,
    	`channel_code` VARCHAR(32) NOT NULL,
    	`channel_name` VARCHAR(32) NULL,
    	`cur_gmv` DOUBLE UNSIGNED NULL,
    	`cur_order_user_count` BIGINT UNSIGNED NULL,
    	`cur_order_count` BIGINT UNSIGNED NULL,
    	`last_pay_time` VARCHAR(32) NULL,
    	`flink_current_time` VARCHAR(32) NULL,
    	PRIMARY KEY (`begin_time`, `channel_code`)
    )	ENGINE = InnoDB
    	DEFAULT CHARACTER SET = utf8mb4
    	COLLATE = utf8mb4_general_ci
    	COMMENT = '各渠道的销售总额实时统计';
    图8 创建表

步骤4:创建DLI增强型跨源

  1. 在控制台单击“服务列表”,选择“数据湖探索”,单击进入DLI服务页面。单击“资源管理 > 队列管理”,查询创建的DLI队列。
    图9 队列列表
  2. 单击“全局配置 > 服务授权”,选中“VPC Administrator”,单击“更新委托权限”,赋予DLI操作用户VPC资源的权限,用于创建VPC的“对等连接”。
    图10 更新委托权限
  3. 单击“跨源连接 > 增强型跨源 > 创建”,配置如下连接信息后单击“确定”。
    • 连接名称:增强型跨源名称。
    • 弹性资源池:选择您所创建的通用队列。
    • 虚拟私有云:选择 Kafka 与 MySQL 实例所在的VPC。
    • 子网:选择 Kafka 与 MySQL 实例所在的子网。
    图11 创建增强型跨源

    增强型跨源创建完成后,在跨源列表中,对应的跨源连接状态会显示为“已激活”。

    单击跨源连接的名称,详情页面显示连接状态为“ACTIVE”。

    图12 跨源连接状态
    图13 详情
  4. 测试队列与RDS、DMS实例连通性。
    1. 单击“队列管理”,选择您所使用的队列,单击“操作”列中的“更多”>“测试地址连通性”。
      图14 检测地址连通性
    2. 输入DMS Kafka实例连接地址和步RDS MySQL实例内网地址,进行网络连通性测试。

      测试结果显示可达,则DLI队列与Kafka、MySQL实例的网络已经联通。

      图15 测试结果

      如果测试结果不可达,需要修改实例所在VPC的安全组规则,放开9092、3306端口对DLI队列的限制,DLI队列网段信息可以在队列的详情页中获取。

      图16 安全组规则

步骤5:创建并提交Flink作业

  1. 单击DLI控制台左侧“作业管理”,选择“Flink作业”。单击“创建作业”。
    • 类型:选择作业类型为:Flink OpenSource SQL。
    • 名称:自定义。
    图17 创建Flink作业
  2. 单击“确定”,进入作业编辑作业页面,具体SQL示例如下,部分参数值需要根据RDS和DMS对应的信息进行修改。
    --********************************************************************--
    -- 数据源:trade_order_detail_info (订单详情宽表)
    --********************************************************************--
    create table trade_order_detail (
      order_id string,      -- 订单ID
      order_channel string,   -- 渠道
      order_time string,     -- 订单创建时间
      pay_amount double,     -- 订单金额
      real_pay double,      -- 实际付费金额
      pay_time string,      -- 付费时间
      user_id string,      -- 用户ID
      user_name string,     -- 用户名
      area_id string       -- 地区ID
    ) with (
      "connector.type" = "kafka",
      "connector.version" = "0.10",
      "connector.properties.bootstrap.servers" = "xxxx:9092,xxxx:9092,xxxx:9092", -- Kafka连接地址
      "connector.properties.group.id" = "trade_order",   -- Kafka groupID
      "connector.topic" = "trade_order_detail_info",     -- Kafka topic
      "format.type" = "json",
      "connector.startup-mode" = "latest-offset"
    );
    
    --********************************************************************--
    -- 结果表:trade_channel_collect (各渠道的销售总额实时统计)
    --********************************************************************--
    create table trade_channel_collect(
      begin_time string,       --统计数据的开始时间
      channel_code string,      -- 渠道编号
      channel_name string,      -- 渠道名
      cur_gmv double,         -- 当天GMV
      cur_order_user_count bigint, -- 当天付款人数
      cur_order_count bigint,    -- 当天付款订单数
      last_pay_time string,     -- 最近结算时间
      flink_current_time string,
      primary key (begin_time, channel_code) not enforced
    ) with (
      "connector.type" = "jdbc",
      "connector.url" = "jdbc:mysql://xxxx:3306/xxxx",    -- mysql连接地址,jdbc格式
      "connector.table" = "xxxx",            -- mysql表名
      "connector.driver" = "com.mysql.jdbc.Driver",
      'pwd_auth_name'= 'xxxxx', --DLI侧创建的Password类型的跨源认证名称。使用跨源认证则无需在作业中配置账号和密码。
      "connector.write.flush.max-rows" = "1000",
      "connector.write.flush.interval" = "1s"
    );
    
    --********************************************************************--
    -- 临时中间表
    --********************************************************************--
    create view tmp_order_detail
    as
    select *
        , case when t.order_channel not in ("webShop", "appShop", "miniAppShop") then "other"
               else t.order_channel end as channel_code --重新定义统计渠道 只有四个枚举值[webShop、appShop、miniAppShop、other]
        , case when t.order_channel = "webShop" then _UTF16"网页商城"
               when t.order_channel = "appShop" then _UTF16"app商城"
               when t.order_channel = "miniAppShop" then _UTF16"小程序商城"
               else _UTF16"其他" end as channel_name --渠道名称
    from (
        select *
            , row_number() over(partition by order_id order by order_time desc ) as rn --去除重复订单数据
            , concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time
            , concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time
        from trade_order_detail
        where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") --取今天数据,为了方便运行,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string)
        and real_pay is not null
    ) t
    where t.rn = 1;
    
    -- 按渠道统计各个指标
    insert into trade_channel_collect
    select
          begin_time  --统计数据的开始时间
        , channel_code
        , channel_name
        , cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv --当天GMV
        , count(distinct user_id) as cur_order_user_count --当天付款人数
        , count(1) as cur_order_count --当天付款订单数
        , max(pay_time) as last_pay_time --最近结算时间
    	, cast(LOCALTIMESTAMP as string) as flink_current_time --flink任务中的当前时间
    from tmp_order_detail
    where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00")
    group by begin_time, channel_code, channel_name;

    作业逻辑说明如下:

    1. 创建一个Kafka源表,用来从Kafka指定Topic中读取消费数据;
    2. 创建一个结果表,用来通过JDBC向MySQL中写入结果数据。
    3. 实现相应的处理逻辑,以实现各个指标的统计。

      为了简化最终的处理逻辑,使用创建视图进行数据预处理。

      1. 利用over窗口条件和过滤条件结合以去除重复数据(该方式是利用了top N的方法),同时利用相应的内置函数concat和substr将当天的00:00:00作为统计的开始时间,当天的23:59:59作为统计结束时间,并筛选出支付时间在当天凌晨00:00:00后的订单数据进行统计(为了方便模拟数据的构造,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string))。
      2. 根据这些数据的订单渠道利用内置的条件函数设置channel_code和channel_name的值,从而获取了源表中的字段信息,以及begin_time、end_time和channel_code、channel_name的值。
    4. 根据需要对相应指标进行统计和筛选,并将结果写入到结果表中。
  3. 选择所创建的DLI通用队列提交作业。
    图18 Flink Opensource SQL作业
  4. 等待作业状态会变为“运行中”,单击作业名称,可以查看作业详细运行情况。
    图19 作业运行状态
  5. 使用Kafka客户端向指定topic发送数据,模拟实时数据流。

    具体方法请参考DMS-连接实例生产消费信息

    图20 模拟实时数据流
  1. 发送命令如下:
    sh kafka_2.11-2.3.0/bin/kafka-console-producer.sh --broker-list KafKa连接地址 --topic Topic名称

    示例数据如下:

    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
    {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
    {"order_id":"202103252020200001", "order_channel":"webShop", "order_time":"2021-03-24 20:20:20", "pay_amount":"600.00", "real_pay":"480.00", "pay_time":"2021-03-25 00:00:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
    {"order_id":"202103260808080001", "order_channel":"webShop", "order_time":"2021-03-25 08:08:08", "pay_amount":"300.00", "real_pay":"240.00", "pay_time":"2021-03-25 08:10:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
    {"order_id":"202103261313130001", "order_channel":"webShop", "order_time":"2021-03-25 13:13:13", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-25 16:16:16", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
    {"order_id":"202103270606060001", "order_channel":"appShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"50.50", "real_pay":"50.50", "pay_time":"2021-03-25 06:07:00", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    {"order_id":"202103270606060002", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"66.60", "real_pay":"66.60", "pay_time":"2021-03-25 06:07:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
    {"order_id":"202103270606060003", "order_channel":"miniAppShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"88.80", "real_pay":"88.80", "pay_time":"2021-03-25 06:07:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
    {"order_id":"202103270606060004", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"99.90", "real_pay":"99.90", "pay_time":"2021-03-25 06:07:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
  2. 单击DLI控制台左侧“作业管理”>“Flink作业”,单击3提交的Flink作业。在作业详情页面,可以看到处理的数据记录数。
    图21 Flink作业详情

步骤6:查询结果

  1. 参考2,登录MySQL实例,执行如下SQL语句,即可查询到经过Flink作业处理后的结果数据。
    SELECT * FROM `dli-demo`.`trade_channel_collect`;
    图22 查询结果
  2. 配置DLV大屏,执行SQL查询RDS MySQL,即可以实现大屏实时展示。

    具体配置方法可参考DLV开发大屏

    图23 大屏展示