文档首页/ 数据仓库服务 DWS/ 最佳实践/ 数据迁移/ 使用DLI Flink作业实时同步Kafka数据至DWS集群
更新时间:2025-08-22 GMT+08:00

使用DLI Flink作业实时同步Kafka数据至DWS集群

本实践演示通过数据湖探索服务 DLI Flink作业(以Flink 1.15版本为例)将分布式消息服务 Kafka的消费数据实时同步至DWS数据仓库,实现Kafka实时入库到DWS的过程。演示过程包括实时写入和更新已有数据的场景。

图1 Kafka实时入库DWS

本实践预计时长90分钟,实践用到的云服务包括虚拟私有云 VPC及子网弹性负载均衡 ELB弹性云服务器 ECS对象存储服务 OBS、分布式消息服务 Kafka数据湖探索 DLI数据仓库服务 DWS,基本流程如下:

  1. 准备工作:注册账号,准备网络等。
  2. 步骤一:创建Kafka实例:购买分布式消息服务Kafka版,并准备Kafka数据。
  3. 步骤二:创建绑定ELB的DWS集群和目标表:购买DWS集群并绑定EIP。
  4. 步骤三:创建DLI弹性资源池及队列:创建DLI弹性资源池,并为资源池添加队列。
  5. 步骤四:创建Kafka和DWS的增强型跨源连接:用于连接Kafka和DWS。
  6. 步骤五:准备DWS对接Flink工具dws-connector-flink:使用该插件,提高MySQL入库DWS的性能。
  7. 步骤六:创建并编辑DLI Flink作业:创建Flink SQL作业,配置SQL代码。
  8. 步骤七:通过Kafka客户端生产和修改消息:完成数据实时入仓。

场景描述

假设数据源Kafka的样例数据是一个用户信息表,如表1所示, 包含 id,name,age三个字段。其中id是唯一且固定的字段,多个业务系统会共用,业务上一般不需要修改,仅修改姓名name,年龄age。

首先,通过Kafka生产以下三组数据,通过DLI Flink作业完成数据同步到数据仓库服务 DWS。接着,需要修改id为2和3的用户为新的jim和tom,再通过DLI Flink作业完成数据的更新并同步到DWS。

表1 样例数据

id

name

age

1

lily

16

2

lucy > jim

17

3

lilei > tom

15

约束限制

  • 确保VPC、ECS、OBS、Kafka、DLI和DWS服务在同一个区域内,例如中国-香港。
  • 确保Kafka、DLI、DWS网络互通。本实践将Kafka和DWS创建在同一个区域和虚拟私有云下,同时在Kafka和DWS的安全组中放通了DLI的队列所在网段,确保网络互通。
  • 为确保DLI到DWS的连接链路稳定,请创建完DWS集群后为集群绑定ELB服务。

准备工作

  • 已注册华为账号并开通华为云,且在使用DWS 前检查账号状态,账号不能处于欠费或冻结状态。
  • 已创建虚拟私有云和子网,参见创建虚拟私有云和子网

步骤一:创建Kafka实例

  1. 登录Kafka管理控制台购买页面
  2. 关键参数如下表说明,其他参数项如表中未说明,默认即可:

    表2 kafka实例参数

    参数项

    参数值

    计费模式

    按需计费

    区域

    中国-香港

    可用区

    可用区1(如遇售罄,选择其他可用区)

    套餐规格

    入门规格

    虚拟私有云

    选择已创建的虚拟私有云,如果没有,则需要创建。

    安全组

    选择已创建的安全组,如果没有,则需要创建。

    其他参数

    保持默认

  3. 单击“确认订单”,核对无误,单击“提交”。等待创建成功。
  4. 创建成功后,在Kafka实例列表中,单击创建好的Kafka实例名称,进入基本信息页面。
  5. 左侧选择“Topic管理”,单击“创建Topic”。

    Topic名称设置为“topic-demo”,其他可保持默认。

    图2 创建Topic

  6. 单击“确定”,在Topic列表中可以看到topic-demo已创建成功。
  7. 左侧导航栏选择“消费组管理”,单击“创建消费组”。
  8. 消费组名称输入“kafka01”,单击“确定”。

步骤二:创建绑定ELB的DWS集群和目标表

  1. 创建独享型弹性负载均衡服务ELB,网络类型选择IPv4私网即可,区域、VPC选择与Kafka实例保持一致,本实践为“中国-香港”。
  2. 创建集群,为DWS绑定弹性负载均衡 ELB,同时为确保网络连通,DWS集群的区域、VPC选择与Kafka实例保持一致,本实践为“中国-香港”,虚拟私有云与上面创建Kafka的虚拟私有云保持一致。
  3. DWS控制台的“专属集群 > 集群列表”页面,单击指定集群所在行操作列的“登录”按钮。

    本实践以8.1.3.x版本为例,8.1.2及以前版本不支持此登录方式,可以gsql登录。

  4. 登录成功后,进入SQL编辑器。
  5. 复制如下SQL语句,在SQL窗口中,单击“执行SQL”,创建目标表user_dws。

    1
    2
    3
    4
    5
    6
    CREATE TABLE user_dws (
    id int,
    name varchar(50),
    age int,
    PRIMARY KEY (id)
    );
    

步骤三:创建DLI弹性资源池及队列

  1. 登录华为云DLI控制台
  2. 左侧导航栏选择“资源管理 > 弹性资源池”,进入弹性资源池管理页面。
  3. 单击右上角“购买弹性资源池”,关键参数如下说明,其他参数项如表中未说明,默认即可

    表3 弹性资源池

    参数项

    参数值

    计费模式

    按需计费

    区域

    中国-香港

    名称

    dli_dws

    规格

    基础版

    网段

    172.16.0.0/18,需选择与Kafka和DWS不在同一个网段。例如,如果Kafka和DWS在192.168.x.x网段,则DLI则选择172.16.x.x。

  4. 单击“立即购买”,单击“提交”。

    等待资源池创建成功,继续执行下一步。

  5. 在弹性资源池页面,单击创建好的资源池所在行右侧的“添加队列”,填写如下参数,其他参数项如表中未说明,默认即可。

    表4 添加队列

    参数项

    参数值

    名称

    dli_dws

    类型

    通用队列

  6. 单击“下一步”,单击“确定”。队列创建成功。

步骤四:创建Kafka和DWS的增强型跨源连接

  1. 更新DLI的委托权限。

    1. 回到DLI管理控制台,左侧选择“全局配置 > 服务授权”。
    2. 勾选“DLI UserInfo Agency Access”、“DLI Datasource Connections Agency Access”、“DLI Notification Agency Access”。
    3. 单击“更新委托权限”。单击“确定”。
    图3 更新DLI委托权限

  2. 放通Kafka的安全组,允许DLI队列所在的网段可以访问Kafka。

    1. 回到Kafka控制台,单击Kafka实例名称进入基本信息。查看“连接信息”的“内网连接地址”,并记录下此地址,以备后续步骤使用。
      图4 kafka内网连接地址
    2. 单击网络的安全组名称。
      图5 kafka安全组
    3. 选择“入方向规则 > 添加规则”,如下图,添加DLI队列的网段地址,本实践为172.16.0.0/18,实际请与步骤三:创建DLI弹性资源池及队列的时候填入的网段保持一致。
      图6 kafka安全组添加规则
    4. 单击“确定”。

  3. 回到DLI管理控制台,单击左侧的“跨源管理”,选择“增强型跨源”,单击“创建”。
  4. 填写如下参数,其他参数项如表中未说明,默认即可。

    表5 DLI到Kafka的连接

    参数项

    参数值

    连接名称

    dli_kafka

    弹性资源池

    选择上面创建的DLI队列名称dli_dws。

    虚拟私有云

    选择Kafka所在的虚拟私有云。

    子网

    选择Kafka所在的子网。

    其他参数

    保持默认。

    图7 创建连接

  5. 单击“确定”。等待Kafka连接创建成功。

    如果增强型跨源连接创建时,没有选择弹性资源池,也可以在跨源连接创建成功后,手动绑定弹性资源池:

    1. 选择跨源连接所在行右侧的“更多 > 绑定弹性资源池”。
    2. 选择相应的弹性资源池,单击“确定”。

  6. 左侧导航栏选择“资源管理 > 队列管理”,选择dli_dws所在行操作列的“更多 > 测试地址连通性”。
  7. 在地址栏中,输入2.a获取的Kafka实例的内网IP和端口(Kafka的地址有三个,输入一个即可)。

    图8 测试kafka连通性

  8. 单击“测试”,验证DLI连通Kafka成功。
  9. 如果连通性测试失败,请参见以下步骤操作。

  10. 进入到DWS管理控制台,左侧导航栏单击“专属集群 > 集群列表”,单击集群名称进入DWS集群详情。
  11. 如下图,记录下DWS集群的内网域名、端口和弹性负载均衡地址,以备后面步骤需要。

    图9 内网域名和ELB地址

  12. 单击安全组名称。

    图10 DWS安全组

  13. 选择“入方向规则 > 添加规则”,如下图,添加DLI队列的网段地址,本实践为172.16.0.0/18,实际请与步骤三:创建DLI弹性资源池及队列的时候填入的网段保持一致。

    图11 DWS安全组添加规则

  14. 单击“确定”。
  15. 再切换到DLI控制台,左侧选择“资源管理 > 队列管理”,选择dli_dws所在行操作列的“更多 > 测试地址连通性”。
  16. 在地址栏中,输入11获取的DWS集群的弹性负载均衡IP和端口。

    图12 测试DWS连通

  17. 单击“测试”,验证DLI连通DWS成功。

步骤五:准备DWS对接Flink工具dws-connector-flink

dws-connector-flink是一款基于DWS JDBC接口实现对接Flink的一个工具。在配置DLI作业阶段,将该工具及依赖放入Flink类加载目录,提升Flink作业入库DWS的能力。

  1. 浏览器访问https://mvnrepository.com/artifact/com.huaweicloud.dws
  2. 在软件列表中选择Flink相应版本号1.15,本实践选择DWS Connector Flink SQL 1 15

  3. 选择最新分支,实际请以官网发布的新分支为准。

  4. 单击jar图标下载即可。

  5. 创建OBS桶,本实践桶名设置为obs-flink-dws,并将此文件dws-connector-flink-sql-1.15-2.12_2.0.0.r4.jar上传到OBS桶下,注意桶也保持与DLI在一个区域下,本实践为“中国-香港”。

步骤六:创建并编辑DLI Flink作业

  1. 创建OBS委托策略。

    1. 鼠标悬浮在控制台右上角的账户名称上,单击“统一身份认证”。
    2. 左侧选择“委托”,单击右上角“创建委托”。
      • 委托名称:dli_ac_obs。
      • 委托类型:云服务。
      • 云服务:数据湖探索 DLI。
      • 持续时间:永久。
      图13 创建OBS委托策略

    3. 单击“完成”,单击“立即授权”。
    4. 进入授权页面,单击“新建策略”。
    5. 配置策略信息。输入策略名称,本例:dli_ac_obs,选择“JSON视图”。
    6. 在策略内容中粘贴自定义策略。“OBS桶名”替换为实际的桶名。
       {
          "Version": "1.1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "obs:object:GetObject",
                      "obs:object:DeleteObjectVersion",
                      "obs:bucket:GetBucketLocation",
                      "obs:bucket:GetLifecycleConfiguration",
                      "obs:object:AbortMultipartUpload",
                      "obs:object:DeleteObject",
                      "obs:bucket:GetBucketLogging",
                      "obs:bucket:HeadBucket",
                      "obs:object:PutObject",
                      "obs:object:GetObjectVersionAcl",
                      "obs:bucket:GetBucketAcl",
                      "obs:bucket:GetBucketVersioning",
                      "obs:bucket:GetBucketStoragePolicy",
                      "obs:bucket:ListBucketMultipartUploads",
                      "obs:object:ListMultipartUploadParts",
                      "obs:bucket:ListBucketVersions",
                      "obs:bucket:ListBucket",
                      "obs:object:GetObjectVersion",
                      "obs:object:GetObjectAcl",
                      "obs:bucket:GetBucketPolicy",
                      "obs:bucket:GetBucketStorage"
                  ],
                  "Resource": [
                      "OBS:*:*:object:*",
                      "OBS:*:*:bucket:OBS桶名"
                  ]
              },
              {
                  "Effect": "Allow",
                  "Action": [
                      "obs:bucket:ListAllMyBuckets"
                  ]
              }
          ]
      }
    7. 新建策略完成后,单击“下一步”,返回委托授权页面。
    8. 选择上面创建的自定义策略。
    9. 单击“下一步”,选择委托的授权范围,选择“全局服务资源”。
    10. 单击“确定”,完成授权。

      授权后需等待15-30分钟才可生效。

  2. 回到DLI管理控制台,左侧选择“作业管理 > Flink作业”,单击右上角“创建作业”。
  3. 类型选择“Flink OpenSource SQL”,名称填写kafka-dws。

    图14 创建作业

  4. 单击“确定”。系统自动进入到作业的编辑页面。
  5. 在页面右侧填写如下参数,其他参数项如表中未说明,默认即可。

    表6 flink作业参数

    参数项

    参数值

    所属队列

    dli_dws

    Flink版本

    1.15

    UDF Jar

    选择步骤五:准备DWS对接Flink工具dws-connector-flink的OBS桶中的jar文件。

    委托

    选择1创建的委托。

    OBS桶

    选择步骤五:准备DWS对接Flink工具dws-connector-flink的桶。

    开启Checkpoint

    勾选

    其他参数

    保持默认

  6. 将以下符合Flink要求的SQL代码复制到左侧的SQL代码窗。

    其中“Kafka实例内网IP地址和端口”参见2.a获取,“DWS内网域名”由11获取。

    以下是Flink SQL作业代码中常见的参数解释:

    • connector:用于指定表的数据源的连接器类型,Kafka数据源指定为kafka,DWS可指定为dws,更多取值可参见Connector列表
    • write.mode:入库方式,支持auto,copy_merge,copy_upsert,upsert,update,copy_update,copy_auto等。
    • autoFlushBatchSize:指定在自动刷新前要缓冲的记录数。当缓冲的记录数达到这个值时,Flink会将数据批量写入目标系统,例如5000行后再写入Sink目标端。在Flink SQL中,Sink(中文常译为"接收器"或"输出端")是指数据流处理管道中的最终输出目的地,负责将处理后的数据写入外部存储系统或发送给下游应用。
    • autoFlushMaxInterval:指定自动刷新的最大时间间隔。即使缓冲的记录数未达到autoFlushBatchSize,超过这个时间间隔后也会触发刷新。
    • key-by-before-sink:指定是否在数据写入Sink目标端前按主键分组。这可以确保相同主键的记录被连续写入,对于某些需要主键有序的系统(如某些数据库)很有用。该参数旨在解决多并发写入的场景下且write.mode为upsert时,如果多个子任务中写入Sink的一批数据具有不止一条相同的主键,并且主键相同的这些数据先后顺序不一致,就会导致两个子任务在向DWS根据主键获取行锁时发生互锁的问题。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    CREATE TABLE user_kafka (
      id string,
      name string,
      age int
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'topic-demo',
      'properties.bootstrap.servers' = 'Kafka实例内网IP地址和端口',
      'properties.group.id' = 'kafka01',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    CREATE TABLE user_dws (
      id string,
      name string,
      age int,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'dws',
      'url' = 'jdbc:postgresql://DWS内网域名:8000/gaussdb',
      'tableName' = 'public.user_dws',
      'username' = 'dbadmin',
      'password' = '数据库用户dbadmin密码'
      'writeMode' = 'auto',  
      'autoFlushBatchSize'='50000',  
      'autoFlushMaxInterval'='5s',  
      'key-by-before-sink'='true'
    );
    
    INSERT INTO user_dws select * from user_kafka;    --将处理结果写入Sink
    

  7. 单击“格式化”,再单击“保存”。

    请务必先单击“格式化”将SQL代码进行格式化处理,否则可能会因为代码复制和粘贴操作过程中引入新的空字符,而导致作业执行失败。

    图15 作业的SQL语句

  8. 回到DLI控制台首页,左侧选择“作业管理 > Flink作业”。
  9. 单击作业名称kafka-dws右侧的“启动”,单击“立即启动”。

    等待约1分钟,再刷新页面,状态在“运行中”表示作业成功运行。

    图16 作业运行状态

步骤七:通过Kafka客户端生产和修改消息

  1. 参见ECS文档创建一台ECS,具体创建步骤此处不再赘述。创建时,确保ECS的区域、虚拟私有云保持与Kafka一致。
  2. 安装JDK。

    1. 登录ECS,进入到/usr/local,下载JDK包。
      1
      2
      cd /usr/local
      wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
      
    2. 解压下载好的JDK包。
      1
      tar -zxvf jdk-17_linux-x64_bin.tar.gz
      
    3. 执行以下命令进入/etc/profile文件。
      1
      vim /etc/profile
      
    4. 按i进入编辑模式,将以下内容增加到/etc/profile文件的末尾。
      1
      2
      3
      4
      5
      export JAVA_HOME=/usr/local/jdk-17.0.7  #jdk安装目录
      export JRE_HOME=${JAVA_HOME}/jre
      export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:${JAVA_HOME}/test:${JAVA_HOME}/lib/gsjdbc4.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar:$CLASSPATH 
      export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin
      export PATH=$PATH:${JAVA_PATH}
      

    5. 按ESC,输入:wq!按回车,保存退出。
    6. 执行命令,使环境变量生效。
      1
      source /etc/profile
      
    7. 执行以下命令,提示如下信息表示jdk安装成功。
      1
      java -version
      

  3. 安装Kafka客户端。

    1. 进入/opt目录,执行以下命令获取Kafka客户端软件包。
      1
      2
      cd /opt
      wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
      
    2. 解压下载好的软件包。
      1
      tar -zxf kafka_2.12-2.7.2.tgz
      
    3. 进入到Kafka客户端目录。
      1
      cd /opt/kafka_2.12-2.7.2/bin
      

  4. 执行以下命令连接Kafka。其中 {连接地址}为Kafka的内网连接地址,参见2.a获取,topic为5创建的Kafka的topic名称。

    1
    ./kafka-console-producer.sh --broker-list {连接地址} --topic {Topic名称}
    

    本实践示例如下:

    ./kafka-console-producer.sh --broker-list 192.168.0.136:9092,192.168.0.214:9092,192.168.0.217:9092 --topic topic-demo

    如上图出现>符号,无其他报错,表示连接成功。

  5. 在已连接kafka的客户端窗口下,根据场景描述规划的数据,复制以下内容(注意一次复制一行),按回车发送,进行生产消息。

    1
    2
    3
    {"id":"1","name":"lily","age":"16"}
    {"id":"2","name":"lucy","age":"17"}
    {"id":"3","name":"lilei","age":"15"}
    

  6. 回到DWS控制台,左侧选择“专属集群 > 集群列表”,单击DWS集群右侧“登录”,进入SQL页面。
  7. 执行以下SQL语句,确认数据实时入库成功。

    1
    SELECT * FROM user_dws ORDER BY id;
    

  1. 继续回到ECS中连接Kafka的客户端窗口,复制以下内容(注意一次复制一行),按回车发送,进行生产消息。

    1
    2
    {"id":"2","name":"jim","age":"17"}
    {"id":"3","name":"tom","age":"15"}
    

  2. 回到DWS已打开的SQL窗口,执行以下SQL语句,发现id为2和3的姓名已修改为jim和tom。

    符合场景描述预期,本实践结束。
    1
    SELECT * FROM user_dws ORDER BY id;