更新时间:2024-12-03 GMT+08:00
分享

MySQL同步到DMS Kafka

支持的源端和目的端数据库版本

表1 支持的数据库版本

源端数据库

目的端数据库

MySQL数据库(5.6、5.7、8.x版本)

DMS Kafka集群(2.7、3.x版本)

数据库账号权限要求

在使用Migration进行同步时,源端和目的端所使用的数据库账号需要满足以下权限要求,才能启动实时同步任务。不同类型的同步任务,需要的账号权限也不同,详细可参考下表进行赋权。

表2 数据库账号权限

类型名称

权限要求

源数据库连接账号

需要具备如下最小权限:SELECT、SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT,即执行SQL:

GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '用户名'@'%';

目标数据库连接账号

MS Kafka开启密文接入场景下,所配置用户需要有发布和订阅Topic的权限,其余场景无特殊权限要求。

  • 建议创建单独用于Migration任务连接的数据库账号,避免因为数据库账号密码修改,导致的任务连接失败。
  • 连接源和目标数据库的账号密码修改后,请同步修改管理中心对应的连接信息,避免任务连接失败后自动重试,导致数据库账号被锁定影响使用。

支持的同步对象范围

在使用Migration进行同步时,不同类型的链路,支持的同步对象范围不同,详细情况可参考下表。

表3 同步对象范围

类型名称

使用须知

同步对象范围

  • 支持同步DML和DDL。
  • 仅支持同步MyISAM和InnoDB表。
  • 不支持同步视图、外键、存储过程、触发器、函数、事件、虚拟列、唯一约束和唯一索引。

注意事项

除了数据源版本、连接账号权限及同步对象范围外,您还需要注意的事项请参见下表。

表4 注意事项

类型名称

使用和操作限制

数据库限制

源端数据库中的库名、表名、字段名不能包含:.<'>/\"以及非ASCII字符,建议尽量使用常规字符避免任务失败。

使用限制

通用:

  • 实时同步过程中,不支持IP、端口、账号、密码修改。
  • 不允许源数据库进行恢复操作。
  • 建议MySQL Binlog保留3天以上,不支持强制清理Binlog。

    异常/暂停恢复作业时,记录的Binlog位点过期会导致作业恢复失败,需要关注作业异常/暂停时长及Binlog保留时长。

  • 实时同步过程中,不允许源数据库MySQL跨大版本升级,否则可能导致数据不一致或者同步任务失败(跨版本升级后数据、表结构、关键字等信息均可能会产生兼容性改变),建议在该场景下重建同步任务。

全量同步阶段:

任务启动和全量数据同步阶段,请不要在源数据库执行DDL操作,否则可能导致任务异常。

增量同步阶段:

增量同步过程中,分库分表场景下,在多个分表执行的DDL,会同步多条数据到Kafka的Topic中。

常见故障排查:

在任务创建、启动、全量同步、增量同步、结束等过程中,如有遇到问题,可先参考常见问题章节进行排查。

其他限制

重命名表仅支持rename后库表在同步范围中的DDL操作(例如:RENAME TABLE A TO B,B需要在同步范围内)。

操作步骤

本小节以RDS for MySQL到DMS Kafka实时同步为示例,介绍如何配置Migration实时集成作业。配置作业前请务必阅读使用前自检概览, 确认已做好所有准备工作。

  1. 参见新建实时集成作业创建一个实时集成作业并进入作业配置界面。
  2. 选择数据连接类型:源端选MySQL,目的端选DMS Kafka。

    图1 选择数据连接类型

  3. 选择集成作业类型:同步类型默认为实时,同步场景包含整库和分库分表场景。

    图2 选择集成作业类型

    同步场景相关介绍请参见同步场景

  4. 配置网络资源:选择已创建的MySQL、DMS Kafka数据连接和已配置好网络连接的资源组。

    图3 选择数据连接及资源组

  5. 检测网络连通性:数据连接和资源组配置完成后需要测试整个迁移任务的网络连通性,可通过以下方式进行数据源和资源组之间的连通性测试。

    • 单击展开“源端配置”触发连通性测试,会对整个迁移任务的连通性做校验。
    • 单击源端和目的端数据源和资源组中的“测试”按钮进行检测。

      网络连通性检测异常可先参考数据源和资源组网络不通如何排查?章节进行排查。

  6. 配置源端参数。

    各同步场景下选择需要同步库表的方式请参考下表。

    表5 选择需要同步的库表

    同步场景

    配置方式

    整库

    选择需要迁移的MySQL库表。
    图4 选择库表

    库与表均支持自定义选择,即可选择一库一表,也可选择多库多表。

    分库分表

    添加逻辑表。
    • 逻辑表名:即最终写入到DMS Kafka的Topic名。
    • 源库过滤条件:支持填入正则表达式,在所有MySQL实例中通过该正则表达式过滤出要抽取数据写入目标端Kafka Topic的所有分库。
    • 源表过滤条件:支持填入正则表达式,在过滤出的源端分库中再次过滤出要抽取数据写入目标端Kafka Topic的所有分表。
    图5 添加逻辑表

    已添加的逻辑表支持预览表结构及来源库表,单击“操作”列的预览即可。预览逻辑表时,源表数量越多,等待时间可能越长,请耐心等待。

    图6 逻辑表预览

  7. 配置目的端参数。

    图7 Kafka目的端配置项
    • 目标Topic名称规则。

      配置源端MySQL库表与目的端Kafka Topic的映射规则

      表6 目标Topic名称规则

      同步场景

      配置方式

      整库

      配置源端MySQL库表与目的端Kafka Topic的映射规则,可指定为固定的一个Topic,也可使用内置变量做映射,将不同源表数据同步到不同的Topic中。

      可以使用的内置变量有:

      • 源库名:#{source_db_name}
      • 源表名:#{source_table_name}

      分库分表

      无该配置项,默认使用源端配置的逻辑表名作为目的端同步的Topic名。

    • 同步kafka partition策略

      支持以下三种投递策略将源端的数据按规则同步到Kafka Topic的特定Partition:

      • 全部投递到Partition 0。
      • 按库名+表名的hash值投递到不同Partition。
      • 按表的主键值hash值投递到不同的Partition。

        源端无主键情况下,目的端默认投递到partition 0。

    • 需要同步的数据库操作

      支持同步的数据库操作包括DDL和DML,可单选或多选,不选择的情况下默认同步所有操作。

    • 投递到Kafka的数据格式

      选择投递到Kafka的数据组织格式,当前支持Debezium JSON和Canal JSON。

    • 高级配置

      支持在作业“任务配置”中添加自定义属性来开启部分高级功能,参数详情可参考MySQL->Kafka高级参数一览表。

      图8 添加自定义属性
      表7 MySQL > Kafka高级参数一览表

      参数名

      参数类型

      默认值

      单位

      参数说明

      ource.server.timezone

      string

      本地时区

      -

      连接源端数据库时指定的session时区,支持时区标准写法,例如utc+8等。

      source.convert.timestampWithServerTimeZone

      boolean

      true

      -

      timestamp类型数据输出时转为按源端时区。

      source.convert.bit1AsInt

      boolean

      true

      -

      是否将bit1输出成int类型。

      sink.delivery-guarantee

      string

      at-least-once

      -

      Flink写Kafka时的语义保证机制。

      • at-least-once:在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
      • exactly-once:该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此,如果 consumer 只读取已提交的数据,在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。

  8. 刷新源表和目标表映射,检查映射关系是否正确。

    表8 源表与目标表映射

    同步场景

    配置方式

    整库

    支持用户根据实际需求修改映射后的目的端Topic名称,可以配置为一对一、多对一的映射关系。

    图9 整库场景下源表与目标表映射

    分库分表

    默认使用源端配置的逻辑表名作为目的端的Topic名称。

    图10 分库分表场景下源表与目标表映射

  9. 配置任务属性。

    表9 任务配置参数说明

    参数

    说明

    默认值

    执行内存

    作业执行分配内存,跟随处理器核数变化而自动变化。

    8GB

    处理器核数

    范围:2-32。

    每增加1处理核数,则自动增加4G执行内存和1并发数。

    2

    并发数

    作业执行支持并发数。该参数无需配置,跟随处理器核数变化而自动变化。

    1

    自动重试

    作业失败时是否开启自动重试。

    最大重试次数

    “自动重试”为是时显示该参数。

    1

    重试间隔时间

    “自动重试”为是时显示该参数。

    120秒

    是否写入脏数据

    选择是否记录脏数据,默认不记录脏数据,当脏数据过多时,会影响同步任务的整体同步速度。

    链路是否支持写入脏数据,以实际界面为准。

    • 否:默认为否,不记录脏数据。

      表示不允许脏数据存在。如果同步过程中产生脏数据,任务将失败退出。

    • 是:允许脏数据,即任务产生脏数据时不影响任务执行。
      允许脏数据并设置其阈值时:
      • 若产生的脏数据在阈值范围内,同步任务将忽略脏数据(即不会写入目标端),并正常执行。
      • 若产生的脏数据超出阈值范围,同步任务将失败退出。
        说明:

        脏数据认定标准:脏数据是对业务没有意义,格式非法或者同步过程中出现问题的数据;单条数据写入目标数据源过程中发生了异常,则此条数据为脏数据。 因此只要是写入失败的数据均被归类于脏数据。

        例如,源端是VARCHAR类型的数据写到INT类型的目标列中,则会因为转换不合理导致脏数据不会成功写入目的端。用户可以在同步任务配置时,配置同步过程中是否写入脏数据,配置脏数据条数(单个分片的最大错误记录数)保证任务运行,即当脏数据超过指定条数时,任务失败退出。

    脏数据策略

    “是否写入脏数据”为是时显示该参数,当前支持以下策略:

    • 不归档:不对脏数据进行存储,仅记录到任务日志中。
    • 归档到OBS:将脏数据存储到OBS中,并打印到任务日志中。

    不归档

    脏数据写入连接

    “脏数据策略”选择归档到OBS时显示该参数。

    脏数据要写入的连接,目前只支持写入到OBS连接。

    -

    脏数据目录

    脏数据写入的OBS目录。

    -

    脏数据阈值

    是否写入脏数据为是时显示该参数。

    用户根据实际设置脏数据阈值。

    说明:
    • 脏数据阈值仅针对每个并发生效。比如阈值为100,并发为3,则该作业可容忍的脏数据条数最多为300。
    • 输入-1表示不限制脏数据条数

    100

    添加自定义属性

    支持通过自定义属性修改部分作业参数及开启部分高级功能,详情可参见任务性能调优章节。

    -

  10. 提交并运行任务。

    作业配置完毕后,单击作业开发页面左上角“提交”,完成作业提交。

    图11 提交作业

    提交成功后,单击作业开发页面左上角“启动”按钮,在弹出的启动配置对话框按照实际情况配置同步位点参数,单击“确定”启动作业。

    图12 启动配置
    表10 启动配置参数

    参数

    说明

    同步模式

    • 增量同步:从指定时间位点开始同步增量数据。
    • 全量+增量:先同步全量数据,随后实时同步增量数据。

    时间

    增量同步需要设置该参数,指示增量同步起始的时间位点。

    说明:

    配置的位点时间早于Binlog日志最早时间点时,默认会以日志最新时间点开始消费。

  11. 监控作业。

    通过单击作业开发页面导航栏的“前往监控”按钮,可前往作业监控页面查看运行情况、监控日志等信息,并配置对应的告警规则,详情请参见实时集成任务运维

    图13 前往监控

性能调优

若链路同步速度过慢,可参考参见任务性能调优章节章节中对应链路文档进行排查及处理。

相关文档