文档首页/ 数据仓库服务 DWS/ 最佳实践/ 数据迁移/ 使用DLI Flink作业实时同步MySQL数据至DWS集群
更新时间:2025-08-22 GMT+08:00
分享

使用DLI Flink作业实时同步MySQL数据至DWS集群

本实践演示如何使用华为云DLI服务的Flink作业(以Flink 1.15版本为例),将MySQL数据实时同步到DWS。

了解DLI请参见数据湖产品介绍

本实践预计时长90分钟,实践用到的云服务包括虚拟私有云 VPC及子网云数据库 RDS、数据湖探索 DLI对象存储服务 OBS数据仓库服务 DWS,基本流程如下:

  1. 准备工作:注册账号,准备网络等。
  2. 步骤一:准备MySQL数据:购买云服务RDS,并创建源表和插入数据。
  3. 步骤二:创建DWS集群:购买DWS集群,并创建目标表。
  4. 步骤三:创建DLI弹性资源池及队列:创建DLI弹性资源池,并为资源池添加队列。
  5. 步骤四:创建增强型跨源连接:用于连接RDS和DWS。
  6. 步骤五:准备DWS对接Flink工具dws-connector-flink:使用该插件,提高MySQL入库DWS的性能。
  7. 步骤六:创建DLI Flink作业:创建Flink SQL作业,配置SQL代码。
  8. 步骤七:验证数据同步:保证入库的数据一致性。
  9. 更多信息:在Flink跨源开发场景下,如果在作业脚本中直接配置数据源认证信息,存在密码泄露的风险。建议使用DLI提供的跨源认证功能,不要在作业脚本中直接指定MySQL和DWS的用户名和密码。

准备工作

步骤一:准备MySQL数据

  1. 登录RDS控制台,购买RDS实例,参见表1配置关键参数,其他参数可保持默认,如需了解详情请参见RDS文档

    表1 RDS参数

    参数项

    取值

    计费模式

    按需计费

    区域

    华北-北京四

    实例名称

    rds-demo

    数据库引擎

    MySQL

    数据库版本

    5.7及以上

    数据库端口

    3306

  2. 连接RDS实例,执行以下命令,创建名为mys_data数据库。

    1
    CREATE DATABASE mys_data;
    

  3. 切换到新的数据库mys_data,并执行以下命令,创建表mys_orders。

    1
    2
    3
    4
    5
    6
    7
    8
    CREATE TABLE mys_data.mys_order
         ( order_id      VARCHAR(12),
           order_channel VARCHAR(32),
           order_time    DATETIME,
           cust_code     VARCHAR(6),
           pay_amount    DOUBLE,
           real_pay      DOUBLE,
           PRIMARY KEY (order_id) );
    

  4. 执行以下命令向表中插入数据。

    1
    2
    INSERT INTO mys_data.mys_order VALUES ('202306270001', 'webShop', TIMESTAMP('2023-06-27 10:00:00'), 'CUST1', 1000, 1000);
    INSERT INTO mys_data.mys_order VALUES ('202306270002', 'webShop', TIMESTAMP('2023-06-27 11:00:00'), 'CUST2', 5000, 5000);
    

  5. 执行以下命令,验证数据是否插入成功。

    1
    SELECT * FROM mys_data.mys_order;
    

步骤二:创建DWS集群

  1. 创建集群,同时为确保网络连通,DWS集群的区域、VPC选择与RDS实例保持一致,本实践为“华北-北京四”,虚拟私有云与上面创建RDS的虚拟私有云保持一致。
  2. DWS控制台的“专属集群 > 集群列表”页面,单击指定集群所在行操作列的“登录”按钮。登录信息如下:

    • 数据源名称:dws-demo
    • 集群:创建的DWS集群
    • 数据库:gaussdb
    • 用户名:dbadmin
    • 密码:创建DWS集群设置的密码

  3. 勾选“记住密码”,单击“测试连接”,连接成功后,单击“确定”。
  4. 复制如下SQL语句,在SQL窗口中,单击“执行SQL”,创建名为dws_data的SCHEMA。

    1
    CREATE SCHEMA dws_data;
    

  1. 在新建的SCHEMA下,创建dws_order表。

    1
    2
    3
    4
    5
    6
    7
    CREATE TABLE dws_data.dws_order
            ( order_id      VARCHAR(12),
              order_channel VARCHAR(32),
              order_time    TIMESTAMP,
              cust_code     VARCHAR(6),
              pay_amount    DOUBLE PRECISION,
              real_pay      DOUBLE PRECISION );
    

  1. 查询数据,当前为空表。

    1
    SELECT * FROM dws_data.dws_order;
    

步骤三:创建DLI弹性资源池及队列

  1. 登录华为云DLI控制台
  2. 左侧导航栏选择“资源管理 > 弹性资源池”,进入弹性资源池管理页面。
  3. 单击右上角“购买弹性资源池”,填写如下参数,其他参数项如表中未说明,默认即可

    表2 DLI弹性资源池

    参数项

    参数值

    计费模式

    按需计费

    区域

    华北-北京四

    名称

    dli_dws

    规格

    基础版

    网段

    172.16.0.0/18,需选择与MySQL和DWS不在同一个网段。例如,如果MySQL和DWS在192.168.x.x网段,则DLI则选择172.16.x.x。

  4. 单击“立即购买”,单击“提交”。

    等待资源池创建成功,继续执行下一步。

  5. 在弹性资源池页面,单击创建好的资源池所在行右侧的“添加队列”,填写如下参数,其他参数项如表中未说明,默认即可。

    表3 添加队列

    参数项

    参数值

    名称

    dli_dws

    类型

    通用队列

  6. 单击“下一步”,单击“确定”。队列创建成功。

步骤四:创建增强型跨源连接

  1. 更新DLI的委托权限。

    1. 回到DLI管理控制台,左侧选择“全局配置 > 服务授权”。
    2. 勾选“DLI UserInfo Agency Access”、“DLI Datasource Connections Agency Access”、“DLI Notification Agency Access”。
    3. 单击“更新委托权限”。单击“确定”。
    图1 更新DLI委托权限

  2. 放通RDS的安全组,允许DLI队列所在的网段可以访问RDS。

    1. 左侧选择“资源管理 > 队列管理”,记录dli_dws所在网段。
      图2 DLI队列网段
    2. 切换到RDS的控制台,左侧选择“实例管理”,单击创建好的RDS实例名称,进入概览。
    3. 记录“网络信息”的“内网地址”,后续测试连通性的步骤需要使用。
    4. 单击“网络信息”中安全组旁边的“管理”。
      图3 RDS安全组

    5. 在弹出的安全组列表中,单击安全组名称,进入安全组配置页面。
    6. 选择“入方向规则 > 添加规则”,如下图所示,添加DLI队列的网段地址,本实践为172.16.0.0/18,实际请与步骤三:创建DLI弹性资源池及队列的时候填入的网段保持一致。
      图4 RDS安全组添加规则

    7. 单击“确定”。

  3. 回到DLI管理控制台,单击左侧的“跨源管理”,选择“增强型跨源”,单击“创建”。
  4. 填写如下参数,其他参数项如表中未说明,默认即可。

    表4 DLI到RDS的连接

    参数项

    参数值

    连接名称

    dli_rds

    弹性资源池

    选择上面创建的DLI弹性资源池。

    虚拟私有云

    选择RDS所在的虚拟私有云。

    子网

    选择RDS所在的子网。

    其他参数

    保持默认。

    图5 创建跨源连接

  5. 单击“确定”。等待RDS连接创建成功。
  6. 测试DLI到RDS的连通性。

    1. 左侧导航栏选择“资源管理 > 队列管理”,选择dli_dws所在行操作列的“更多 > 测试地址连通性”。
    2. 地址栏内输入2.c记录的RDS的内网地址和3306端口。
    3. 单击“测试”,验证DLI连通RDS成功。
      图6 测试RDS与DLI连通

  7. 如果连通性测试失败,请参见以下步骤操作。

  8. 测试DLI到DWS的连通性。

    1. 进入到DWS控制台,左侧导航栏单击“专属集群 > 集群列表”,单击集群名称进入DWS集群详情。
    2. 如下图,记录下DWS集群的内网IP(两个取一个即可)和端口,以备后面步骤需要。
      图7 DWS内网IP
    3. 单击安全组名称。
      图8 DWS安全组
    4. 选择“入方向规则 > 添加规则”,如下图,添加DLI队列的网段地址,本实践为172.16.0.0/18,实际请与4填入的网段保持一致。

      图9 DWS安全组添加规则
    5. 单击“确定”。
    6. 再切换到DLI控制台,左侧选择“资源管理 > 队列管理”,选择dli_dws所在行操作列的“更多 > 测试地址连通性”。
    7. 在地址栏中,输入获取的DWS集群的内网IP和端口。
    8. 单击“测试”,验证DLI连通DWS成功。
      图10 测试DWS连通

步骤五:准备DWS对接Flink工具dws-connector-flink

dws-connector-flink是一款基于DWS JDBC接口实现对接Flink的一个工具。在配置DLI作业阶段,将该工具及依赖放入Flink类加载目录,提升Flink作业入库DWS的能力。

  1. 浏览器访问https://mvnrepository.com/artifact/com.huaweicloud.dws
  2. 在软件列表中选择Flink相应版本号1.15,本实践选择DWS Connector Flink SQL 1 15

  3. 选择最新分支,实际请以官网发布的新分支为准。

  4. 单击jar图标下载即可。

  5. 创建OBS桶,本实践桶名设置为obs-flink-dws,并将此文件dws-connector-flink-sql-1.15-2.12_2.0.0.r4.jar上传到OBS桶下,注意桶也保持与DLI在一个区域下,本实践为“ 华北-北京四”。

步骤六:创建DLI Flink作业

  1. 创建OBS委托策略。

    1. 鼠标悬浮在控制台右上角的账户名称上,单击“统一身份认证”。
    2. 左侧选择“委托”,单击右上角“创建委托”。
      • 委托名称:dli_ac_obs。
      • 委托类型:云服务。
      • 云服务:数据湖探索 DLI。
      • 持续时间:永久。
      图11 创建OBS委托策略

    3. 单击“完成”,单击“立即授权”。
    4. 进入授权页面,单击“新建策略”。
    5. 配置策略信息。输入策略名称,本例:dli_ac_obs,选择“JSON视图”。
    6. 在策略内容中粘贴自定义策略。“OBS桶名”替换为5创建的实际桶名。
       {
          "Version": "1.1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "obs:object:GetObject",
                      "obs:object:DeleteObjectVersion",
                      "obs:bucket:GetBucketLocation",
                      "obs:bucket:GetLifecycleConfiguration",
                      "obs:object:AbortMultipartUpload",
                      "obs:object:DeleteObject",
                      "obs:bucket:GetBucketLogging",
                      "obs:bucket:HeadBucket",
                      "obs:object:PutObject",
                      "obs:object:GetObjectVersionAcl",
                      "obs:bucket:GetBucketAcl",
                      "obs:bucket:GetBucketVersioning",
                      "obs:bucket:GetBucketStoragePolicy",
                      "obs:bucket:ListBucketMultipartUploads",
                      "obs:object:ListMultipartUploadParts",
                      "obs:bucket:ListBucketVersions",
                      "obs:bucket:ListBucket",
                      "obs:object:GetObjectVersion",
                      "obs:object:GetObjectAcl",
                      "obs:bucket:GetBucketPolicy",
                      "obs:bucket:GetBucketStorage"
                  ],
                  "Resource": [
                      "OBS:*:*:object:*",
                      "OBS:*:*:bucket:OBS桶名"
                  ]
              },
              {
                  "Effect": "Allow",
                  "Action": [
                      "obs:bucket:ListAllMyBuckets"
                  ]
              }
          ]
      }
    7. 新建策略完成后,单击“下一步”,返回委托授权页面。
    8. 选择上面创建的自定义策略。
    9. 单击“下一步”,选择委托的授权范围,选择“全局服务资源”。
    10. 单击“确定”,完成授权。

      授权后需等待15-30分钟才可生效。

  2. 回到DLI管理控制台,左侧选择“作业管理 > Flink作业”,单击右上角“创建作业”。
  3. 类型选择“Flink OpenSource SQL”,名称填写rds-dws。

    图12 创建作业

  4. 单击“确定”。系统自动进入到作业的编辑页面。
  5. 在页面右侧填写如下参数,其他参数项如表中未说明,默认即可。

    表5 flink作业参数

    参数项

    参数值

    所属队列

    选择4的dli_dws。

    Flink版本

    1.15或更高(实际版本请与界面为准)。

    UDF Jar

    选择5的OBS桶中的jar文件。

    委托

    选择1创建的委托。

    OBS桶

    选择5的桶。

    保存作业日志

    勾选

    开启Checkpoint

    勾选

    其他参数

    保持默认

  6. 将以下符合Flink要求的SQL代码复制到左侧的SQL代码窗。

    “RDS数据库的内网IP”参见2.c获取,“DWS集群内网IP”参见8.b获取,并修改RDS数据库的root用户密码、DWS的dbadmin用户密码。

    以下是Flink SQL作业代码中常见的参数解释:

    • connector:用于指定表的数据源的连接器类型,MySQL数据源指定为mysql-cdc,DWS可指定为gaussdb,更多取值可参见Connector列表
    • driver:是目的端DWS的JDBC驱动名称,可固定为com.huawei.gauss200.jdbc.Driver。
    • write.mode:入库方式,支持copy, insert以及upsert三种。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    CREATE TABLE
      mys_order (
        order_id STRING,
        order_channel STRING,
        order_time TIMESTAMP,
        cust_code STRING,
        pay_amount DOUBLE,
        real_pay DOUBLE,
        PRIMARY KEY (order_id) NOT ENFORCED
      )
    WITH
      (
        'connector' = 'mysql-cdc',
        'hostname' = 'RDS数据库的内网IP',
        'port' = '3306',
        'username' = 'root',
        'password' = 'RDS数据库的root用户密码',
        'database-name' = 'mys_data',
        'table-name' = 'mys_order'
      );
    
    CREATE TABLE
      dws_order (
        order_id STRING,
        order_channel STRING,
        order_time TIMESTAMP,
        cust_code STRING,
        pay_amount DOUBLE,
        real_pay DOUBLE,
        PRIMARY KEY (order_id) NOT ENFORCED
      )
    WITH
      (
        'connector' = 'gaussdb',
        'driver' = 'com.huawei.gauss200.jdbc.Driver',
        'url' = 'jdbc:gaussdb://DWS集群内网IP:8000/gaussdb',
        'table-name' = 'dws_data.dws_order',
        'username' = 'dbadmin',
        'password' = 'DWS的dbadmin用户密码',
        'write.mode' = 'insert'
      );
    
    INSERT INTO
      dws_order
    SELECT
      *
    FROM
      mys_order;
    

  7. 单击“格式化”,再单击“保存”。

    请务必先单击“格式化”将SQL代码进行格式化处理,否则可能会因为代码复制和粘贴操作过程中引入新的空字符,而导致作业执行失败。

    图13 Flink作业参数

  1. 回到DLI控制台首页,左侧选择“作业管理 > Flink作业”。
  2. 单击作业名称rds-dws右侧的“启动”,单击“立即启动”。

    等待约1分钟,再刷新页面,状态在“运行中”表示作业成功运行。

    图14 Flink运行成功

步骤七:验证数据同步

  1. 回到DWS数据库的SQL窗口,如果连接超时,参见以下重新登录。

    1. 切换到DWS管理控制台。
    2. 左侧导航选“专属集群 > 集群列表”,单击dws-demo所在行右侧的“登录”。

  2. 执行以下查询语句,发现MySQL的表的两行数据已经同步至DWS。

    1
    SELECT * FROM dws_data.dws_order;
    
    图15 查询结果

  3. 切换到RDS的MySQL的SQL界面,再执行以下语句,插入3条新的数据。

    1
    2
    3
    INSERT INTO mys_data.mys_order VALUES ('202403090003', 'webShop', TIMESTAMP('2024-03-09 13:00:00'), 'CUST1', 2000, 2000);
    INSERT INTO mys_data.mys_order VALUES ('202403090004', 'webShop', TIMESTAMP('2024-03-09 14:00:00'), 'CUST2', 3000, 3000);
    INSERT INTO mys_data.mys_order VALUES ('202403100004', 'webShop', TIMESTAMP('2024-03-10 10:00:00'), 'CUST3', 6000, 6000);
    
    图16 MySQL新增数据

  4. 回到DWS的SQL窗口再次执行以下SQL查询,从返回结果可以看到MySQL数据已实时同步至DWS。

    1
    SELECT * FROM dws_data.dws_order;
    
    图17 数据实时同步

更多信息

在Flink跨源开发场景下,如果在作业脚本中直接配置数据源认证信息,存在密码泄露的风险。建议使用DLI提供的跨源认证功能,不要在作业脚本中直接指定MySQL和DWS的用户名和密码。

  1. 登录华为云DLI控制台,选择“跨源管理 > 跨源认证”。
  2. 单击“创建”。
  3. 创建MySQL的root用户密码认证。

    1. 填写如下参数。
      • 类型选择“Password”。
      • 认证信息名称,填写:mysql_pwd_auth。
      • 用户名:root。
      • 用户密码:root用户密码。
      图18 MySQL密码认证
    2. 单击“确定”。

  4. 创建DWS的dbadmin用户密码认证。

    1. 填写如下参数。
      • 类型选择“Password”。
      • 认证信息名称,填写:dws_pwd_auth。
      • 用户名:dbadmin。
      • 用户密码:dbadmin用户密码。
      图19 DWS密码认证
    2. 单击“确定”。

  5. 回到DLI管理控制台,选择“作业管理 > Flink作业”,在步骤六:创建DLI Flink作业创建的作业名称所在行右侧,选择“更多 > 停止”,停止作业。
  6. 作业停止后,单击作业名称所在行右侧的“编辑”。
  7. 将SQL脚本替换成以下最新。

    文件中,仅需替换RDS内网IP地址、DWS内网IP地址即可。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    CREATE TABLE mys_order (
      order_id STRING,
      order_channel STRING,
      order_time TIMESTAMP,
      cust_code STRING,
      pay_amount DOUBLE,
      real_pay DOUBLE,
      PRIMARY KEY (order_id) NOT ENFORCED ) 
    WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'RDS内网IP地',
      'port' = '3306',
      'pwd_auth_name' = 'mysql_pwd_auth',
      'database-name' = 'mys_data',
      'table-name' = 'mys_order' );
     
    CREATE TABLE dws_order (
        order_id STRING,
        order_channel STRING,
        order_time TIMESTAMP,
        cust_code STRING,
        pay_amount DOUBLE,
        real_pay DOUBLE, 
        PRIMARY KEY (order_id) NOT ENFORCED )
    WITH (
        'connector' = 'gaussdb',
        'driver' = 'com.huawei.gauss200.jdbc.Driver',
        'url' = 'jdbc:gaussdb://DWS内网IP地址:8000/gaussdb',
        'table-name' = 'dws_data.dws_order',
        'pwd_auth_name' = 'dws_pwd_auth',
        'write.mode' = 'insert' );
     
    INSERT INTO dws_order SELECT * FROM mys_order;
    

  8. 替换文件后,单击“格式化”,并单击“保存”。
  9. 重新启动作业,再参见步骤七:验证数据同步,验证数据同步成功。

相关文档