MySql CDC
功能描述
MySQL的CDC源表,即MySQL的流式源表,会先读取数据库的历史全量数据,并平滑切换到Binlog读取上,保证数据的完整读取。
类别 |
详情 |
---|---|
支持表类型 |
源表 |
前提条件
- MySQL CDC要求MySQL版本为5.6,5.7或8.0.x。
- with参数中字段只能使用单引号,不能使用双引号。
- 该场景作业需要DLI与MySQL建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
- MySQL已开启了Binlog,并且binlog_row_image设置为FULL。
- 已创建MySQL用户,并授予了SELECT、 SHOW DATABASES 、REPLICATION SLAVE和REPLICATION CLIENT权限。注意: 在 scan.incremental.snapshot.enabled 参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
- 为每个Reader设置不同的Server ID
- 每个用于读取Binlog的MySQL客户端都应该有一个唯一的Server ID,确保MySQL服务器能够区分不同的客户端并维护各自的Binlog读取位置。
- 如果不同的作业共享相同的Server ID,可能会导致从错误的Binlog位置读取数据,从而引发数据不一致的问题。
- 可以通过SQL Hints为每个Source Reader分配唯一的Server ID,例如使用SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ; 为4个Source Readers分配唯一的 Server ID。
- 设置MySQL会话超时时间
当为大型数据库创建初始一致快照时,您建立的连接可能会在读取表时碰到超时问题。您可以通过在MySQL侧配置 interactive_timeout 和 wait_timeout来解决此类问题。
- interactive_timeout: 服务器在关闭交互连接之前等待活动的秒数。 更多信息请参考 MySQL Documentations.
- wait_timeout: 服务器在关闭非交互连接之前等待活动的秒数。 更多信息请参考 MySQL Documentations.
- 使用无主键表时的注意事项:
- 使用无主键表必须设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的一个字段。
- 配置 scan.incremental.snapshot.chunk.key-column 时,如果表中存在索引,请使用索引中的列来加快 select 度。
- MySQL CDC源表暂不支持定义Watermark。如果您需要进行窗口聚合,请参考常见问题描述。
- 如果连接DWS、MySQL等支持upsert的sink源,需要在sink表的创建语句中定义主键,请参考示例中printSink建表语句。
支持特性
- 增量快照读取
增量快照读取是一种读取表快照的新机制。与旧的快照机制相比,增量快照具有许多优点,包括:
- 在快照读取期间,Source 支持并发读取,
- 在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint,
- 在快照读取之前,Source 不需要数据库锁权限。
如果希望 source 并行运行,则每个并行 reader 都应该具有唯一的 server id,因此server id的范围必须类似于 5400-6400, 且范围必须大于并行度。在增量快照读取过程中,MySQL CDC Source 首先通过表的主键将表划分成多个块(chunk), 然后 MySQL CDC Source 将多个块分配给多个 reader 以并行读取表的数据。
- 无锁算法
- 并发读取
- 全量阶段支持checkpoint
语法格式
create table mySqlCdcSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'mysqlHostname', 'username' = 'mysqlUsername', 'password' = 'mysqlPassword', 'database-name' = 'mysqlDatabaseName', 'table-name' = 'mysqlTableName' );
参数说明
参数 |
是否必选 |
默认值 |
数据类型 |
说明 |
---|---|---|---|---|
connector |
是 |
无 |
String |
connector类型,需配置为'mysql-cdc'。 |
hostname |
是 |
无 |
String |
MySQL 数据库服务器的 IP 地址或主机名。 |
username |
是 |
无 |
String |
连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称。 |
password |
是 |
无 |
String |
连接 MySQL 数据库服务器时使用的密码。 |
database-name |
是 |
无 |
String |
要监视的 MySQL 服务器的数据库名称。 数据库名称还支持正则表达式,以监视多个与正则表达式匹配的表。
|
table-name |
是 |
无 |
String |
需要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。
说明:
MySQL CDC 连接器在正则匹配表名时,会把用户填写的 database-name, table-name 通过字符串 `\\.` 连接成一个全路径的正则表达式,然后使用该正则表达式和 MySQL 数据库中表的全限定名进行正则匹配。
|
port |
否 |
3306 |
Integer |
MySQL 数据库服务器的整数端口号。 |
server-id |
否 |
无 |
String |
读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408'。 建议在 'scan.incremental.snapshot.enabled' 参数为启用时,配置成整数范围。 在当前 MySQL 集群中运行的所有 slave 节点,标记每个 salve 节点的 id 都必须是唯一的。 所以当连接器加入 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的情况下),它就可以读取 binlog。 默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是我们建议用户明确指定 Server id。 |
scan.incremental.snapshot.enabled |
否 |
true |
Boolean |
增量快照是一种读取表快照的新机制,与旧的快照机制相比, 增量快照有许多优点,包括: |
scan.incremental.snapshot.chunk.size |
否 |
8096 |
Integer |
表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。 |
scan.snapshot.fetch.size |
否 |
1024 |
Integer |
读取表快照时每次读取数据的最大条数。 |
scan.startup.mode |
否 |
initial |
String |
MySQL CDC 消费者可选的启动模式, 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset" 和 "timestamp"。
|
scan.startup.specific-offset.file |
否 |
无 |
String |
在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。 |
scan.startup.specific-offset.pos |
否 |
无 |
Long |
在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。 |
scan.startup.specific-offset.gtid-set |
否 |
无 |
String |
在 "specific-offset" 启动模式下,启动位点的 GTID 集合。 |
scan.startup.specific-offset.skip-events |
否 |
无 |
Long |
在指定的启动位点后需要跳过的事件数量。 |
scan.startup.specific-offset.skip-rows |
否 |
无 |
Long |
在指定的启动位点后需要跳过的数据行数量。 |
server-time-zone |
否 |
无 |
String |
数据库服务器中的会话时区, 例如: "Asia/Shanghai". 它控制 MYSQL 中的时间戳类型如何转换为字符串。 如果没有设置,则使用ZoneId.systemDefault()来确定服务器时区。 |
debezium.min.row. count.to.stream.result |
否 |
1000 |
Integer |
在快照操作期间,连接器将查询每个包含的表,以生成该表中所有行的读取事件。 此参数确定 MySQL 连接是否将表的所有结果拉入内存(速度很快,但需要大量内存), 或者结果是否需要流式传输(传输速度可能较慢,但适用于非常大的表)。 该值指定了在连接器对结果进行流式处理之前,表必须包含的最小行数,默认值为1000。 将此参数设置为`0`以跳过所有表大小检查,并始终在快照期间对所有结果进行流式处理。 |
connect.timeout |
否 |
30s |
Duration |
连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。 |
connect.max-retries |
否 |
3 |
Integer |
连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。 |
connection.pool.size |
否 |
20 |
Integer |
连接池大小。 |
jdbc.properties.* |
否 |
无 |
String |
传递自定义 JDBC URL 属性的选项。 用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'. |
heartbeat.interval |
否 |
30s |
Duration |
用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。 |
debezium.* |
否 |
无 |
String |
将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕获数据更改。 例如: 'debezium.snapshot.mode' = 'never'. 查看更多关于 Debezium 的 MySQL 连接器属性。 |
scan.incremental.close-idle-reader.enabled |
否 |
false |
Boolean |
是否在快照结束后关闭空闲的 Reader。 此特性需要'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。 |
元数据
元数据可以在 DDL 中作为只读(虚拟)meta 列声明。
Key |
数据类型 |
说明 |
---|---|---|
table_name |
STRING NOT NULL |
当前记录所属的表名称。 |
database_name |
STRING NOT NULL |
当前记录所属的库名称。 |
op_ts |
TIMESTAMP_LTZ(3) NOT NULL |
当前记录表在数据库中更新的时间。 如果从表的快照而不是 binlog 读取记录,该值将始终为0。 |
数据类型映射
MySQL类型 |
Flink SQL类型 |
备注 |
---|---|---|
TINYINT |
TINYINT |
- |
SMALLINT TINYINT UNSIGNED TINYINT UNSIGNED ZEROFILL |
SMALLINT |
- |
INT MEDIUMINT SMALLINT UNSIGNED SMALLINT UNSIGNED ZEROFILL |
INT |
- |
BIGINT INT UNSIGNED INT UNSIGNED ZEROFILL MEDIUMINT UNSIGNED MEDIUMINT UNSIGNED ZEROFILL |
BIGINT |
- |
BIGINT UNSIGNED BIGINT UNSIGNED ZEROFILL SERIAL |
DECIMAL(20, 0) |
- |
FLOAT FLOAT UNSIGNED FLOAT UNSIGNED ZEROFILL |
FLOAT |
- |
REAL REAL UNSIGNED REAL UNSIGNED ZEROFILL DOUBLE DOUBLE UNSIGNED DOUBLE UNSIGNED ZEROFILL DOUBLE PRECISION DOUBLE PRECISION UNSIGNED DOUBLE PRECISION UNSIGNED ZEROFILL |
DOUBLE |
- |
NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNE DDECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where p <= 38 |
DECIMAL(p, s) |
- |
NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where 38 < p <= 65 |
STRING |
在 MySQL 中,十进制数据类型的精度高达 65,但在 Flink 中,十进制数据类型的精度仅限于 38。 如果定义精度大于 38 的十进制列,则应将其映射到字符串以避免精度损失。 |
BOOLEAN TINYINT(1) BIT(1) |
BOOLEAN |
- |
DATE |
DATE |
- |
TIME [(p)] |
TIME [(p)] |
- |
TIMESTAMP [(p)] DATETIME [(p)] |
TIMESTAMP [(p)] |
- |
CHAR(n) |
CHAR(n) |
- |
VARCHAR(n) |
VARCHAR(n) |
- |
BIT(n) |
BINARY(⌈n/8⌉) |
- |
BINARY(n) |
BINARY(n) |
- |
VARBINARY(N) |
VARBINARY(N) |
- |
TINYTEXT TEXT MEDIUMTEXT LONGTEXT |
STRING |
- |
TINYBLOB BLOB MEDIUMBLOB LONGBLOB |
BYTES |
目前,对于 MySQL 中的 BLOB 数据类型,仅支持长度不大于 2147483647(2**31-1)的 blob。 |
YEAR |
INT |
- |
ENUM |
STRING |
- |
JSON |
STRING |
JSON 数据类型将在 Flink 中转换为 JSON 格式的字符串。 |
SET |
ARRAY<STRING> |
因为MySQL中的SET数据类型是一个字符串对象,可以有零个或多个值 它应该始终映射到字符串数组。 |
GEOMETRY POINT LINESTRING POLYGON MULTIPOINT MULTILINESTRING MULTIPOLYGON GEOMETRYCOLLECTION |
STRING |
MySQL中的空间数据类型将转换为具有固定Json格式的字符串。 |
示例
该示例是利用MySQL-CDC实时读取RDS MySQL中的数据及其元数据,并写入到Print结果表中。
本示例使用RDS MySQL数据库引擎版本为MySQL 5.7.33。
- 参考增强型跨源连接,根据MySQL所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
- 设置MySQL的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据MySQL的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
- 在MySQL中创建用户test,并授权,SQL语句参考如下:
CREATE USER 'test'@'%' IDENTIFIED BY 'xxx'; GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'test'; FLUSH PRIVILEGES;
- 在MySQL中的flink数据库下创建相应的表,表名为cdc_order,SQL语句参考如下(该语句需要用户拥有CREATE权限):
CREATE TABLE `flink`.`cdc_order` ( `order_id` VARCHAR(32) NOT NULL, `order_channel` VARCHAR(32) NULL, `order_time` VARCHAR(32) NULL, `pay_amount` DOUBLE NULL, `real_pay` DOUBLE NULL, `pay_time` VARCHAR(32) NULL, `user_id` VARCHAR(32) NULL, `user_name` VARCHAR(32) NULL, `area_id` VARCHAR(32) NULL, PRIMARY KEY (`order_id`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci;
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
create table mysqlCdcSource( database_name STRING METADATA VIRTUAL, table_name STRING METADATA VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id STRING, primary key(order_id) not enforced ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'mysqlHostname', 'username' = 'mysqlUsername', 'password' = 'mysqlPassword', 'database-name' = 'mysqlDatabaseName', 'table-name' = 'mysqlTableName' ); create table printSink( database_name string, table_name string, operation_ts TIMESTAMP_LTZ(3), order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id STRING, primary key(order_id) not enforced ) with ( 'connector' = 'print' ); insert into printSink select * from mysqlCdcSource;
- 在MySQL中执行以下命令插入测试数据(该语句需要用户有相应的权限)。
insert into flink.cdc_order values ('202103241000000001','webShop','2021-03-24 10:00:00','100.00','100.00','2021-03-24 10:02:03','0001','Alice','330106'), ('202103241606060001','appShop','2021-03-24 16:06:06','200.00','180.00','2021-03-24 16:10:06','0001','Alice','330106'); delete from flink.cdc_order where order_channel = 'webShop'; insert into flink.cdc_order values('202103251202020001','miniAppShop','2021-03-25 12:02:02','60.00','60.00','2021-03-25 12:03:00','0002','Bob','330110');
- 按照如下方式查看taskmanager.out文件中的数据结果:
- 登录DLI管理控制台,选择“作业管理 > Flink作业”。
- 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
- 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。
数据结果参考如下:
+I[flink, cdc_order, 2023-11-10T07:41:12Z, 202103241000000001, webShop, 2021-03-24 10:00:00, 100.0, 100.0, 2021-03-24 10:02:03, 0001, Alice, 330106] +I[flink, cdc_order, 2023-11-10T07:41:12Z, 202103241606060001, appShop, 2021-03-24 16:06:06, 200.0, 180.0, 2021-03-24 16:10:06, 0001, Alice, 330106] -D[flink, cdc_order, 2023-11-10T07:41:59Z, 202103241000000001, webShop, 2021-03-24 10:00:00, 100.0, 100.0, 2021-03-24 10:02:03, 0001, Alice, 330106] +I[flink, cdc_order, 2023-11-10T07:42:00Z, 202103251202020001, miniAppShop, 2021-03-25 12:02:02, 60.0, 60.0, 2021-03-25 12:03:00, 0002, Bob, 330110]
常见问题
Q:MySQL CDC源表不支持定义Watermark,怎么进行窗口聚合?
A:可以采用非窗口聚合的方式,即将时间字段转换成窗口值,然后根据窗口值进行GROUP BY聚合。
例如:基于上述示例,统计每分钟的订单数,脚本如下(其中order_time为string类型,表示订单的时间)。
insert into printSink select DATE_FORMAT(order_time, 'yyyy-MM-dd HH:mm'), count(*) from mysqlCdcSource group by DATE_FORMAT(order_time, 'yyyy-MM-dd HH:mm');