MySQL CDC源表
功能描述
MySQL的CDC源表,即MySQL的流式源表,会先读取数据库的历史全量数据,并平滑切换到Binlog读取上,保证数据的完整读取。
前提条件
- MySQL CDC要求MySQL版本为5.7或8.0.x。
- 该场景作业需要DLI与MySQL建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
- Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。
跨源认证简介及操作方法请参考跨源认证简介。
- MySQL已开启了Binlog,并且binlog_row_image设置为FULL。
- 已创建MySQL用户,并授予了SELECT、 SHOW DATABASES 、REPLICATION SLAVE和REPLICATION CLIENT权限。
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 同步数据库数据的客户端,都会有一个唯一ID,即Server ID。同一个数据库下,建议每个MySQL CDC作业配置不同的Server ID。
- MySQL CDC源表暂不支持定义Watermark。如果您需要进行窗口聚合,请参考常见问题描述。
- 若连接DWS、MySQL等支持upsert的sink源,需要在sink表的创建语句中定义主键,请参考示例中printSink建表语句。
- 当使用MySQ CDM源表时,请不要在源表参数里手动关闭debezium.connect.keep.alive,确保debezium.connect.keep.alive=true(默认值为true)。
如果手动关闭了debezium.connect.keep.alive,一旦发生拉取Binlog线程与MySQL服务器的连接连接异常,拉取Binlog线程不会尝试自动重连,这可能导致无法正常从源端拉取binlog日志。
语法格式
create table mySqlCdcSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'mysqlHostname', 'username' = 'mysqlUsername', 'password' = 'mysqlPassword', 'database-name' = 'mysqlDatabaseName', 'table-name' = 'mysqlTableName' );
参数说明
参数 |
是否必选 |
默认值 |
数据类型 |
说明 |
---|---|---|---|---|
connector |
是 |
无 |
String |
connector类型,需配置为'mysql-cdc'。 |
hostname |
是 |
无 |
String |
MySQL数据库的IP地址或者Hostname。 |
username |
是 |
无 |
String |
MySQL数据库的用户名。 |
password |
是 |
无 |
String |
MySQL数据库的密码。 |
database-name |
是 |
无 |
String |
访问的数据库名称。 数据库名称支持正则表达式以读取多个数据库的数据,例如flink(.)*表示以flink开头的数据库名。 |
table-name |
是 |
无 |
String |
访问的表名。 表名支持正则表达式以读取多个表的数据,例如cdc_order(.)*表示以cdc_order开头的表名。 |
port |
否 |
3306 |
Integer |
MySQL数据库的端口号。 |
server-id |
否 |
5400~6000随机值 |
String |
数据库客户端的一个数字ID,该ID必须是MySQL集群中全局唯一的。建议针对同一个数据库的每个作业都设置一个不同的ID。 默认会随机生成一个5400~6400的值。 |
scan.startup.mode |
否 |
initial |
String |
消费数据时的启动模式。
|
server-time-zone |
否 |
无 |
String |
数据库在使用的会话时区。 例如:Asia/Shanghai。 |
pwd_auth_name |
否 |
无 |
String |
DLI侧创建的Password类型的跨源认证名称。 使用跨源认证则无需在作业中配置置账号和密码。 |
示例
该示例是利用MySQL-CDC实时读取RDS MySQL中的数据,并写入到Print结果表中,其具体步骤如下(本示例使用RDS MySQL数据库引擎版本为MySQL 5.7.32)。
- 参考增强型跨源连接,根据MySQL所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
- 设置MySQL的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据MySQL的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
- 在MySQL中的flink数据库下创建相应的表,表名为cdc_order,SQL语句参考如下:
CREATE TABLE `flink`.`cdc_order` ( `order_id` VARCHAR(32) NOT NULL, `order_channel` VARCHAR(32) NULL, `order_time` VARCHAR(32) NULL, `pay_amount` DOUBLE NULL, `real_pay` DOUBLE NULL, `pay_time` VARCHAR(32) NULL, `user_id` VARCHAR(32) NULL, `user_name` VARCHAR(32) NULL, `area_id` VARCHAR(32) NULL, PRIMARY KEY (`order_id`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci;
- 创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
create table mysqlCdcSource( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id STRING ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'mysqlHostname', 'username' = 'mysqlUsername', 'password' = 'mysqlPassword', 'database-name' = 'mysqlDatabaseName', 'table-name' = 'mysqlTableName' ); create table printSink( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id STRING, primary key(order_id) not enforced ) with ( 'connector' = 'print' ); insert into printSink select * from mysqlCdcSource;
- 在MySQL中执行以下命令插入测试数据。
insert into cdc_order values ('202103241000000001','webShop','2021-03-24 10:00:00','100.00','100.00','2021-03-24 10:02:03','0001','Alice','330106'), ('202103241606060001','appShop','2021-03-24 16:06:06','200.00','180.00','2021-03-24 16:10:06','0001','Alice','330106'); delete from cdc_order where order_channel = 'webShop'; insert into cdc_order values('202103251202020001','miniAppShop','2021-03-25 12:02:02','60.00','60.00','2021-03-25 12:03:00','0002','Bob','330110');
- 按照如下方式查看taskmanager.out文件中的数据结果:
- 登录DLI管理控制台,选择“作业管理 > Flink作业”。
- 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
- 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。
数据结果参考如下:
+I(202103241000000001,webShop,2021-03-2410:00:00,100.0,100.0,2021-03-2410:02:03,0001,Alice,330106) +I(202103241606060001,appShop,2021-03-2416:06:06,200.0,180.0,2021-03-2416:10:06,0001,Alice,330106) -D(202103241000000001,webShop,2021-03-2410:00:00,100.0,100.0,2021-03-2410:02:03,0001,Alice,330106) +I(202103251202020001,miniAppShop,2021-03-2512:02:02,60.0,60.0,2021-03-2512:03:00,0002,Bob,330110)
常见问题
Q:MySQL CDC源表不支持定义Watermark,怎么进行窗口聚合?
A:可以采用非窗口聚合的方式,即将时间字段转换成窗口值,然后根据窗口值进行GROUP BY聚合。
例如:基于上述示例,统计每分钟的订单数,脚本如下(其中order_time为string类型,表示订单的时间)。
insert into printSink select DATE_FORMAT(order_time, 'yyyy-MM-dd HH:mm'), count(*) from mysqlCdcSource group by DATE_FORMAT(order_time, 'yyyy-MM-dd HH:mm');