计算
弹性云服务器 ECS
Flexus云服务
裸金属服务器 BMS
弹性伸缩 AS
镜像服务 IMS
专属主机 DeH
函数工作流 FunctionGraph
云手机服务器 CPH
Huawei Cloud EulerOS
网络
虚拟私有云 VPC
弹性公网IP EIP
虚拟专用网络 VPN
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
VPC终端节点 VPCEP
云连接 CC
企业路由器 ER
企业交换机 ESW
全球加速 GA
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
边缘安全 EdgeSec
态势感知 SA
威胁检测服务 MTD
CDN与智能边缘
内容分发网络 CDN
CloudPond云服务
智能边缘云 IEC
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
迁移中心 MGC
大数据
MapReduce服务 MRS
数据湖探索 DLI
表格存储服务 CloudTable
云搜索服务 CSS
数据接入服务 DIS
数据仓库服务 GaussDB(DWS)
数据治理中心 DataArts Studio
数据可视化 DLV
数据湖工厂 DLF
湖仓构建 LakeFormation
企业应用
云桌面 Workspace
应用与数据集成平台 ROMA Connect
云解析服务 DNS
专属云
专属计算集群 DCC
IoT物联网
IoT物联网
设备接入 IoTDA
智能边缘平台 IEF
用户服务
账号中心
费用中心
成本中心
资源中心
企业管理
工单管理
国际站常见问题
ICP备案
我的凭证
支持计划
客户运营能力
合作伙伴支持计划
专业服务
区块链
区块链服务 BCS
Web3节点引擎服务 NES
解决方案
SAP
高性能计算 HPC
视频
视频直播 Live
视频点播 VOD
媒体处理 MPC
实时音视频 SparkRTC
数字内容生产线 MetaStudio
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
存储容灾服务 SDRS
高性能弹性文件服务 SFS Turbo
弹性文件服务 SFS
云硬盘备份 VBS
云服务器备份 CSBS
数据快递服务 DES
专属分布式存储服务 DSS
容器
云容器引擎 CCE
容器镜像服务 SWR
应用服务网格 ASM
华为云UCS
云容器实例 CCI
管理与监管
云监控服务 CES
统一身份认证服务 IAM
资源编排服务 RFS
云审计服务 CTS
标签管理服务 TMS
云日志服务 LTS
配置审计 Config
资源访问管理 RAM
消息通知服务 SMN
应用运维管理 AOM
应用性能管理 APM
组织 Organizations
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
应用身份管理服务 OneAccess
数据库
云数据库 RDS
文档数据库服务 DDS
数据管理服务 DAS
数据复制服务 DRS
云数据库 GeminiDB
云数据库 GaussDB
分布式数据库中间件 DDM
数据库和应用迁移 UGO
云数据库 TaurusDB
人工智能
人脸识别服务 FRS
图引擎服务 GES
图像识别 Image
内容审核 Moderation
文字识别 OCR
AI开发平台ModelArts
图像搜索 ImageSearch
对话机器人服务 CBS
华为HiLens
视频智能分析服务 VIAS
语音交互服务 SIS
应用中间件
分布式缓存服务 DCS
API网关 APIG
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
多活高可用服务 MAS
事件网格 EG
企业协同
华为云会议 Meeting
云通信
消息&短信 MSGSMS
云生态
合作伙伴中心
云商店
开发者工具
SDK开发指南
API签名指南
Terraform
华为云命令行工具服务 KooCLI
其他
产品价格详情
系统权限
管理控制台
客户关联华为云合作伙伴须知
消息中心
公共问题
开发与运维
应用管理与运维平台 ServiceStage
软件开发生产线 CodeArts
需求管理 CodeArts Req
部署 CodeArts Deploy
性能测试 CodeArts PerfTest
编译构建 CodeArts Build
流水线 CodeArts Pipeline
制品仓库 CodeArts Artifact
测试计划 CodeArts TestPlan
代码检查 CodeArts Check
代码托管 CodeArts Repo
云应用引擎 CAE
开天aPaaS
云消息服务 KooMessage
云手机服务 KooPhone
云空间服务 KooDrive

MySQL同步到Kafka作业配置

更新时间:2025-02-18 GMT+08:00

支持的源端和目的端数据库版本

表1 支持的数据库版本

源端数据库

目的端数据库

MySQL数据库(5.6、5.7、8.x版本)

Kafka集群(2.7、3.x版本)

数据库账号权限要求

在使用Migration进行同步时,源端和目的端所使用的数据库账号需要满足以下权限要求,才能启动实时同步任务。不同类型的同步任务,需要的账号权限也不同,详细可参考下表进行赋权。

表2 数据库账号权限

类型名称

权限要求

源数据库连接账号

需要具备如下最小权限:SELECT、SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT,即执行SQL:

GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '用户名'@'%';

目标数据库连接账号

MRS用户需要拥有Kafka对应Topic的读写权限,即必须属于kafka/kafkaadmin/kafkasuperuser用户组。

说明:

kafka普通用户需要被Kafka管理员用户授予特定Topic的读写权限,才能访问对应Topic。

说明:
  • 建议创建单独用于Migration任务连接的数据库账号,避免因为数据库账号密码修改,导致的任务连接失败。
  • 连接源和目标数据库的账号密码修改后,请同步修改管理中心对应的连接信息,避免任务连接失败后自动重试,导致数据库账号被锁定影响使用。

支持的同步对象范围

在使用Migration进行同步时,不同类型的链路,支持的同步对象范围不同,详细情况可参考下表。

表3 同步对象范围

类型名称

使用须知

同步对象范围

  • 支持同步DML和DDL。
  • 仅支持同步MyISAM和InnoDB表。
  • 不支持同步视图、外键、存储过程、触发器、函数、事件、虚拟列、唯一约束和唯一索引。
  • 不支持同步对象中存在包含CASCADE、SET NULL、SET DEFAULT之类引用操作的外键。这些关联操作会导致更新或删除父表中的行会影响子表对应的记录,并且子表的相关操作并不记录binlog。

注意事项

除了数据源版本、连接账号权限及同步对象范围外,您还需要注意的事项请参见下表。

表4 注意事项

类型名称

使用和操作限制

数据库限制

源端数据库中的库名、表名、字段名不能包含:.<'>/\"以及非ASCII字符,建议尽量使用常规字符避免任务失败。

使用限制

通用:

  • 实时同步过程中,不支持IP、端口、账号、密码修改。
  • 不允许源数据库进行恢复操作。
  • 建议MySQL Binlog保留3天以上,不支持强制清理Binlog。
  • 实时同步过程中,不允许源数据库MySQL跨大版本升级,否则可能导致数据不一致或者同步任务失败(跨版本升级后数据、表结构、关键字等信息均可能会产生兼容性改变),建议在该场景下重建同步任务。

全量同步阶段:

任务启动和全量数据同步阶段,请不要在源数据库执行DDL操作,否则可能导致任务异常。

增量同步阶段:

增量同步过程中,分库分表场景下,在多个分表执行的DDL,会同步多条数据到Kafka的Topic中。

常见故障排查:

在任务创建、启动、全量同步、增量同步、结束等过程中,如有遇到问题,可先参考常见问题章节进行排查。

其他限制

重命名表仅支持rename后库表在同步范围中的DDL操作(例如:RENAME TABLE A TO B,B需要在同步范围内)。

操作步骤

本小节以RDS for MySQL到DMS Kafka实时同步为示例,介绍如何配置Migration实时集成作业。配置作业前请务必阅读使用前自检概览, 确认已做好所有准备工作。

  1. 参见新建实时集成作业创建一个实时集成作业并进入作业配置界面。
  2. 选择数据连接类型:源端选MySQL,目的端选DMS Kafka。

    图1 选择数据连接类型

  3. 选择集成作业类型:同步类型默认为实时,同步场景包含整库和分库分表场景。

    图2 选择集成作业类型
    说明:

    同步场景相关介绍请参见同步场景

  4. 配置网络资源:选择已创建的MySQL、DMS Kafka数据连接和已配置好网络连接的资源组。

    图3 选择数据连接及资源组
    说明:

    无可选数据连接时,可单击“新建”跳转至管理中心数据连接界面,单击“创建数据连接”创建数据连接,详情请参见配置DataArts Studio数据连接参数进行配置。

    无可选资源组时,可单击“新建”跳转至购买资源组页面创建资源组配置,详情请参见购买创建数据集成资源组增量包进行配置。

  5. 检测网络连通性:数据连接和资源组配置完成后需要测试整个迁移任务的网络连通性,可通过以下方式进行数据源和资源组之间的连通性测试。

    • 单击展开“源端配置”触发连通性测试,会对整个迁移任务的连通性做校验。
    • 单击源端和目的端数据源和资源组中的“测试”按钮进行检测。
      说明:

      网络连通性检测异常可先参考数据源和资源组网络不通如何排查?章节进行排查。

  6. 配置源端参数。

    各同步场景下选择需要同步库表的方式请参考下表。

    表5 选择需要同步的库表

    同步场景

    配置方式

    整库

    选择需要迁移的MySQL库表。
    图4 选择库表

    库与表均支持自定义选择,即可选择一库一表,也可选择多库多表。

    分库分表

    添加逻辑表。
    • 逻辑表名:即最终写入到DMS Kafka的Topic名。
    • 源库过滤条件:支持填入正则表达式,在所有MySQL实例中通过该正则表达式过滤出要抽取数据写入目标端Kafka Topic的所有分库。
    • 源表过滤条件:支持填入正则表达式,在过滤出的源端分库中再次过滤出要抽取数据写入目标端Kafka Topic的所有分表。
      图5 添加逻辑表

    已添加的逻辑表支持预览表结构及来源库表,单击“操作”列的预览即可。预览逻辑表时,源表数量越多,等待时间可能越长,请耐心等待。

    图6 逻辑表预览

  7. 配置目的端参数。

    图7 Kafka目的端配置项
    • 目标Topic名称规则。

      配置源端MySQL库表与目的端Kafka Topic的映射规则

      表6 目标Topic名称规则

      同步场景

      配置方式

      整库

      配置源端MySQL库表与目的端Kafka Topic的映射规则,可指定为固定的一个Topic,也可使用内置变量做映射,将不同源表数据同步到不同的Topic中。

      可以使用的内置变量有:

      • 源库名:#{source_db_name}。
      • 源表名:#{source_table_name}。

      分库分表

      无该配置项,默认使用源端配置的逻辑表名作为目的端同步的Topic名。

    • 同步kafka partition策略

      支持以下三种投递策略将源端的数据按规则同步到Kafka Topic的特定Partition:

      • 全部投递到Partition 0。
      • 按库名+表名的hash值投递到不同Partition。
      • 按表的主键值hash值投递到不同的Partition。
        说明:

        源端无主键情况下,目的端默认投递到partition 0。

    • 需要同步的数据库操作

      支持同步的数据库操作包括DDL和DML,可单选或多选,不选择的情况下默认同步所有操作。

    • 投递到Kafka的数据格式

      选择投递到Kafka的数据组织格式,当前支持Debezium JSON和Canal JSON。

    • 新建Topic的Partition数量

      设定目的端Kafka无对应Topic时,Migration自动建Topic的分区数量,默认为3。

    • Kafka目标端属性配置

      支持设置Kafka的配置项,需要增加 properties. 前缀,作业将自动移除前缀并传入底层Kafka客户端,具体参数可参考Apache Kafka官方文档中的配置说明。

    • 高级配置

      支持在作业“任务配置”中添加自定义属性来开启部分高级功能,参数详情可参考MySQL->Kafka高级参数一览表。

      图8 添加自定义属性
      表7 MySQL > Kafka高级参数一览表

      参数名

      参数类型

      默认值

      单位

      参数说明

      source.server.timezone

      string

      本地时区

      -

      连接源端数据库时指定的session时区,支持时区标准写法,例如utc+8等。

      source.convert.timestampWithServerTimeZone

      boolean

      true

      -

      timestamp类型数据输出时转为按源端时区。

      source.convert.bit1AsInt

      boolean

      true

      -

      是否将bit1输出成int类型。

      sink.delivery-guarantee

      string

      at-least-once

      -

      Flink写Kafka时的语义保证机制。

      • at-least-once:在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
      • exactly-once:该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此,如果 consumer 只读取已提交的数据,在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。

  8. 刷新源表和目标表映射,检查映射关系是否正确。

    表8 源表与目标表映射

    同步场景

    配置方式

    整库

    支持用户根据实际需求修改映射后的目的端Topic名称,可以配置为一对一、多对一的映射关系。

    图9 整库场景下源表与目标表映射

    分库分表

    默认使用源端配置的逻辑表名作为目的端的Topic名称。

    图10 分库分表场景下源表与目标表映射

  9. 配置任务属性。

    表9 任务配置参数说明

    参数

    说明

    默认值

    执行内存

    作业执行分配内存,跟随处理器核数变化而自动变化。

    8GB

    处理器核数

    范围:2-32。

    每增加1处理核数,则自动增加4G执行内存和1并发数。

    2

    并发数

    作业执行支持并发数。该参数无需配置,跟随处理器核数变化而自动变化。

    1

    自动重试

    作业失败时是否开启自动重试。

    最大重试次数

    “自动重试”为是时显示该参数。

    1

    重试间隔时间

    “自动重试”为是时显示该参数。

    120秒

    是否写入脏数据

    选择是否记录脏数据,默认不记录脏数据,当脏数据过多时,会影响同步任务的整体同步速度。

    链路是否支持写入脏数据,以实际界面为准。

    • 否:默认为否,不记录脏数据。

      表示不允许脏数据存在。如果同步过程中产生脏数据,任务将失败退出。

    • 是:允许脏数据,即任务产生脏数据时不影响任务执行。
      允许脏数据并设置其阈值时:
      • 若产生的脏数据在阈值范围内,同步任务将忽略脏数据(即不会写入目标端),并正常执行。
      • 若产生的脏数据超出阈值范围,同步任务将失败退出。
        说明:

        脏数据认定标准:脏数据是对业务没有意义,格式非法或者同步过程中出现问题的数据;单条数据写入目标数据源过程中发生了异常,则此条数据为脏数据。 因此只要是写入失败的数据均被归类于脏数据。

        例如,源端是VARCHAR类型的数据写到INT类型的目标列中,则会因为转换不合理导致脏数据不会成功写入目的端。用户可以在同步任务配置时,配置同步过程中是否写入脏数据,配置脏数据条数(单个分片的最大错误记录数)保证任务运行,即当脏数据超过指定条数时,任务失败退出。

    脏数据策略

    “是否写入脏数据”为是时显示该参数,当前支持以下策略:

    • 不归档:不对脏数据进行存储,仅记录到任务日志中。
    • 归档到OBS:将脏数据存储到OBS中,并打印到任务日志中。

    不归档

    脏数据写入连接

    “脏数据策略”选择归档到OBS时显示该参数。

    脏数据要写入的连接,目前只支持写入到OBS连接。

    -

    脏数据目录

    脏数据写入的OBS目录。

    -

    脏数据阈值

    是否写入脏数据为是时显示该参数。

    用户根据实际设置脏数据阈值。

    说明:
    • 脏数据阈值仅针对每个并发生效。比如阈值为100,并发为3,则该作业可容忍的脏数据条数最多为300。
    • 输入-1表示不限制脏数据条数。

    100

    添加自定义属性

    支持通过自定义属性修改部分作业参数及开启部分高级功能,详情可参见任务性能调优章节。

    -

  10. 提交并运行任务。

    作业配置完毕后,单击作业开发页面左上角“提交”,完成作业提交。

    图11 提交作业

    提交成功后,单击作业开发页面“启动”按钮,在弹出的启动配置对话框按照实际情况配置同步位点参数,单击“确定”启动作业。

    图12 启动配置
    表10 启动配置参数

    参数

    说明

    同步模式

    • 增量同步:从指定时间位点开始同步增量数据。
    • 全量+增量:先同步全量数据,随后实时同步增量数据。

    时间

    增量同步需要设置该参数,指示增量同步起始的时间位点。

    说明:

    配置的位点时间早于Binlog日志最早时间点时,默认会以日志最新时间点开始消费。

  11. 监控作业。

    通过单击作业开发页面导航栏的“前往监控”按钮,可前往作业监控页面查看运行情况、监控日志等信息,并配置对应的告警规则,详情请参见实时集成任务运维

    图13 前往监控

性能调优

若链路同步速度过慢,可参考参见任务性能调优章节中对应链路文档进行排查及处理。

我们使用cookie来确保您的高速浏览体验。继续浏览本站,即表示您同意我们使用cookie。 详情

文档反馈

文档反馈

意见反馈

0/500

标记内容

同时提交标记内容