使用DLI Flink作业实时同步Kafka数据至(GaussDB)DWS集群
本实践演示通过数据湖探索服务 DLI Flink作业将分布式消息服务 Kafka的消费数据实时同步至GaussDB(DWS)数据仓库,实现Kafka实时入库到GaussDB(DWS)的过程。演示过程包括实时写入和更新已有数据的场景。
- 了解DLI请参见数据湖产品介绍。
- 了解Kafka请参见分布式消息服务Kafka产品介绍。
本实践预计时长90分钟,实践用到的云服务包括虚拟私有云 VPC及子网、弹性负载均衡 ELB、弹性云服务器 ECS、对象存储服务 OBS、分布式消息服务 Kafka、数据湖探索 DLI和数据仓库服务 GaussDB(DWS),基本流程如下:
- 准备工作
- 步骤一:创建Kafka实例
- 步骤二:创建绑定ELB的DWS集群和目标表
- 步骤三:创建DLI队列
- 步骤四:创建Kafka和DWS的增强型跨源连接
- 步骤五:准备DWS对接Flink工具dws-connector-flink
- 步骤六:创建并编辑DLI Flink作业
- 步骤七:通过Kafka客户端生产和修改消息
场景描述
假设数据源Kafka的样例数据是一个用户信息表,如表1所示, 包含 id,name,age三个字段。其中id是唯一且固定的字段,多个业务系统会共用,业务上一般不需要修改,仅修改姓名name,年龄age。
首先,通过Kafka生产以下三组数据,通过DLI Flink作业完成数据同步到数据仓库服务 GaussDB(DWS)。接着,需要修改id为2和3的用户为新的jim和tom,再通过DLI Flink作业完成数据的更新并同步到GaussDB(DWS)。
约束限制
- 确保VPC、ECS、OBS、Kafka、DLI和DWS服务在同一个区域内,例如华北-北京四。
- 确保Kafka、DLI、DWS网络互通。本实践将Kafka和DWS创建在同一个区域和虚拟私有云下,同时在Kafka和DWS的安全组中放通了DLI的队列所在网段,确保网络互通。
- 为确保DLI到DWS的连接链路稳定,请创建完DWS集群后为集群绑定ELB服务。
准备工作
- 已注册华为账号并开通华为云,具体请参见注册华为账号并开通华为云,且在使用GaussDB(DWS) 前检查账号状态,账号不能处于欠费或冻结状态。
- 已创建虚拟私有云和子网,参见创建虚拟私有云和子网。
步骤一:创建Kafka实例
- 登录华为云控制台,服务列表选择“应用中间件 > 分布式消息服务Kafka版”,进入Kafka管理控制台。
- 左侧导航栏选择“Kafka专享版”,单击右上角的“购买kafka实例”。
- 填写如下参数,其他参数项如表中未说明,默认即可:
表2 kafka实例参数 参数项
参数值
计费模式
按需计费
区域
华北-北京四
项目
默认
可用区
可用区1(如遇售罄,选择其他可用区)
实例名称
kafka-dli-dws
企业项目
default
规格类型
默认
版本
2.7
CPU架构
x86计算
代理规格
kafka.2u4g.cluster.small(实例仅为参考,选择最小规格即可)
代理数量
3
虚拟私有云
选择已创建的虚拟私有云,如果没有,则需要创建。
安全组
选择已创建的安全组,如果没有,则需要创建。
其他参数
保持默认
图2 创建Kafka实例
- 单击“立即创建”。等待创建成功。
- 创建成功后,在Kafka实例列表中,单击创建好的Kafka实例名称,进入基本信息页面。
- 左侧选择“Topic管理”,单击“创建Topic”。
Topic名称设置为“topic-demo”,其他可保持默认。
图3 创建Topic
- 单击“确定”,在Topic列表中可以看到topic-demo已创建成功。
- 左侧导航栏选择“消费组管理”,单击“创建消费组”。
- 消费组名称输入“kafka01”,单击“确定”。
步骤二:创建绑定ELB的DWS集群和目标表
- 创建独享型弹性负载均衡服务ELB,网络类型选择IPv4私网即可,区域、VPC选择与Kafka实例保持一致,本实践为“华北-北京四”。
- 创建集群,为GaussDB(DWS)绑定弹性负载均衡 ELB,同时为确保网络连通,GaussDB(DWS)集群的区域、VPC选择与Kafka实例保持一致,本实践为“华北-北京四”,虚拟私有云与上面创建Kafka的虚拟私有云保持一致。
- 在GaussDB(DWS)控制台的“专属集群 > 集群列表”页面,单击指定集群所在行操作列的“登录”按钮。
本实践以8.1.3.x版本为例,8.1.2及以前版本不支持此登录方式,可以使用Data Studio连接集群。
- 登录成功后,进入SQL编辑器。
- 复制如下SQL语句,在SQL窗口中,单击“执行SQL”,创建目标表user_dws。
1 2 3 4 5 6
CREATE TABLE user_dws ( id int, name varchar(50), age int, PRIMARY KEY (id) );
步骤三:创建DLI队列
- 登录华为云控制台,服务列表选择“大数据 > 数据湖探索DLI”,进入DLI管理控制台。
- 左侧导航栏选择“资源管理 > 弹性资源池”,进入弹性资源池管理页面。
- 单击右上角“购买弹性资源池”,填写如下参数,其他参数项如表中未说明,默认即可。
表3 DLI队列 参数项
参数值
计费模式
按需计费
区域
华北-北京四
名称
dli_dws
规格
基础版
网段
172.16.0.0/18,需选择与Kafka和DWS不在同一个网段。例如,如果Kafka和DWS在192.168.x.x网段,则DLI则选择172.16.x.x。
- 单击“立即购买”,单击“提交”。
等待资源池创建成功,继续执行下一步。
- 在弹性资源池页面,单击创建好的资源池所在行右侧的“添加队列”,填写如下参数,其他参数项如表中未说明,默认即可。
表4 添加队列 参数项
参数值
名称
dli_dws
类型
通用队列
- 单击“下一步”,单击“确定”。队列创建成功。
步骤四:创建Kafka和DWS的增强型跨源连接
- 放通Kafka的安全组,允许DLI队列所在的网段可以访问Kafka。
- 回到Kafka控制台,单击Kafka实例名称进入基本信息。查看“连接信息”的“内网连接地址”,并记录下此地址,以备后续步骤使用。
图4 kafka内网连接地址
- 单击网络的安全组名称。
图5 kafka安全组
- 选择“入方向规则 > 添加规则”,如下图,添加DLI队列的网段地址,本实践为172.16.0.0/18,实际请与步骤三:创建DLI队列的时候填入的网段保持一致。
图6 kafka安全组添加规则
- 单击“确定”。
- 回到Kafka控制台,单击Kafka实例名称进入基本信息。查看“连接信息”的“内网连接地址”,并记录下此地址,以备后续步骤使用。
- 回到DLI管理控制台,单击左侧的“跨源管理”,选择“增强型跨源”,单击“创建”。
- 填写如下参数,其他参数项如表中未说明,默认即可。
表5 DLI到Kafka的连接 参数项
参数值
连接名称
dli_kafka
弹性资源池
选择上面创建的DLI队列名称dli_dws。
虚拟私有云
选择Kafka所在的虚拟私有云。
子网
选择Kafka所在的子网。
其他参数
保持默认。
图7 创建连接
- 单击“确定”。等待Kafka连接创建成功。
- 左侧导航栏选择“资源管理 > 队列管理”,选择dli_dws所在行操作列的“更多 > 测试地址连通性”。
- 在地址栏中,输入1.a获取的Kafka实例的内网IP和端口(Kafka的地址有三个,输入一个即可)。
图8 测试kafka连通性
- 单击“测试”,验证DLI连通Kafka成功。
- 进入到DWS管理控制台,左侧导航栏单击“专属集群 > 集群列表”,单击集群名称进入DWS集群详情。
- 如下图,记录下DWS集群的内网域名、端口和弹性负载均衡地址,以备后面步骤需要。
图9 内网域名和ELB地址
- 单击安全组名称。
图10 DWS安全组
- 选择“入方向规则 > 添加规则”,如下图,添加DLI队列的网段地址,本实践为172.16.0.0/18,实际请与步骤三:创建DLI队列的时候填入的网段保持一致。
图11 DWS安全组添加规则
- 单击“确定”。
- 再切换到DLI控制台,左侧选择“资源管理 > 队列管理”,选择dli_dws所在行操作列的“更多 > 测试地址连通性”。
- 在地址栏中,输入9获取的GaussDB(DWS)集群的弹性负载均衡IP和端口。
图12 测试GaussDB(DWS)连通
- 单击“测试”,验证DLI连通GaussDB(DWS)成功。
步骤五:准备DWS对接Flink工具dws-connector-flink
dws-connector-flink是一款基于DWS JDBC接口实现对接Flink的一个工具。在配置DLI作业阶段,将该工具及依赖放入Flink类加载目录,提升Flink作业入库DWS的能力。
- 浏览器访问https://mvnrepository.com/artifact/com.huaweicloud.dws。
- 在软件列表中选择最新版本的DWS Connectors Flink,本实践选择DWS Connector Flink 2 12 1 12。
- 单击“1.0.4”分支,实际请以官网发布的新分支为准。
- 单击“View ALL”。
- 单击dws-connector-flink_2.12_1.12-1.0.4-jar-with-dependencies.jar,下载到本地。
- 创建OBS桶,本实践桶名设置为obs-flink-dws,并将此文件上传到OBS桶下,注意桶也保持与DLI在一个区域下,本实践为“ 华北-北京四”。
图13 上传jar包到OBS桶
步骤六:创建并编辑DLI Flink作业
- 回到DLI管理控制台,左侧选择“作业管理 > Flink作业”,单击右上角“创建作业”。
- 类型选择“Flink OpenSource SQL”,名称填写kafka-dws。
图14 创建作业
- 单击“确定”。系统自动进入到作业的编辑页面。
- 在页面右侧填写如下参数,其他参数项如表中未说明,默认即可。
表6 flink作业参数 参数项
参数值
所属队列
dli_dws
Flink版本
1.12
UDF Jar
选择步骤五:准备DWS对接Flink工具dws-connector-flink的OBS桶中的jar文件。
OBS桶
开启Checkpoint
勾选
其他参数
保持默认
图15 编辑作业
- 将以下符合Flink要求的SQL代码复制到左侧的SQL代码窗。
其中“Kafka实例内网IP地址和端口”参见1.a获取,“DWS内网域名”由9获取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
CREATE TABLE user_kafka ( id string, name string, age int ) WITH ( 'connector' = 'kafka', 'topic' = 'topic-demo', 'properties.bootstrap.servers' = 'Kafka实例内网IP地址和端口', 'properties.group.id' = 'kafka01', 'scan.startup.mode' = 'latest-offset', "format" = "json" ); CREATE TABLE user_dws ( id string, name string, age int, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:postgresql://DWS内网域名:8000/gaussdb', 'tableName' = 'public.user_dws', 'username' = 'dbadmin', 'password' = '数据库用户dbdamin密码' ); INSERT INTO user_dws select * from user_kafka;
- 单击“语义校验”,等待校验成功。
如校验失败,则检查SQL的输入是否存在语法错误。
图16 作业的SQL语句
- 单击“保存”。
- 回到DLI控制台首页,左侧选择“作业管理 > Flink作业”。
- 单击作业名称kafka-dws右侧的“启动”,单击“立即启动”。
等待约1分钟,再刷新页面,状态在“运行中”表示作业成功运行。
图17 作业运行状态
步骤七:通过Kafka客户端生产和修改消息
- 参见ECS文档创建一台ECS,具体创建步骤此处不再赘述。创建时,确保ECS的区域、虚拟私有云保持与Kafka一致。
- 安装JDK。
- 登录ECS,进入到/usr/local,下载JDK包。
1 2
cd /usr/local wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
- 解压下载好的JDK包。
1
tar -zxvf jdk-17_linux-x64_bin.tar.gz
- 执行以下命令进入/etc/profile文件。
1
vim /etc/profile
- 按i进入编辑模式,将以下内容增加到/etc/profile文件的末尾。
1 2 3 4 5
export JAVA_HOME=/usr/local/jdk-17.0.7 #jdk安装目录 export JRE_HOME=${JAVA_HOME}/jre export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:${JAVA_HOME}/test:${JAVA_HOME}/lib/gsjdbc4.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar:$CLASSPATH export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin export PATH=$PATH:${JAVA_PATH}
- 按ESC,输入:wq!按回车,保存退出。
- 执行命令,使环境变量生效。
1
source /etc/profile
- 执行以下命令,提示如下信息表示jdk安装成功。
1
java -version
- 登录ECS,进入到/usr/local,下载JDK包。
- 安装Kafka客户端。
- 进入/opt目录,执行以下命令获取Kafka客户端软件包。
1 2
cd /opt wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
- 解压下载好的软件包。
1
tar -zxf kafka_2.12-2.7.2.tgz
- 进入到Kafka客户端目录。
1
cd /opt/kafka_2.12-2.7.2/bin
- 进入/opt目录,执行以下命令获取Kafka客户端软件包。
- 执行以下命令连接Kafka。其中 {连接地址}为Kafka的内网连接地址,参见1.a获取,topic为6创建的Kafka的topic名称。
1
./kafka-console-producer.sh --broker-list {连接地址} --topic {Topic名称}
本实践示例如下:
./kafka-console-producer.sh --broker-list 192.168.0.136:9092,192.168.0.214:9092,192.168.0.217:9092 --topic topic-demo
如上图出现>符号,无其他报错,表示连接成功。
- 在已连接kafka的客户端窗口下,根据场景描述规划的数据,复制以下内容(注意一次复制一行),按回车发送,进行生产消息。
1 2 3
{"id":"1","name":"lily","age":"16"} {"id":"2","name":"lucy","age":"17"} {"id":"3","name":"lilei","age":"15"}
- 回到DWS控制台,左侧选择“专属集群 > 集群列表”,单击DWS集群右侧“登录”,进入SQL页面。
- 执行以下SQL语句,确认数据实时入库成功。
1
SELECT * FROM user_dws ORDER BY id;
- 继续回到ECS中连接Kafka的客户端窗口,复制以下内容(注意一次复制一行),按回车发送,进行生产消息。
1 2
{"id":"2","name":"jim","age":"17"} {"id":"3","name":"tom","age":"15"}
- 回到DWS已打开的SQL窗口,执行以下SQL语句,发现id为2和3的姓名已修改为jim和tom。
符合场景描述预期,本实践结束。
1
SELECT * FROM user_dws ORDER BY id;