更新时间:2024-11-08 GMT+08:00

MySQL CDC源表

功能描述

MySQL的CDC源表,即MySQL的流式源表,会先读取数据库的历史全量数据,并平滑切换到Binlog读取上,保证数据的完整读取。

前提条件

  • MySQL CDC要求MySQL版本为5.7或8.0.x。
  • 该场景作业需要DLI与MySQL建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
  • Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。

    跨源认证简介及操作方法请参考跨源认证简介

  • MySQL已开启了Binlog,并且binlog_row_image设置为FULL。
  • 已创建MySQL用户,并授予了SELECT、 SHOW DATABASES 、REPLICATION SLAVE和REPLICATION CLIENT权限。

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 同步数据库数据的客户端,都会有一个唯一ID,即Server ID。同一个数据库下,建议每个MySQL CDC作业配置不同的Server ID。
    主要原因如下:
    • MySQL SERVER会根据该ID来维护网络连接以及Binlog位点。因此如果有大量相同的Server ID的客户端一起连接MySQL SERVER,可能导致MySQL SERVER的CPU陡增,影响线上业务稳定性。
    • 此外,多个作业共享相同的Server ID,会导致Binlog位点错乱,多读或少读数据,因此建议每个CDC作业都配置不同的Server ID。
  • MySQL CDC源表暂不支持定义Watermark。如果您需要进行窗口聚合,请参考常见问题描述。
  • 若连接DWS、MySQL等支持upsert的sink源,需要在sink表的创建语句中定义主键,请参考示例中printSink建表语句。
  • 当使用MySQ CDM源表时,请不要在源表参数里手动关闭debezium.connect.keep.alive,确保debezium.connect.keep.alive=true(默认值为true)。

    如果手动关闭了debezium.connect.keep.alive,一旦发生拉取Binlog线程与MySQL服务器的连接连接异常,拉取Binlog线程不会尝试自动重连,这可能导致无法正常从源端拉取binlog日志。

语法格式

create table mySqlCdcSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector' = 'mysql-cdc',   
  'hostname' = 'mysqlHostname',
  'username' = 'mysqlUsername',
  'password' = 'mysqlPassword',
  'database-name' = 'mysqlDatabaseName',
  'table-name' = 'mysqlTableName'
);

参数说明

表1 参数说明

参数

是否必选

默认值

数据类型

说明

connector

String

connector类型,需配置为'mysql-cdc'。

hostname

String

MySQL数据库的IP地址或者Hostname。

username

String

MySQL数据库的用户名。

password

String

MySQL数据库的密码。

database-name

String

访问的数据库名称。

数据库名称支持正则表达式以读取多个数据库的数据,例如flink(.)*表示以flink开头的数据库名。

table-name

String

访问的表名。

表名支持正则表达式以读取多个表的数据,例如cdc_order(.)*表示以cdc_order开头的表名。

port

3306

Integer

MySQL数据库的端口号。

server-id

5400~6000随机值

String

数据库客户端的一个数字ID,该ID必须是MySQL集群中全局唯一的。建议针对同一个数据库的每个作业都设置一个不同的ID。

默认会随机生成一个5400~6400的值。

scan.startup.mode

initial

String

消费数据时的启动模式。

  • initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。
  • latest-offset:在第一次启动时,不会扫描历史全量数据,直接从Binlog的末尾(最新的Binlog处)开始读取,即只读取该Connector启动以后的最新变更。

server-time-zone

String

数据库在使用的会话时区。

例如:Asia/Shanghai。

pwd_auth_name

String

DLI侧创建的Password类型的跨源认证名称。

使用跨源认证则无需在作业中配置置账号和密码。

示例

该示例是利用MySQL-CDC实时读取RDS MySQL中的数据,并写入到Print结果表中,其具体步骤如下(本示例使用RDS MySQL数据库引擎版本为MySQL 5.7.32)。

  1. 参考增强型跨源连接,根据MySQL所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
  2. 设置MySQL的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据MySQL的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 在MySQL中的flink数据库下创建相应的表,表名为cdc_order,SQL语句参考如下:
    CREATE TABLE `flink`.`cdc_order` (
    	`order_id` VARCHAR(32) NOT NULL,
    	`order_channel` VARCHAR(32) NULL,
    	`order_time` VARCHAR(32) NULL,
    	`pay_amount` DOUBLE  NULL,
    	`real_pay` DOUBLE  NULL,
    	`pay_time` VARCHAR(32) NULL,
    	`user_id` VARCHAR(32) NULL,
    	`user_name` VARCHAR(32) NULL,
    	`area_id` VARCHAR(32) NULL,
    	PRIMARY KEY (`order_id`)
    )	ENGINE = InnoDB
    	DEFAULT CHARACTER SET = utf8mb4
    	COLLATE = utf8mb4_general_ci;
  4. 创建flink opensource sql作业,输入以下作业脚本,提交运行作业。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
    create table mysqlCdcSource(
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id STRING
    ) with (
      'connector' = 'mysql-cdc',
      'hostname' = 'mysqlHostname',
      'username' = 'mysqlUsername',
      'password' = 'mysqlPassword',
      'database-name' = 'mysqlDatabaseName',
      'table-name' = 'mysqlTableName'
    );
    
    create table printSink(
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id STRING,
      primary key(order_id) not enforced
    ) with (
      'connector' = 'print'
    );
    
    insert into printSink select * from mysqlCdcSource;
  5. 在MySQL中执行以下命令插入测试数据。
    insert into cdc_order values
    ('202103241000000001','webShop','2021-03-24 10:00:00','100.00','100.00','2021-03-24 10:02:03','0001','Alice','330106'),
    ('202103241606060001','appShop','2021-03-24 16:06:06','200.00','180.00','2021-03-24 16:10:06','0001','Alice','330106');
    
    delete from cdc_order  where order_channel = 'webShop';
    
    insert into cdc_order values('202103251202020001','miniAppShop','2021-03-25 12:02:02','60.00','60.00','2021-03-25 12:03:00','0002','Bob','330110');
  6. 按照如下方式查看taskmanager.out文件中的数据结果:
    1. 登录DLI管理控制台,选择“作业管理 > Flink作业”。
    2. 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
    3. 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。

    数据结果参考如下:

    +I(202103241000000001,webShop,2021-03-2410:00:00,100.0,100.0,2021-03-2410:02:03,0001,Alice,330106)
    +I(202103241606060001,appShop,2021-03-2416:06:06,200.0,180.0,2021-03-2416:10:06,0001,Alice,330106)
    -D(202103241000000001,webShop,2021-03-2410:00:00,100.0,100.0,2021-03-2410:02:03,0001,Alice,330106)
    +I(202103251202020001,miniAppShop,2021-03-2512:02:02,60.0,60.0,2021-03-2512:03:00,0002,Bob,330110)

常见问题

Q:MySQL CDC源表不支持定义Watermark,怎么进行窗口聚合?

A:可以采用非窗口聚合的方式,即将时间字段转换成窗口值,然后根据窗口值进行GROUP BY聚合。

例如:基于上述示例,统计每分钟的订单数,脚本如下(其中order_time为string类型,表示订单的时间)。

insert into printSink select DATE_FORMAT(order_time, 'yyyy-MM-dd HH:mm'), count(*) from mysqlCdcSource group by DATE_FORMAT(order_time, 'yyyy-MM-dd HH:mm');