更新时间:2023-12-07 GMT+08:00
分享

开始使用

安全组规则修改(可选)

  • Kafka默认使用9092端口,该方案默认针对子网网段及DLI网段开放请参考修改安全组规则,配置IP地址白名单,以便能正常访问服务。
  • 云数据库RDS 默认使用3306端口,该方案默认针对子网网段及DLI网段开放请参考修改安全组规则,配置IP地址白名单,以便能正常访问服务。

安全组实际是网络流量访问策略,包括网络流量入方向规则和出方向规则,通过这些规则为安全组内具有相同保护需求并且相互信任的云服务器、云容器、云数据库等实例提供安全保护。

如果您的实例关联的安全组策略无法满足使用需求,比如需要添加、修改、删除某个TCP端口,请参考以下内容进行修改。

  • 添加安全组规则:根据业务使用需求需要开放某个TCP端口,请参考添加安全组规则添加入方向规则,打开指定的TCP端口。
  • 修改安全组规则:安全组规则设置不当会造成严重的安全隐患。您可以参考修改安全组规则,来修改安全组中不合理的规则,保证云服务器等实例的网络安全。
  • 删除安全组规则:当安全组规则入方向、出方向源地址/目的地址有变化时,或者不需要开放某个端口时,您可以参考删除安全组规则进行安全组规则删除。
  1. 查看OBS桶。在控制台单击“服务列表”,选择“对象存储服务 OBS”,单击进入OBS页面。该桶将作为用户上传电商数据文件的存储桶。

    图1 OBS桶

  2. 查看Kafka。在控制台单击“服务列表”,选择“分布式消息服务Kafka版”,在“Kafka专享版”页面找到该方案所创建的Kafka实例。

    图2 Kafka实例

  3. 创建Kafka消费组。单击“消费组管理>创建消费组”,输入消费组名字“trade_order”(使用demo样例时请填写该名称),单击“确定”保存。

    图3 消费组

  4. 查看RDS。在控制台单击“服务列表”,选择“云数据库RDS”,单击进入RDS页面。在“实例管理页面”,找到该解决方案已经创建的RDS实例。

    图4 云数据库 RDS实例

  5. 查看DLI。在控制台单击“服务列表”,选择“数据湖探索 DLI”,单击进入DLI服务页面。单击“资源管理 > 队列管理”,查询创建的DLI队列。

    图5 DLI队列

  6. 配置DLI服务授权。单击“全局变量>服务授权”,选定以下两项委托授权,单击“更新委托权限”。

    图6 DLI服务授权

  7. 参考数据湖探索 DLI使用指南创建Flink作业,启动成功后作业状态为“运行中”。
  8. 体验Demo样例。获取Demo数据样例文件,上传至3.3开始使用步骤1中的OBS桶中,即可触发函数工作流去主动将数据写入Kafka,并自动在MySQL中创建结果表。手动创建请参考云数据库RDS for MySQL使用指南创建结果存储表,待Flink作业创建成功后参考分布式消息服务 Kafka使用指南连接Kafka手动写入数据(或参考Kafka开发指南-python文档函数工作流FunctionGraph定制化开发修改代码并重新部署,上传业务数据csv文件至OBS桶即可自动写入数据)。
  9. 进入Flink作业,单击“任务列表”,查看任务状态。

    图7 任务列表

  10. 登录数据库,“SQL操作”>“SQL查询”,执行如下SQL语句,即可查询到经过Flink作业处理后的结果数据。

    图8 结果数据表

  11. 如需在可视化数据 DLV大屏展示数据,请参考配置数据可视化 DLV大屏进行配置。配置效果如下:

    图9 DLV大屏展示

云数据库RDS for MySQL使用指南

  1. 进入RDS页面,在“实例管理页面”,找到该解决方案已经创建的RDS实例,获取其内网地址。

    图10 云数据库 RDS实例
    图11 RDS内网地址

  2. 单击所创建RDS实例的“登录”,跳转至“数据管理服务-DAS”。输入相关账户信息,单击“测试连接”。显示连接成功后,单击“登录”,进入“实例登录”页面。登录RDS实例后,单击“新建数据库”,创建名称为“dli-demo”的数据库。

    图12 数据库登录
    图13 新建数据库
    图14 数据库参数填写

  3. 单击“SQL窗口”,根据实际业务场景创建参考如下SQL代码创建表,表相关字段含义在表1 数据源表表2 结果表中有详细介绍。

    图15 切换SQL视图
    图16 执行SQL表创建
    以下SQL代码仅供参考,请根据实际业务场景修改表信息:
    DROP TABLE `dli-demo`.`trade_channel_collect`;
    CREATE TABLE `dli-demo`.`trade_channel_collect` ( 
           `begin_time` VARCHAR(32) NOT NULL, 
           `channel_code` VARCHAR(32) NOT NULL, 
           `channel_name` VARCHAR(32) NULL, 
           `cur_gmv` DOUBLE UNSIGNED NULL, 
           `cur_order_user_count` BIGINT UNSIGNED NULL, 
           `cur_order_count` BIGINT UNSIGNED NULL, `last_pay_time` VARCHAR(32) NULL, 
           `flink_current_time` VARCHAR(32) NULL, 
           PRIMARY KEY (`begin_time`, `channel_code`) 
    )      ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4
           COLLATE = utf8mb4_general_ci 
           COMMENT = '各渠道的销售总额实时统计';

  4. 查询表数据。登录MySQL实例,单击“SQL窗口”执行如下SQL语句,即可查询到经过Flink作业处理后的结果数据。

    SELECT * FROM `dli-demo`.`trade_channel_collect`;
    图17 切换SQL窗口
    图18 查询结果表

分布式消息服务 Kafka使用指南

  1. 在控制台单击“服务列表”,选择“分布式消息服务Kafka版”,在“Kafka专享版”页面找到该方案所创建的Kafka实例。更多介绍请参考Kafka介绍

    图19 Kafka实例链接地址

  2. 进入实例详情页面。单击“基本信息”,获取“连接地址”。

    图20 Kafka实例链接地址

  3. 单击“消费组管理>创建消费组”,输入消费组名字,单击“确定”保存。

    图21 消费组

  4. 单击“Topic管理”,创建一个Topic。

    图22 创建Topic

  5. 使用Kafka客户端向指定topic发送数据,模拟实时数据流。具体方法请参考DMS-连接实例生产消费信息

    图23 模拟数据写入Kafka

  6. 登录Kafka实例,发送命令如下:

    进入Kafka客户端文件的“/bin”目录下,执行如下命令进行生产消息:
    ./kafka-console-producer.sh --broker-list KafKa连接地址 --topic Topic名称

    示例数据如下:

    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202103252020200001", "order_channel":"webShop", "order_time":"2021-03-24 20:20:20", "pay_amount":"600.00", "real_pay":"480.00", "pay_time":"2021-03-25 00:00:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103260808080001", "order_channel":"webShop", "order_time":"2021-03-25 08:08:08", "pay_amount":"300.00", "real_pay":"240.00", "pay_time":"2021-03-25 08:10:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103261313130001", "order_channel":"webShop", "order_time":"2021-03-25 13:13:13", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-25 16:16:16", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103270606060001", "order_channel":"appShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"50.50", "real_pay":"50.50", "pay_time":"2021-03-25 06:07:00", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103270606060002", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"66.60", "real_pay":"66.60", "pay_time":"2021-03-25 06:07:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103270606060003", "order_channel":"miniAppShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"88.80", "real_pay":"88.80", "pay_time":"2021-03-25 06:07:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202103270606060004", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"99.90", "real_pay":"99.90", "pay_time":"2021-03-25 06:07:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}

  7. 登录Kafka控制台,单击“消息查询”可以查看消息信息。

    图24 消息列表

数据湖探索 DLI使用指南

  1. 在控制台单击“服务列表”,选择“数据湖探索 DLI”,单击进入DLI服务页面。更多介绍请参考DLI介绍
  2. 单击DLI控制台左侧“作业管理 > Flink作业 > 创建作业”。

    图25 创建作业
    • 类型:选择作业类型为:Flink OpenSource SQL。
    • 名称:自定义

  3. 单击“确定”,进入作业编辑作业页面,具体SQL示例如下,部分参数值需要根据RDS和DMS对应的信息进行修改,更多SQL语法请参考Flink OpenSource SQL1.12语法概览(注意Flink作业日志存储OBS桶建议避免选用本方案创建的OBS桶,否则会导致函数工作流重复调用,该动作不影响方案正常使用)。

    图26 Flink作业示例
    --********************************************************************--
    -- 数据源:trade_order_detail_info (订单详情宽表)
    --********************************************************************--
    create table trade_order_detail (
      order_id string,      -- 订单ID
      order_channel string,   -- 渠道
      order_time string,     -- 订单创建时间
      pay_amount double,     -- 订单金额
      real_pay double,      -- 实际付费金额
      pay_time string,      -- 付费时间
      user_id string,      -- 用户ID
      user_name string,     -- 用户名
      area_id string       -- 地区ID
    ) with (
      "connector" = "kafka",
      "properties.bootstrap.servers" = "xxxx:9092,xxxx:9092,xxxx:9092", -- Kafka连接地址
      "properties.group.id" = "xxxx",   -- Kafka groupID
      "topic" = "xxxx",     -- Kafka topic
      "format" = "json",
      "scan.startup.mode" = "latest-offset"
    );
    
    --********************************************************************--
    -- 结果表:trade_channel_collect (各渠道的销售总额实时统计)
    --********************************************************************--
    create table trade_channel_collect(
      begin_time string,       --统计数据的开始时间
      channel_code string,      -- 渠道编号
      channel_name string,      -- 渠道名
      cur_gmv double,         -- 当天GMV
      cur_order_user_count bigint, -- 当天付款人数
      cur_order_count bigint,    -- 当天付款订单数
      last_pay_time string,     -- 最近结算时间
      flink_current_time string,
      primary key (begin_time, channel_code) not enforced
    ) with (
      "connector" = "jdbc",
      "url" = "jdbc:mysql://xxxx:3306/xxxx",    -- mysql连接地址及mysql表所在数据库名,jdbc格式
      "table-name" = "xxxx",            -- mysql表名
      "driver" = "com.mysql.jdbc.Driver",
      "username" = "xxxx",                    -- mysql用户名
      "password" = "xxxx",                   -- mysql密码
      "sink.buffer-flush.max-rows" = "1000",
      "sink.buffer-flush.interval" = "1s"
    );
    
    --********************************************************************--
    -- 临时中间表
    --********************************************************************--
    create view tmp_order_detail
    as
    select *
        , case when t.order_channel not in ("webShop", "appShop", "miniAppShop") then "other"
               else t.order_channel end as channel_code --重新定义统计渠道 只有四个枚举值[webShop、appShop、miniAppShop、other]
        , case when t.order_channel = "webShop" then _UTF16"网页商城"
               when t.order_channel = "appShop" then _UTF16"app商城"
               when t.order_channel = "miniAppShop" then _UTF16"小程序商城"
               else _UTF16"其他" end as channel_name --渠道名称
    from (
        select *
            , row_number() over(partition by order_id order by order_time desc ) as rn --去除重复订单数据
            , concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time
            , concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time
        from trade_order_detail
        where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") --取今天数据,为了方便运行,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string)
        and real_pay is not null
    ) t
    where t.rn = 1;
    
    -- 按渠道统计各个指标
    insert into trade_channel_collect
    select
          begin_time  --统计数据的开始时间
        , channel_code
        , channel_name
        , cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv --当天GMV
        , count(distinct user_id) as cur_order_user_count --当天付款人数
        , count(1) as cur_order_count --当天付款订单数
        , max(pay_time) as last_pay_time --最近结算时间
    	, cast(LOCALTIMESTAMP as string) as flink_current_time --flink任务中的当前时间
    from tmp_order_detail
    where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00")
    group by begin_time, channel_code, channel_name;
    表1 数据源表:电商业务订单详情宽表

    字段名

    字段类型

    说明

    order_id

    string

    订单ID

    order_channel

    string

    订单生成的渠道(即web方式、app方式等)

    order_time

    string

    订单时间

    pay_amount

    double

    订单金额

    real_pay

    double

    实际支付金额

    pay_time

    string

    支付时间

    user_id

    string

    用户ID

    user_name

    string

    用户姓名

    area_id

    string

    订单地区ID

    表2 结果表:各渠道的销售总额实时统计表。

    字段名

    字段类型

    说明

    begin_time

    varchar(32)

    开始统计指标的时间

    channel_code

    varchar(32)

    渠道编号

    channel_name

    varchar(32)

    渠道名

    cur_gmv

    double

    当天GMV

    cur_order_user_count

    bigint

    当天付款人数

    cur_order_count

    bigint

    当天付款订单数

    last_pay_time

    varchar(32)

    最近结算时间

    flink_current_time

    varchar(32)

    Flink数据处理时间

    作业逻辑说明如下:

    1. 创建一个Kafka源表,用来从Kafka指定Topic中读取消费数据;
    2. 创建一个结果表,用来通过JDBC向MySQL中写入结果数据。
    3. 实现相应的处理逻辑,以实现各个指标的统计。

      为了简化最终的处理逻辑,使用创建视图进行数据预处理。

      1. 利用over窗口条件和过滤条件结合以去除重复数据(该方式是利用了top N的方法),同时利用相应的内置函数concat和substr将当天的00:00:00作为统计的开始时间,当天的23:59:59作为统计结束时间,并筛选出支付时间在当天凌晨00:00:00后的订单数据进行统计(为了方便模拟数据的构造,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string))。
      2. 根据这些数据的订单渠道利用内置的条件函数设置channel_code和channel_name的值,从而获取了源表中的字段信息,以及begin_time、end_time和channel_code、channel_name的值。
    4. 根据需要对相应指标进行统计和筛选,并将结果写入到结果表中。

  4. 选择所创建的DLI通用队列提交作业。

    图27 提交作业

  5. 等待作业状态会变为“运行中”,单击作业名称,可以查看作业详细运行情况。

    图28 作业运行状态

配置数据可视化 DLV大屏

  1. 配置DLV大屏。在控制台单击“服务列表”,选择“数据可视化 DLV”,单击进入DLV页面。更多介绍请参考DLV介绍

    图29 DLV主页

  2. 创建VPC连接。单击“管理中心”>“管理VPC连接”,单击“创建VPC连接”。

    图30 VPC链接

  3. 单击“我的数据”>“新建数据连接”,填写相关值,单击“确定”(名称支持数字、字母、中文、_(下划线)、-(中划线),长度1-32位)。

    图31 新建数据连接

  4. 具体配置方法可参考DLV开发大屏。单击“我的大屏”>“新建大屏”,选择空白模板,填写大屏名称并创建大屏。

    图32 新建数据连接

  5. 选择大屏,单击编辑图标,进入大屏开发配置,例如现状图,单击“数据”导入MySQL数据,执行SQL查询RDS MySQL,并设置x、y、s坐标轴具体,样式设置参考组件指南,预览大屏实时展示。

    图33 新建数据连接
    图34 预览效果

函数工作流FunctionGraph定制化开发

在实际使用过程中,用户可以根据自己的业务场景参考函数工作流 FunctionGraph对代码进行定制化开发。

  1. 在控制台单击“服务列表”,选择“函数工作流 FunctionGraph”,单击进入函数主页面,单击“函数”>“函数列表”,打开该解决方案创建的函数。

    图35 函数

  2. 函数代码相关配置。用户可以根据实际情况,进行二次定制化代码开发及参数配置。

    图36 函数代码
    图37 环境变量

相关文档