使用DLI Flink SQL进行电商实时业务数据分析
业务场景介绍
- 场景描述
当前线上购物无疑是最火热的购物方式,而电商平台则又可以以多种方式接入,例如通过web方式访问、通过app的方式访问、通过微信小程序的方式访问等等。而电商平台则需要每天统计各平台的实时访问数据量、订单数、访问人数等等指标,从而能在显示大屏上实时展示相关数据,方便及时了解数据变化,有针对性地调整营销策略。而如何高效快捷地统计这些指标呢?
假设平台已经将每个商品的订单信息实时写入Kafka中,这些信息包括订单ID、订单生成的渠道(即web方式、app方式等)、订单时间、订单金额、折扣后实际支付金额、支付时间、用户ID、用户姓名、订单地区ID等信息。而我们需要做的,就是根据当前可以获取到的业务数据,实时统计每种渠道的相关指标,输出存储到数据库中,并进行大屏展示。
- 场景方案
- 场景任务
- 数据说明
- 数据源表:电商业务订单详情宽表
字段名
字段类型
说明
order_id
string
订单ID
order_channel
string
订单生成的渠道(即web方式、app方式等)
order_time
string
订单时间
pay_amount
double
订单金额
real_pay
double
实际支付金额
pay_time
string
支付时间
user_id
string
用户ID
user_name
string
用户姓名
area_id
string
订单地区ID
- 结果表:各渠道的销售总额实时统计表。
字段名
字段类型
说明
begin_time
varchar(32)
开始统计指标的时间
channel_code
varchar(32)
渠道编号
channel_name
varchar(32)
渠道名
cur_gmv
double
当天GMV
cur_order_user_count
bigint
当天付款人数
cur_order_count
bigint
当天付款订单数
last_pay_time
varchar(32)
最近结算时间
flink_current_time
varchar(32)
Flink数据处理时间
- 数据源表:电商业务订单详情宽表
流程介绍
使用DLI Flink进行电商实时业务数据分析的操作过程主要包括7个步骤:
步骤1:注册账号。使用DLI对数据进行分析之前,需要注册华为云账号并进行实名认证。
步骤2:创建资源。在您的账户下创建作业需要的相关资源,涉及VPC、DMS、DLI、RDS。
步骤3:获取DMS连接地址并创建Topic。获取DMS Kafka实例连接地址并创建DMS Topic。
步骤4:创建RDS数据库表。获取RDS实例内网地址,登录RDS实例创建RDS数据库及MySQL表。
步骤5:创建DLI增强型跨源。创建DLI增强型跨源,并测试队列与RDS、DMS实例连通性。
步骤6:创建并提交Flink作业。创建DLI Flink OpenSource SQL作业并运行。
步骤7:查询结果。查询Flink作业结果,使用DLV进行大屏展示。
步骤2:创建资源
- 创建VPC,具体步骤可参考:创建VPC和子网。
- 创建DMS Kafka实例,具体步骤可参考:DMS Kafka入门指引。
- 创建RDS MySQL实例,具体步骤可参考:RDS MySQL快速入门。
- 创建DLI CCE队列,具体步骤可参考:DLI 创建队列。
- 创建DLV大屏,具体步骤可参考:DLV 创建大屏。

创建资源时请注意以下几点:
1. Kafka与MySQL实例创建时需指定VPC,该VPC需提前创建好,且网段不与后续创建的DLI队列网段冲突。Kafka与MySQL实例指定的VPC需为同一VPC。
2. 创建DLI CCE队列前需在官网提工单申请开通CCE队列使用权限后,再创建DLI队列。DLI Flink Opensource语法目前仅支持容器化队列(目前仍在封闭测试阶段),如果没有申请CCE队列使用权限,则创建的队列就是普通队列,并且后续创建Flink OpenSource SQL类型作业时无法选择该队列。
3. 请创建DLI队列时请创建“包年包月”或者“按需-专属资源”模式的通用队列。
步骤3:获取DMS连接地址并创建Topic
- 在控制台单击“服务列表”,选择“分布式消息服务DMS”,单击进入DMS服务控制台页面。在“Kafka专享版”页面找到您所创建的Kafka实例。图1 Kafka实例
- 进入实例详情页面。单击“基本信息”,获取“连接地址”。图2 获取连接地址
- 单击“Topic管理”,创建一个Topic:trade_order_detail_info。图3 创建Topic
Topic配置如下:
- 分区数:1
- 副本数:1
- 老化时间:72h
- 同步落盘:否
步骤4:创建RDS数据库表
- 在控制台单击“服务列表”,选择“云数据库RDS”,单击进入RDS页面。在“实例管理页面”,找到您已经创建的RDS实例,获取其内网地址。图4 内网地址
- 单击所创建RDS实例的“登录”,跳转至“数据管理服务-DAS”。输入相关账户信息,单击“测试连接”。显示连接成功后,单击“登录”,进入“实例登录”页面。图5 登录RDS图6 实例登录
- 登录RDS实例后,单击“新建数据库”,创建名称为“dli-demo”的数据库。图7 创建数据库
- 单击“SQL操作”>“SQL查询”,执行如下SQL创建测试用MySQL表,表相关字段含义在•数据说明中有详细介绍。
DROP TABLE `dli-demo`.`trade_channel_collect`; CREATE TABLE `dli-demo`.`trade_channel_collect` ( `begin_time` VARCHAR(32) NOT NULL, `channel_code` VARCHAR(32) NOT NULL, `channel_name` VARCHAR(32) NULL, `cur_gmv` DOUBLE UNSIGNED NULL, `cur_order_user_count` BIGINT UNSIGNED NULL, `cur_order_count` BIGINT UNSIGNED NULL, `last_pay_time` VARCHAR(32) NULL, `flink_current_time` VARCHAR(32) NULL, PRIMARY KEY (`begin_time`, `channel_code`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '各渠道的销售总额实时统计';
图8 创建表
步骤5:创建DLI增强型跨源
- 在控制台单击“服务列表”,选择“数据湖探索”,单击进入DLI服务页面。单击“队列管理”,在队列列表中查询4中已创建的CCE通用队列。图9 队列列表
- 单击“全局配置”>“服务授权”,选中“VPC Administrator”,单击“更新委托权限”,赋予DLI操作用户VPC资源的权限,用于创建VPC的“对等连接”。图10 更新委托权限
- 单击“跨源连接”>“增强型跨源”>“创建”。
- 绑定队列:选择您所创建的通用队列。
- 虚拟私有云:选择 Kafka 与 MySQL 实例所在的VPC。
- 子网:选择 Kafka 与 MySQL 实例所在的子网。
单击“确定”。
图11 创建增强型跨源增强型跨源创建完成后,在跨源列表中,对应的跨源连接状态会显示为“已激活”。
单击跨源连接的名称,详情页面显示连接状态为“ACTIVE”。
图12 跨源连接状态图13 详情 - 测试队列与RDS、DMS实例连通性。
步骤6:创建并提交Flink作业
- 单击DLI控制台左侧“作业管理”,选择“Flink作业”。单击“创建作业”。
- 类型:选择作业类型为:Flink OpenSource SQL。
- 名称:自定义。
图17 创建Flink作业 - 单击“确定”,进入作业编辑作业页面,具体SQL示例如下,部分参数值需要根据RDS和DMS对应的信息进行修改。
--********************************************************************-- -- 数据源:trade_order_detail_info (订单详情宽表) --********************************************************************-- create table trade_order_detail ( order_id string, -- 订单ID order_channel string, -- 渠道 order_time string, -- 订单创建时间 pay_amount double, -- 订单金额 real_pay double, -- 实际付费金额 pay_time string, -- 付费时间 user_id string, -- 用户ID user_name string, -- 用户名 area_id string -- 地区ID ) with ( "connector.type" = "kafka", "connector.version" = "0.10", "connector.properties.bootstrap.servers" = "xxxx:9092,xxxx:9092,xxxx:9092", -- Kafka连接地址 "connector.properties.group.id" = "trade_order", -- Kafka groupID "connector.topic" = "trade_order_detail_info", -- Kafka topic "format.type" = "json", "connector.startup-mode" = "latest-offset" ); --********************************************************************-- -- 结果表:trade_channel_collect (各渠道的销售总额实时统计) --********************************************************************-- create table trade_channel_collect( begin_time string, --统计数据的开始时间 channel_code string, -- 渠道编号 channel_name string, -- 渠道名 cur_gmv double, -- 当天GMV cur_order_user_count bigint, -- 当天付款人数 cur_order_count bigint, -- 当天付款订单数 last_pay_time string, -- 最近结算时间 flink_current_time string, primary key (begin_time, channel_code) not enforced ) with ( "connector.type" = "jdbc", "connector.url" = "jdbc:mysql://xxxx:3306/xxxx", -- mysql连接地址,jdbc格式 "connector.table" = "xxxx", -- mysql表名 "connector.driver" = "com.mysql.jdbc.Driver", "connector.username" = "xxx", -- mysql用户名 "connector.password" = "xxxx", -- mysql密码 "connector.write.flush.max-rows" = "1000", "connector.write.flush.interval" = "1s" ); --********************************************************************-- -- 临时中间表 --********************************************************************-- create view tmp_order_detail as select * , case when t.order_channel not in ("webShop", "appShop", "miniAppShop") then "other" else t.order_channel end as channel_code --重新定义统计渠道 只有四个枚举值[webShop、appShop、miniAppShop、other] , case when t.order_channel = "webShop" then _UTF16"网页商城" when t.order_channel = "appShop" then _UTF16"app商城" when t.order_channel = "miniAppShop" then _UTF16"小程序商城" else _UTF16"其他" end as channel_name --渠道名称 from ( select * , row_number() over(partition by order_id order by order_time desc ) as rn --去除重复订单数据 , concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time , concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time from trade_order_detail where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") --取今天数据,为了方便运行,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string) and real_pay is not null ) t where t.rn = 1; -- 按渠道统计各个指标 insert into trade_channel_collect select begin_time --统计数据的开始时间 , channel_code , channel_name , cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv --当天GMV , count(distinct user_id) as cur_order_user_count --当天付款人数 , count(1) as cur_order_count --当天付款订单数 , max(pay_time) as last_pay_time --最近结算时间 , cast(LOCALTIMESTAMP as string) as flink_current_time --flink任务中的当前时间 from tmp_order_detail where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") group by begin_time, channel_code, channel_name;
作业逻辑说明如下:
- 创建一个Kafka源表,用来从Kafka指定Topic中读取消费数据;
- 创建一个结果表,用来通过JDBC向MySQL中写入结果数据。
- 实现相应的处理逻辑,以实现各个指标的统计。
- 利用over窗口条件和过滤条件结合以去除重复数据(该方式是利用了top N的方法),同时利用相应的内置函数concat和substr将当天的00:00:00作为统计的开始时间,当天的23:59:59作为统计结束时间,并筛选出支付时间在当天凌晨00:00:00后的订单数据进行统计(为了方便模拟数据的构造,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string))。
- 根据这些数据的订单渠道利用内置的条件函数设置channel_code和channel_name的值,从而获取了源表中的字段信息,以及begin_time、end_time和channel_code、channel_name的值。
- 根据需要对相应指标进行统计和筛选,并将结果写入到结果表中。
- 选择所创建的DLI通用队列提交作业。
图18 Flink Opensource SQL作业
- 等待作业状态会变为“运行中”,单击作业名称,可以查看作业详细运行情况。图19 作业运行状态
- 使用Kafka客户端向指定topic发送数据,模拟实时数据流。具体方法请参考:DMS - 连接实例生产消费信息。图20 模拟实时数据流
发送命令如下:
sh kafka_2.11-2.3.0/bin/kafka-console-producer.sh --broker-list kafka连接地址 --topic 指定topic
示例数据如下:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202103252020200001", "order_channel":"webShop", "order_time":"2021-03-24 20:20:20", "pay_amount":"600.00", "real_pay":"480.00", "pay_time":"2021-03-25 00:00:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103260808080001", "order_channel":"webShop", "order_time":"2021-03-25 08:08:08", "pay_amount":"300.00", "real_pay":"240.00", "pay_time":"2021-03-25 08:10:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103261313130001", "order_channel":"webShop", "order_time":"2021-03-25 13:13:13", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-25 16:16:16", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103270606060001", "order_channel":"appShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"50.50", "real_pay":"50.50", "pay_time":"2021-03-25 06:07:00", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103270606060002", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"66.60", "real_pay":"66.60", "pay_time":"2021-03-25 06:07:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103270606060003", "order_channel":"miniAppShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"88.80", "real_pay":"88.80", "pay_time":"2021-03-25 06:07:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202103270606060004", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"99.90", "real_pay":"99.90", "pay_time":"2021-03-25 06:07:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
- 单击DLI控制台左侧“作业管理”>“Flink作业”,单击3提交的Flink作业。在作业详情页面,可以看到处理的数据记录数。图21 Flink作业详情
