更新时间:2024-12-11 GMT+08:00
分享

MySql CDC

功能描述

MySQL的CDC源表,即MySQL的流式源表,会先读取数据库的历史全量数据,并平滑切换到Binlog读取上,保证数据的完整读取。

表1 支持类别

类别

详情

支持表类型

源表

前提条件

  • MySQL CDC要求MySQL版本为5.6,5.7或8.0.x。
  • with参数中字段只能使用单引号,不能使用双引号。
  • 该场景作业需要DLI与MySQL建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
  • MySQL已开启了Binlog,并且binlog_row_image设置为FULL。
  • 已创建MySQL用户,并授予了SELECT、 SHOW DATABASES 、REPLICATION SLAVE和REPLICATION CLIENT权限。注意: 在 scan.incremental.snapshot.enabled 参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。
    GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • 为每个Reader设置不同的Server ID
    • 每个用于读取Binlog的MySQL客户端都应该有一个唯一的Server ID,确保MySQL服务器能够区分不同的客户端并维护各自的Binlog读取位置。
    • 如果不同的作业共享相同的Server ID,可能会导致从错误的Binlog位置读取数据,从而引发数据不一致的问题。
    • 可以通过SQL Hints为每个Source Reader分配唯一的Server ID,例如使用SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ; 为4个Source Readers分配唯一的 Server ID。
  • 设置MySQL会话超时时间
    当为大型数据库创建初始一致快照时,您建立的连接可能会在读取表时碰到超时问题。您可以通过在MySQL侧配置 interactive_timeout 和 wait_timeout来解决此类问题。
    • interactive_timeout: 服务器在关闭交互连接之前等待活动的秒数。 更多信息请参考 MySQL Documentations.
    • wait_timeout: 服务器在关闭非交互连接之前等待活动的秒数。 更多信息请参考 MySQL Documentations.
  • 使用无主键表时的注意事项:
    • 使用无主键表必须设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的一个字段。
    • 配置 scan.incremental.snapshot.chunk.key-column 时,如果表中存在索引,请使用索引中的列来加快 select 度。
      无主键表的处理语义由 scan.incremental.snapshot.chunk.key-column 指定的列的行为决定:
      • 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。
      • 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。
  • MySQL CDC源表暂不支持定义Watermark。如果您需要进行窗口聚合,请参考常见问题描述。
  • 如果连接DWS、MySQL等支持upsert的sink源,需要在sink表的创建语句中定义主键,请参考示例中printSink建表语句。

支持特性

  • 增量快照读取

    增量快照读取是一种读取表快照的新机制。与旧的快照机制相比,增量快照具有许多优点,包括:

    • 在快照读取期间,Source 支持并发读取,
    • 在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint,
    • 在快照读取之前,Source 不需要数据库锁权限。

    如果希望 source 并行运行,则每个并行 reader 都应该具有唯一的 server id,因此server id的范围必须类似于 5400-6400, 且范围必须大于并行度。在增量快照读取过程中,MySQL CDC Source 首先通过表的主键将表划分成多个块(chunk), 然后 MySQL CDC Source 将多个块分配给多个 reader 以并行读取表的数据。

  • 无锁算法

    MySQL CDC source 使用 增量快照算法, 避免了数据库锁的使用,因此不需要 “RELOAD” 权限。

  • 并发读取

    增量快照读取提供了并行读取快照数据的能力。

  • 全量阶段支持checkpoint

    增量快照读取提供了在区块级别执行检查点的能力。它使用新的快照读取机制解决了以前版本中的检查点超时问题。

语法格式

create table mySqlCdcSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector' = 'mysql-cdc',   
  'hostname' = 'mysqlHostname',
  'username' = 'mysqlUsername',
  'password' = 'mysqlPassword',
  'database-name' = 'mysqlDatabaseName',
  'table-name' = 'mysqlTableName'
);

参数说明

表2 源表参数说明

参数

是否必选

默认值

数据类型

说明

connector

String

connector类型,需配置为'mysql-cdc'。

hostname

String

MySQL 数据库服务器的 IP 地址或主机名。

username

String

连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称。

password

String

连接 MySQL 数据库服务器时使用的密码。

database-name

String

要监视的 MySQL 服务器的数据库名称。

数据库名称还支持正则表达式,以监视多个与正则表达式匹配的表。

  • 前缀匹配:^(test).* 匹配前缀为test的数据库名,例如test1、test2等。
  • 后缀匹配:.*[p$] 匹配后缀为p的数据库名,例如cdcp、edcp等。
  • 特定匹配:txc 匹配具体的数据库名。

table-name

String

需要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。

说明:

MySQL CDC 连接器在正则匹配表名时,会把用户填写的 database-name, table-name 通过字符串 `\\.` 连接成一个全路径的正则表达式,然后使用该正则表达式和 MySQL 数据库中表的全限定名进行正则匹配。

  • 前缀匹配:^(test).* 匹配前缀为test的表名,例如test1、test2等。
  • 后缀匹配:.*[p$] 匹配后缀为p的表名,例如cdcp、edcp等。
  • 特定匹配:txc 匹配具体的表名。

port

3306

Integer

MySQL 数据库服务器的整数端口号。

server-id

String

读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408'。

建议在 'scan.incremental.snapshot.enabled' 参数为启用时,配置成整数范围。

在当前 MySQL 集群中运行的所有 slave 节点,标记每个 salve 节点的 id 都必须是唯一的。 所以当连接器加入 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的情况下),它就可以读取 binlog。 默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是我们建议用户明确指定 Server id。

scan.incremental.snapshot.enabled

true

Boolean

增量快照是一种读取表快照的新机制,与旧的快照机制相比, 增量快照有许多优点,包括:

  • 在快照读取期间,Source 支持并发读取
  • 在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint
  • 在快照读取之前,Source 不需要数据库锁权限。

    如果希望 Source 并行运行,则每个并行 Readers 都应该具有唯一的 Server id,所以 Server id 必须是类似 `5400-6400` 的范围,并且该范围必须大于并行度。

scan.incremental.snapshot.chunk.size

8096

Integer

表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。

scan.snapshot.fetch.size

1024

Integer

读取表快照时每次读取数据的最大条数。

scan.startup.mode

initial

String

MySQL CDC 消费者可选的启动模式, 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset" 和 "timestamp"。

  • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
  • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取。
  • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
  • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
  • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。

scan.startup.specific-offset.file

String

在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。

scan.startup.specific-offset.pos

Long

在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。

scan.startup.specific-offset.gtid-set

String

在 "specific-offset" 启动模式下,启动位点的 GTID 集合。

scan.startup.specific-offset.skip-events

Long

在指定的启动位点后需要跳过的事件数量。

scan.startup.specific-offset.skip-rows

Long

在指定的启动位点后需要跳过的数据行数量。

server-time-zone

String

数据库服务器中的会话时区, 例如: "Asia/Shanghai". 它控制 MYSQL 中的时间戳类型如何转换为字符串。

如果没有设置,则使用ZoneId.systemDefault()来确定服务器时区。

debezium.min.row. count.to.stream.result

1000

Integer

在快照操作期间,连接器将查询每个包含的表,以生成该表中所有行的读取事件。

此参数确定 MySQL 连接是否将表的所有结果拉入内存(速度很快,但需要大量内存), 或者结果是否需要流式传输(传输速度可能较慢,但适用于非常大的表)。 该值指定了在连接器对结果进行流式处理之前,表必须包含的最小行数,默认值为1000。

将此参数设置为`0`以跳过所有表大小检查,并始终在快照期间对所有结果进行流式处理。

connect.timeout

30s

Duration

连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。

connect.max-retries

3

Integer

连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。

connection.pool.size

20

Integer

连接池大小。

jdbc.properties.*

String

传递自定义 JDBC URL 属性的选项。

用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'.

heartbeat.interval

30s

Duration

用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。

debezium.*

String

将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕获数据更改。

例如: 'debezium.snapshot.mode' = 'never'. 查看更多关于 Debezium 的 MySQL 连接器属性

scan.incremental.close-idle-reader.enabled

false

Boolean

是否在快照结束后关闭空闲的 Reader。 此特性需要'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。

元数据

元数据可以在 DDL 中作为只读(虚拟)meta 列声明。

表3 元数据

Key

数据类型

说明

table_name

STRING NOT NULL

当前记录所属的表名称。

database_name

STRING NOT NULL

当前记录所属的库名称。

op_ts

TIMESTAMP_LTZ(3) NOT NULL

当前记录表在数据库中更新的时间。

如果从表的快照而不是 binlog 读取记录,该值将始终为0。

数据类型映射

表4 数据类型映射

MySQL类型

Flink SQL类型

备注

TINYINT

TINYINT

-

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

SMALLINT

-

INT

MEDIUMINT

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

INT

-

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

BIGINT

-

BIGINT UNSIGNED

BIGINT UNSIGNED ZEROFILL

SERIAL

DECIMAL(20, 0)

-

FLOAT

FLOAT UNSIGNED

FLOAT UNSIGNED ZEROFILL

FLOAT

-

REAL

REAL UNSIGNED

REAL UNSIGNED ZEROFILL

DOUBLE

DOUBLE UNSIGNED

DOUBLE UNSIGNED ZEROFILL

DOUBLE PRECISION

DOUBLE PRECISION UNSIGNED

DOUBLE PRECISION UNSIGNED ZEROFILL

DOUBLE

-

NUMERIC(p, s)

NUMERIC(p, s) UNSIGNED

NUMERIC(p, s) UNSIGNED ZEROFILL

DECIMAL(p, s)

DECIMAL(p, s) UNSIGNE

DDECIMAL(p, s) UNSIGNED ZEROFILL

FIXED(p, s)

FIXED(p, s) UNSIGNED

FIXED(p, s) UNSIGNED ZEROFILL

where p <= 38

DECIMAL(p, s)

-

NUMERIC(p, s)

NUMERIC(p, s) UNSIGNED

NUMERIC(p, s) UNSIGNED ZEROFILL

DECIMAL(p, s)

DECIMAL(p, s) UNSIGNED

DECIMAL(p, s) UNSIGNED ZEROFILL

FIXED(p, s)

FIXED(p, s) UNSIGNED

FIXED(p, s) UNSIGNED ZEROFILL

where 38 < p <= 65

STRING

在 MySQL 中,十进制数据类型的精度高达 65,但在 Flink 中,十进制数据类型的精度仅限于 38。

如果定义精度大于 38 的十进制列,则应将其映射到字符串以避免精度损失。

BOOLEAN

TINYINT(1)

BIT(1)

BOOLEAN

-

DATE

DATE

-

TIME [(p)]

TIME [(p)]

-

TIMESTAMP [(p)]

DATETIME [(p)]

TIMESTAMP [(p)]

-

CHAR(n)

CHAR(n)

-

VARCHAR(n)

VARCHAR(n)

-

BIT(n)

BINARY(⌈n/8⌉)

-

BINARY(n)

BINARY(n)

-

VARBINARY(N)

VARBINARY(N)

-

TINYTEXT

TEXT

MEDIUMTEXT

LONGTEXT

STRING

-

TINYBLOB

BLOB

MEDIUMBLOB

LONGBLOB

BYTES

目前,对于 MySQL 中的 BLOB 数据类型,仅支持长度不大于 2147483647(2**31-1)的 blob。

YEAR

INT

-

ENUM

STRING

-

JSON

STRING

JSON 数据类型将在 Flink 中转换为 JSON 格式的字符串。

SET

ARRAY<STRING>

因为MySQL中的SET数据类型是一个字符串对象,可以有零个或多个值 它应该始终映射到字符串数组。

GEOMETRY

POINT

LINESTRING

POLYGON

MULTIPOINT

MULTILINESTRING

MULTIPOLYGON

GEOMETRYCOLLECTION

STRING

MySQL中的空间数据类型将转换为具有固定Json格式的字符串。

示例

该示例是利用MySQL-CDC实时读取RDS MySQL中的数据及其元数据,并写入到Print结果表中。

本示例使用RDS MySQL数据库引擎版本为MySQL 5.7.33。

  1. 参考增强型跨源连接,根据MySQL所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
  2. 设置MySQL的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据MySQL的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 在MySQL中创建用户test,并授权,SQL语句参考如下:
    CREATE USER 'test'@'%' IDENTIFIED BY 'xxx';
    GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'test';
    FLUSH PRIVILEGES;
  4. 在MySQL中的flink数据库下创建相应的表,表名为cdc_order,SQL语句参考如下(该语句需要用户拥有CREATE权限):
    CREATE TABLE `flink`.`cdc_order` (
    	`order_id` VARCHAR(32) NOT NULL,
    	`order_channel` VARCHAR(32) NULL,
    	`order_time` VARCHAR(32) NULL,
    	`pay_amount` DOUBLE  NULL,
    	`real_pay` DOUBLE  NULL,
    	`pay_time` VARCHAR(32) NULL,
    	`user_id` VARCHAR(32) NULL,
    	`user_name` VARCHAR(32) NULL,
    	`area_id` VARCHAR(32) NULL,
    	PRIMARY KEY (`order_id`)
    )	ENGINE = InnoDB
    	DEFAULT CHARACTER SET = utf8mb4
    	COLLATE = utf8mb4_general_ci;
  5. 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
    create table mysqlCdcSource(
      database_name STRING METADATA VIRTUAL,
      table_name STRING METADATA VIRTUAL,
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id STRING,
      primary key(order_id) not enforced
    ) with (
      'connector' = 'mysql-cdc',
      'hostname' = 'mysqlHostname',
      'username' = 'mysqlUsername',
      'password' = 'mysqlPassword',
      'database-name' = 'mysqlDatabaseName',
      'table-name' = 'mysqlTableName'
    );
    
    create table printSink(
      database_name string,
      table_name string,
      operation_ts TIMESTAMP_LTZ(3),
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id STRING,
      primary key(order_id) not enforced
    ) with (
      'connector' = 'print'
    );
    insert into printSink select * from mysqlCdcSource;
  6. 在MySQL中执行以下命令插入测试数据(该语句需要用户有相应的权限)。
    insert into flink.cdc_order values
    ('202103241000000001','webShop','2021-03-24 10:00:00','100.00','100.00','2021-03-24 10:02:03','0001','Alice','330106'),
    ('202103241606060001','appShop','2021-03-24 16:06:06','200.00','180.00','2021-03-24 16:10:06','0001','Alice','330106');
    
    delete from flink.cdc_order  where order_channel = 'webShop';
    insert into flink.cdc_order values('202103251202020001','miniAppShop','2021-03-25 12:02:02','60.00','60.00','2021-03-25 12:03:00','0002','Bob','330110');
  7. 按照如下方式查看taskmanager.out文件中的数据结果:
    1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
    2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
    3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。

    数据结果参考如下:

     +I[flink, cdc_order, 2023-11-10T07:41:12Z, 202103241000000001, webShop, 2021-03-24 10:00:00, 100.0, 100.0, 2021-03-24 10:02:03, 0001, Alice, 330106]
    +I[flink, cdc_order, 2023-11-10T07:41:12Z, 202103241606060001, appShop, 2021-03-24 16:06:06, 200.0, 180.0, 2021-03-24 16:10:06, 0001, Alice, 330106]
    -D[flink, cdc_order, 2023-11-10T07:41:59Z, 202103241000000001, webShop, 2021-03-24 10:00:00, 100.0, 100.0, 2021-03-24 10:02:03, 0001, Alice, 330106]
    +I[flink, cdc_order, 2023-11-10T07:42:00Z, 202103251202020001, miniAppShop, 2021-03-25 12:02:02, 60.0, 60.0, 2021-03-25 12:03:00, 0002, Bob, 330110]

常见问题

Q:MySQL CDC源表不支持定义Watermark,怎么进行窗口聚合?

A:可以采用非窗口聚合的方式,即将时间字段转换成窗口值,然后根据窗口值进行GROUP BY聚合。

例如:基于上述示例,统计每分钟的订单数,脚本如下(其中order_time为string类型,表示订单的时间)。

insert into printSink select DATE_FORMAT(order_time, 'yyyy-MM-dd HH:mm'), count(*) from mysqlCdcSource group by DATE_FORMAT(order_time, 'yyyy-MM-dd HH:mm');

相关文档