更新时间:2024-04-29 GMT+08:00
分享

开始使用

本解决方案以真实数据(脱敏后)为基础,为用户提供配置样例,供用户借鉴参考

数据迁移示例

  1. 登录华为云云数据库RDS控制台,进入RDS列表,选择业务数据源登录。

    图1 登录RDS

  2. 查看业务数据源"cyz_test"数据库,"t_trade_order"和"t_user_store_info"两张表的基本信息(数据已提前导入),执行SQL获取对应需要在DLI中创建表的语句。

    图2 数据库的基本信息
    图3 建表结果

    获取DLI建表语句:

    CREATE TABLE ods_ddl_convert (
      source VARCHAR(100) NOT NULL,
      data_type1 VARCHAR(100) NOT NULL,
      target VARCHAR(100) NOT NULL,
      data_type2 VARCHAR(100),
      update_time VARCHAR(26),
      PRIMARY KEY (source, data_type1, target)
    ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = '数据库表结构转换';
    
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'bigint', 'hive', 'BIGINT', '2019-07-31 00:00:00');
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'binary', 'hive', 'BINARY', '2019-07-31 00:00:00');
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'char', 'hive', 'STRING', '2019-07-31 00:00:00');
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'datetime', 'hive', 'STRING', '2019-07-31 00:00:00');
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'decimal', 'hive', 'DOUBLE', '2019-07-31 00:00:00');
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'double', 'hive', 'DOUBLE', '2019-07-31 00:00:00');
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'float', 'hive', 'DOUBLE', '2019-07-31 00:00:00');
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'int', 'hive', 'BIGINT', '2019-07-31 00:00:00');
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'json', 'hive', 'MAP<STRING,STRING>', '2019-07-31 00:00:00');
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'mediumtext', 'hive', 'STRING', '2019-07-31 00:00:00');
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'smallint', 'hive', 'BIGINT', '2019-07-31 00:00:00');
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'text', 'hive', 'STRING', '2019-07-31 00:00:00');
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'time', 'hive', 'STRING', '2019-07-31 00:00:00');
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'timestamp', 'hive', 'STRING', '2019-07-31 00:00:00');
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'tinyint', 'hive', 'BIGINT', '2019-07-31 00:00:00');
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'varbinary', 'hive', 'BINARY', '2019-07-31 00:00:00');
    INSERT INTO ods_ddl_convert (source, data_type1, target, data_type2, update_time) VALUES ('mysql', 'varchar', 'hive', 'STRING', '2019-07-31 00:00:00');
    
    SET SESSION group_concat_max_len = 102400;
    SELECT
        a.TABLE_NAME ,
        concat('DROP TABLE IF EXISTS ',a.TABLE_NAME,';CREATE TABLE IF NOT EXISTS ',a.TABLE_NAME ,' (',group_concat(concat(a.COLUMN_NAME,' ',
        c.data_type2," COMMENT '",COLUMN_COMMENT,"'") order by a.TABLE_NAME,a.ORDINAL_POSITION) ,
        ") COMMENT '",b.TABLE_COMMENT ,"' STORED AS parquet;") AS col_name
    FROM
        (
            SELECT
                TABLE_SCHEMA,
                TABLE_NAME,
                COLUMN_NAME,
                ORDINAL_POSITION,
                DATA_TYPE,
                COLUMN_COMMENT
            FROM
                information_schema.COLUMNS
            WHERE
                TABLE_SCHEMA='cyz_test'
            ) AS a
    LEFT JOIN
        information_schema.TABLES AS b
    ON
        a.TABLE_NAME=b.TABLE_NAME
    AND a.TABLE_SCHEMA=b.TABLE_SCHEMA
    #选择源为mysql,目标为hive
    LEFT JOIN
        (
    	    select
    	    *
    	    from ods_ddl_convert
    	    where source='mysql' and target='hive'
        ) AS c
    ON
        a.DATA_TYPE=c.data_type1
    where b.TABLE_TYPE='BASE TABLE'
      and a.TABLE_NAME not like 'ods_%'
    GROUP BY
        a.TABLE_NAME,
        b.TABLE_COMMENT

  3. 登录数据湖探索 DLI服务控制台(首次使用您可以设置临时日志存储,根据弹窗设置OBS即可),选择SQL编辑->复制下方语句->选择模板自动创建的lightweight_offline_big_data_demo队列->default数据库->单击执行,确保左侧SQL编辑器数据库中表创建成功。

    图4 在default数据库中建表

    在数据湖探索 DLI的default数据库中执行的建表语句:

    CREATE TABLE IF NOT EXISTS default.t_user_store_info (
      uuid STRING COMMENT '主键ID',
      org_seq STRING COMMENT '组织机构编码',
      auth_code STRING COMMENT '授权码',
      store_name STRING COMMENT '店铺姓名',
      business_license STRING COMMENT '营业执照',
      store_type BIGINT COMMENT '店铺类型(1一代店、2二代店、3三代店、4四代店、5五代店、6六代店)',
      manager_area DOUBLE COMMENT '经营面积',
      building_area DOUBLE COMMENT '建筑面积',
      practical_area DOUBLE COMMENT '实用面积',
      leasehold_area DOUBLE COMMENT '租赁面积',
      warehouse_area DOUBLE COMMENT '仓库面积',
      open_status BIGINT COMMENT '营业状态(1-营业/0-停业)',
      open_time STRING COMMENT '营业时间',
      open_date STRING COMMENT '开业日期',
      province STRING COMMENT '所属省',
      city STRING COMMENT '所属城市',
      business_district_type BIGINT COMMENT '商圈类型',
      company_name STRING COMMENT '经销商公司名',
      legal_person STRING COMMENT '法人',
      legal_person_phone STRING COMMENT '法人手机号',
      store_owner_name STRING COMMENT '店主姓名',
      store_owner_phone STRING COMMENT '店主手机号',
      store_phone STRING COMMENT '店铺电话',
      store_owner_grade STRING COMMENT '店主评级',
      store_address STRING COMMENT '店铺地址',
      store_address_detail STRING COMMENT '店铺详细地址',
      store_lng STRING COMMENT '店铺经度',
      store_lat STRING COMMENT '店铺纬度',
      doorway_pic STRING COMMENT '门头照片',
      store_owner_pic STRING COMMENT '店主照片',
      store_manager_pic STRING COMMENT '店长照片',
      status BIGINT COMMENT '状态',
      create_time STRING COMMENT '创建时间',
      create_user STRING COMMENT '创建人',
      update_time STRING COMMENT '更新时间',
      update_user STRING COMMENT '更新人',
      gradelevel_id STRING COMMENT '等级id',
      store_manager_phone STRING COMMENT '店长电话',
      store_manager_name STRING COMMENT '店长姓名',
      merid STRING COMMENT '工行Merid(12位识别码)',
      agreement STRING COMMENT '工行协议号',
      wechat_id STRING COMMENT '微信识别码',
      up_org_zone STRING COMMENT '所属片区',
      base_code STRING COMMENT '所属CBD',
      base_person_code STRING COMMENT 'CBD所属业务员',
      up_org_seq STRING COMMENT '所属片区组织编码',
      open_begin_time STRING COMMENT '营业开始时间',
      open_end_time STRING COMMENT '营业结束时间',
      base_name STRING COMMENT '所属CBD名称',
      base_person_name STRING COMMENT 'CBD所属业务员姓名',
      store_code STRING COMMENT '店铺编码',
      shut_down_time STRING COMMENT '店铺停业时间',
      is_icbc BIGINT COMMENT '是否是工行对公账户(1-是 2-否)',
      front_card STRING COMMENT '法人身份证正面照片',
      back_card STRING COMMENT '法人身份证背面照片',
      settlement_pic STRING COMMENT '结算账号照片',
      created_date STRING COMMENT '建档日期',
      icbc_switch BIGINT COMMENT '银行解锁(1-已解锁,2-已锁定)',
      icbc_update_time STRING COMMENT '档案更新时间',
      bank_account STRING COMMENT '银行账号',
      bank_name STRING COMMENT '银行名称'
    ) COMMENT '门店信息' STORED AS parquet;
    CREATE TABLE IF NOT EXISTS default.t_trade_order (
      uuid STRING COMMENT '订单ID',
      order_no STRING COMMENT '订单编号',
      org_seq STRING COMMENT '门店编码',
      member_id STRING COMMENT '会员id',
      user_id STRING COMMENT '业务员id',
      user_name STRING COMMENT '业务员名称',
      user_tel STRING COMMENT '业务员电话',
      head_pic_url STRING COMMENT '业务员头像',
      member_name STRING COMMENT '买家姓名',
      member_head_pic_url STRING COMMENT '买家头像',
      tel STRING COMMENT '买家手机号',
      order_date STRING COMMENT '订单日期',
      pay_date STRING COMMENT '支付日期',
      order_source BIGINT COMMENT '订单来源(1线上商城/2线下零售/3积分商城/4刷脸支付)',
      total_amount DOUBLE COMMENT '总数量',
      total_money DOUBLE COMMENT '总金额',
      salesevent_deduct_money DOUBLE COMMENT '活动抵扣金额',
      discount_money DOUBLE COMMENT '定向劵扣减金额',
      received_money DOUBLE COMMENT '实收金额',
      coupon_deduct_money DOUBLE COMMENT '通用券抵扣金额',
      pay_money DOUBLE COMMENT '应付款金额',
      pay_method BIGINT COMMENT '付款方式(0线下支付/1线上支付/2微信/3支付宝/4银行卡)',
      delivery_method BIGINT COMMENT '配送方式(1门店自提/2配送服务/3快递服务)',
      order_status BIGINT COMMENT '订单状态(0待支付/1待发货/2待收货/3已完成/4已取消)',
      pay_status BIGINT COMMENT '支付状态(0待支付/1已支付)',
      buyer_note STRING COMMENT '买家留言',
      tdcode STRING COMMENT '提货码',
      tdqrcode STRING COMMENT '提货二维码',
      order_bonuspoint DOUBLE COMMENT '本次积分',
      status BIGINT COMMENT ' 0 删除 1成功',
      create_user STRING COMMENT '创建人',
      create_time STRING COMMENT '创建时间',
      update_user STRING COMMENT '更新人',
      update_time STRING COMMENT '更新时间',
      pay_time STRING COMMENT '支付时间',
      delivery_time STRING COMMENT '发货时间',
      completed_time STRING COMMENT '完成时间(收货时间)',
      canceled_time STRING COMMENT '取消时间',
      member_coupon_id STRING COMMENT '会员卡卷id',
      return_status BIGINT COMMENT '退货状态(0正常单/1被退货单/2退货单',
      return_order_id STRING COMMENT '退货单原单id/被退货单退货单id',
      auto_completed_status BIGINT COMMENT '自动收货延期状态(0未延期/1已延期)',
      auto_completed_date STRING COMMENT '自动收货日期',
      is_customize_order BIGINT COMMENT '是否定制订单(1是0否)',
      customize_type BIGINT COMMENT '定制类型(1、2)',
      customize_status BIGINT COMMENT '定制状态(1审核中/2已驳回/3审核成功待生产/4生产中/5生产完成待配送/6已配送)',
      customize_price DOUBLE COMMENT '定制单价',
      customize_amount DOUBLE COMMENT '定制总金额'
    ) COMMENT '订单信息' STORED AS parquet;

  4. 登录云数据迁移服务 CDM控制台,选择模板自动创建的集群,单击右侧的作业管理。选择连接管理,单击新建连接,选择数据湖探索(DLI)单击 下一步。

    图5 作业管理
    图6 连接管理
    图7 数据湖探索

  5. 填写名称“dli-link”,输入华为云ak、sk(如何获取AK/SK),单击保存。

    图8 新建连接

  6. 单击新建连接,选择“云数据库 MySQL”,填写名称“rds-link”,选择“rds-dgc”源端业务数据库实例,端口“3306”,数据库名称“cyz_test”,用户名/密码,单击保存。

    图9 数据库连接

    1. 首次连接RDS需要上传驱动,单击图中上方的蓝色字体“驱动管理”进行驱动上传,驱动版本参考驱动下载地址

    2. RDS和CDM在同一个VPC时可以直接连接,否则RDS和CDM均需绑定EIP后使用公网ip,同时RDS安全组添加CDM的入方向规则,放开22号端口,安全组规则添加参考添加安全组规则

  7. (a)选择表/文件迁移,单击新建作业,输入作业名称“t_user_store_info”,作业信息按照下图进行填写。配置完成后单击下一步,字段会自动映射,无需修改,单击下一步,单击保存。

    (b)选择表/文件迁移,单击新建作业,输入作业名称“t_trade_order”,作业信息按照下图进行填写。配置完成后单击下一步,字段会自动映射,无需修改,单击下一步,单击保存。
    图10 表1作业配置
    图11 表2作业配置

  8. 登录数据治理中心 DataArts Studio控制台,选择模板自动创建的实例,选择default工作空间,单击数据开发,在“作业”目录右键,单击新建作业,输入作业名称“dgc_test”作业信息默认即可,单击确定。

    图12 新建作业

  9. 在弹出的界面中选择“节点库”,拖动1个Dummy节点、2个CDM Job节点,按下图填写配置。

    图13 CDM job节点1配置
    图14 CDM job节点2配置

  10. 按照截图填好信息后,修改调度信息。单击“保存并提交版本”,单击“执行调度”,单击“测试运行”。

    图15 调度配置
    图16 执行调度后测试运行

  11. 执行CDM作业,将RDS的两张表“t_user_store_info”、“t_trade_order”的数据同步到DLI的表中。

    图17 数据迁移成功
    图18 在DLI查看数据迁移结果

数据分析处理示例

  1. 在完成数据迁移基础上,拖动1个DLI SQL节点到右侧的画布中,按下图填写配置。

    图19 DLI SQL节点配置

    对应的DLI SQL语句为(在这里实现业务定制化需求):

    DROP TABLE IF EXISTS dwd_sell_info;
    CREATE TABLE IF NOT EXISTS dwd_sell_info AS
    SELECT
      user_name,
      sum(total_money) as sell_total_money
    FROM
      default.t_trade_order
    group by
      user_name
    order by
      sell_total_money desc
    limit
      10;

  2. 按照截图填好信息后,修改调度信息。单击“保存并提交版本”,单击“执行调度”,单击“测试运行”。

    图20 调度配置
    图21 执行调度后测试运行

  3. 执行CDM作业,将RDS的两张表“t_user_store_info”、“t_trade_order”的数据更新到DLI的表中,并做分析处理,作业执行成功。

    图22 执行成功

    执行成功

    图23 在DLI查看数据分析处理结果

相关文档