使用DLI Flink作业实时同步MySQL数据至(GaussDB)DWS集群
本实践演示如何使用华为云DLI服务的Flink作业,将MySQL数据实时同步到GaussDB(DWS)。
了解DLI请参见数据湖产品介绍。
本实践预计时长60分钟,实践用到的云服务包括虚拟私有云 VPC及子网、云数据库 RDS、数据湖探索 DLI、对象存储服务 OBS和数据仓库服务 GaussDB(DWS),基本流程如下:
步骤一:准备MySQL数据
- 购买RDS实例,参见表1配置关键参数,其他参数可保持默认,如需了解详情请参见RDS文档。
- 连接RDS实例,执行以下命令,创建名为mys_data数据库。
1
CREATE DATABASE mys_data;
- 切换到新的数据库mys_data,并执行以下命令,创建表mys_orders。
1 2 3 4 5 6 7 8
CREATE TABLE mys_data.mys_order ( order_id VARCHAR(12), order_channel VARCHAR(32), order_time DATETIME, cust_code VARCHAR(6), pay_amount DOUBLE, real_pay DOUBLE, PRIMARY KEY (order_id) );
- 执行以下命令向表中插入数据。
1 2
INSERT INTO mys_data.mys_order VALUES ('202306270001', 'webShop', TIMESTAMP('2023-06-27 10:00:00'), 'CUST1', 1000, 1000); INSERT INTO mys_data.mys_order VALUES ('202306270002', 'webShop', TIMESTAMP('2023-06-27 11:00:00'), 'CUST2', 5000, 5000);
- 执行以下命令,验证数据是否插入成功。
1
SELECT * FROM mys_data.mys_order;
步骤二:创建GaussDB(DWS)集群
- 创建集群,同时为确保网络连通,GaussDB(DWS)集群的区域、VPC选择与RDS实例保持一致,本实践为“中国-香港”,虚拟私有云与上面创建RDS的虚拟私有云保持一致。
- 在GaussDB(DWS)控制台的“专属集群 > 集群列表”页面,单击指定集群所在行操作列的“登录”按钮。登录信息如下:
- 集群:创建的GaussDB(DWS)集群
- 数据库:gaussdb
- 数据源名称:dws-demo
- 用户名:dbadmin
- 密码:创建GaussDB(DWS)集群设置的密码
- 勾选“记住密码”,单击“测试连接”,连接成功后,单击“确定”。
- 复制如下SQL语句,在SQL窗口中,单击“执行SQL”,创建名为dws_data的SCHEMA。
1
CREATE SCHEMA dws_data;
- 在新建的SCHEMA下,创建dws_order表。
1 2 3 4 5 6 7
CREATE TABLE dws_data.dws_order ( order_id VARCHAR(12), order_channel VARCHAR(32), order_time TIMESTAMP, cust_code VARCHAR(6), pay_amount DOUBLE PRECISION, real_pay DOUBLE PRECISION );
- 查询数据,当前为空表。
1
SELECT * FROM dws_data.dws_order;
步骤三:创建DLI队列
- 登录华为云控制台,服务列表选择“大数据 > 数据湖探索DLI”,进入DLI管理控制台。
- 左侧导航栏选择“资源管理 > 弹性资源池”,进入弹性资源池管理页面。
- 单击右上角“购买弹性资源池”,填写如下参数,其他参数项如表中未说明,默认即可。
表2 DLI弹性资源池 参数项
参数值
计费模式
按需计费
区域
中国-香港
名称
dli_dws
规格
基础版
网段
172.16.0.0/18,需选择与MySQL和GaussDB(DWS)不在同一个网段。例如,如果MySQL和GaussDB(DWS)在192.168.x.x网段,则DLI则选择172.16.x.x。
- 单击“立即购买”,单击“提交”。
等待资源池创建成功,继续执行下一步。
- 在弹性资源池页面,单击创建好的资源池所在行右侧的“添加队列”,填写如下参数,其他参数项如表中未说明,默认即可。
表3 添加队列 参数项
参数值
名称
dli_dws
类型
通用队列
- 单击“下一步”,单击“确定”。队列创建成功。
步骤四:创建增强型跨源连接
- 放通RDS的安全组,允许DLI队列所在的网段可以访问RDS。
- 左侧选择“资源管理 > 队列管理”,记录dli_dws所在网段。
图1 DLI队列网段
- 切换到RDS的控制台,左侧选择“实例管理”,单击创建好的RDS实例名称。
- 记录“连接信息”的“内网地址”,后续测试连通性的步骤需要使用。
- 单击“连接信息”中安全组旁边的“管理”。
图2 RDS安全组
- 在弹出的安全组列表中,单击安全组名称,进入安全组配置页面。
- 选择“入方向规则 > 添加规则”,如下图所示,添加DLI队列的网段地址,本实践为172.16.0.0/18,实际请与步骤三:创建DLI队列的时候填入的网段保持一致。
图3 RDS安全组添加规则
- 单击“确定”。
- 左侧选择“资源管理 > 队列管理”,记录dli_dws所在网段。
- 回到DLI管理控制台,单击左侧的“跨源管理”,选择“增强型跨源”,单击“创建”。
- 填写如下参数,其他参数项如表中未说明,默认即可。
表4 DLI到RDS的连接 参数项
参数值
连接名称
dli_rds
弹性资源池
选择上面创建的DLI弹性资源池。
虚拟私有云
选择RDS所在的虚拟私有云。
子网
选择RDS所在的子网。
其他参数
保持默认。
图4 创建跨源连接
- 单击“确定”。等待RDS连接创建成功。
- 测试DLI到RDS的连通性。
- 左侧导航栏选择“资源管理 > 队列管理”,选择dli_dws所在行操作列的“更多 > 测试地址连通性”。
- 地址栏内输入1.c记录的RDS的内网地址和3306端口。
- 单击“测试”,验证DLI连通RDS成功。
图5 测试RDS与DLI连通
- 测试DLI到GaussDB(DWS)的连通性。
- 进入到GaussDB(DWS)管理控制台,左侧导航栏单击“专属集群 > 集群列表”,单击集群名称进入GaussDB(DWS)集群详情。
- 如下图,记录下GaussDB(DWS)集群的内网IP(两个取一个即可)和端口,以备后面步骤需要。
图6 GaussDB(DWS)内网IP
- 单击安全组名称。
图7 GaussDB(DWS)安全组
-
选择“入方向规则 > 添加规则”,如下图,添加DLI队列的网段地址,本实践为172.16.0.0/18,实际请与4填入的网段保持一致。
图8 GaussDB(DWS)安全组添加规则
- 单击“确定”。
- 再切换到DLI控制台,左侧选择“资源管理 > 队列管理”,选择dli_dws所在行操作列的“更多 > 测试地址连通性”。
- 在地址栏中,输入获取的GaussDB(DWS)集群的内网IP和端口。
- 单击“测试”,验证DLI连通GaussDB(DWS)成功。
图9 测试GaussDB(DWS)连通
步骤五:创建DLI Flink作业
- 登录OBS管理控制台,创建OBS桶,用于保存Flink运行作业,参见OBS用户指南。
关键参数按如下填写,其他参数默认即可。
- 区域:中国-香港
- 桶名称:dli-obs01 (如提示冲突,可以依次递增到02、03)
- 桶策略:私有
- 回到DLI管理控制台,左侧选择“作业管理 > Flink作业”,单击右上角“创建作业”。
- 类型选择“Flink OpenSource SQL”,名称填写rds-dws。
图10 创建作业
- 单击“确定”。系统自动进入到作业的编辑页面。
- 在页面右侧填写如下关键参数,其他参数项如未说明,默认即可。
- 将以下符合Flink要求的SQL代码复制到左侧的SQL代码窗。
“RDS数据库的内网IP”参见1.c获取,“GaussDB(DWS)集群内网IP”参见6.b获取,并修改RDS数据库的root用户密码、GaussDB(DWS)的dbadmin用户密码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
CREATE TABLE mys_order ( order_id STRING, order_channel STRING, order_time TIMESTAMP, cust_code STRING, pay_amount DOUBLE, real_pay DOUBLE, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'RDS数据库的内网IP', 'port' = '3306', 'username' = 'root', 'password' = 'RDS数据库的root用户密码', 'database-name' = 'mys_data', 'table-name' = 'mys_order' ); CREATE TABLE dws_order ( order_id STRING, order_channel STRING, order_time TIMESTAMP, cust_code STRING, pay_amount DOUBLE, real_pay DOUBLE, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'gaussdb', 'driver' = 'com.huawei.gauss200.jdbc.Driver', 'url' = 'jdbc:gaussdb://GaussDB(DWS)集群内网IP:8000/gaussdb', 'table-name' = 'dws_data.dws_order', 'username' = 'dbadmin', 'password' = 'GaussDB(DWS)的dbadmin用户密码', 'write.mode' = 'insert' ); INSERT INTO dws_order SELECT * FROM mys_order;
- 单击“格式化”,再单击“保存”。
请务必先单击“格式化”将SQL代码进行格式化处理,否则可能会因为代码复制和粘贴操作过程中引入新的空字符,而导致作业执行失败。
图11 Flink作业参数
- 回到DLI控制台首页,左侧选择“作业管理 > Flink作业”。
- 单击作业名称rds-dws右侧的“启动”,单击“立即启动”。
等待约1分钟,再刷新页面,状态在“运行中”表示作业成功运行。
图12 Flink运行成功
步骤六:验证数据同步
- 回到GaussDB(DWS)数据库的SQL窗口,如果连接超时,参见以下重新登录。
- 切换到GaussDB(DWS)管理控制台。
- 左侧导航选“专属集群 > 集群列表”,单击dws-demo所在行右侧的“登录”。
- 执行以下查询语句,发现MySQL的表的两行数据已经同步至GaussDB(DWS)。
1
SELECT * FROM dws_data.dws_order;
图13 查询结果
- 切换到RDS的MySQL的SQL界面,再执行以下语句,插入3条新的数据。
1 2 3
INSERT INTO mys_data.mys_order VALUES ('202403090003', 'webShop', TIMESTAMP('2024-03-09 13:00:00'), 'CUST1', 2000, 2000); INSERT INTO mys_data.mys_order VALUES ('202403090004', 'webShop', TIMESTAMP('2024-03-09 14:00:00'), 'CUST2', 3000, 3000); INSERT INTO mys_data.mys_order VALUES ('202403100004', 'webShop', TIMESTAMP('2024-03-10 10:00:00'), 'CUST3', 6000, 6000);
图14 MySQL新增数据
- 回到GaussDB(DWS)的SQL窗口再次执行以下SQL查询,从返回结果可以看到MySQL数据已实时同步至GaussDB(DWS)。
1
SELECT * FROM dws_data.dws_order;
图15 数据实时同步
更多信息
在Flink跨源开发场景下,如果在作业脚本中直接配置数据源认证信息,存在密码泄露的风险。建议使用DLI提供的跨源认证功能,不要在作业脚本中直接指定MySQL和GaussDB(DWS)的用户名和密码。
当前仅Flink 1.12版本支持,更高版本暂不支持,请留意官网文档变更。
- 登录DLI控制台,选择“跨源管理 > 跨源认证”。
- 单击“创建”。
- 创建MySQL的root用户密码认证。
- 填写如下参数。
- 类型选择“Password”。
- 认证信息名称,填写:mysql_pwd_auth。
- 用户名:root。
- 用户密码:root用户密码。
图16 MySQL密码认证
- 单击“确定”。
- 填写如下参数。
- 创建GaussDB(DWS)的dbadmin用户密码认证。
- 填写如下参数。
- 类型选择“Password”。
- 认证信息名称,填写:dws_pwd_auth。
- 用户名:dbadmin。
- 用户密码:dbadmin用户密码。
图17 GaussDB(DWS)密码认证
- 单击“确定”。
- 填写如下参数。
- 回到DLI管理控制台,选择“作业管理 > Flink作业”,在步骤五:创建DLI Flink作业创建的作业名称所在行右侧,选择“更多 > 停止”,停止作业。
- 作业停止后,单击作业名称所在行右侧的“编辑”。
- 将SQL脚本替换成以下最新。
文件中,仅需替换RDS内网IP地址、GaussDB(DWS)内网IP地址即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
CREATE TABLE mys_order ( order_id STRING, order_channel STRING, order_time TIMESTAMP, cust_code STRING, pay_amount DOUBLE, real_pay DOUBLE, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'RDS内网IP地址', 'port' = '3306', 'pwd_auth_name' = 'mysql_pwd_auth', 'database-name' = 'mys_data', 'table-name' = 'mys_order' ); CREATE TABLE dws_order ( order_id STRING, order_channel STRING, order_time TIMESTAMP, cust_code STRING, pay_amount DOUBLE, real_pay DOUBLE, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'gaussdb', 'driver' = 'com.huawei.gauss200.jdbc.Driver', 'url' = 'jdbc:gaussdb://GaussDB(DWS)内网IP地址:8000/gaussdb', 'table-name' = 'dws_data.dws_order', 'pwd_auth_name' = 'dws_pwd_auth', 'write.mode' = 'insert' ); INSERT INTO dws_order SELECT * FROM mys_order;
- 替换文件后,单击“格式化”,并单击“保存”。
- 重新启动作业,再参见步骤六:验证数据同步,验证数据同步成功。