更新时间:2025-09-09 GMT+08:00

GaussDB集中式/分布式同步到DMS Kafka作业配置

支持的源端和目的端数据库版本

表1 支持的数据库版本

源端数据库

目的端数据库

GaussDB集中式/分布式数据库(内核引擎版本505.1.0、505.1.0.SPC0100、505.2.0)

Kafka集群(2.7、3.x版本)

数据库账号权限要求

在使用Migration进行同步时,源端和目的端所使用的数据库账号需要满足以下权限要求,才能启动实时同步任务。不同类型的同步任务,需要的账号权限也不同,详细可参考下表进行赋权。

表2 数据库账号权限

类型名称

权限要求

源数据库连接账号

  1. 需要REPLICATION权限或继承了内置角色gs_role_replication的权限、DATABASE的CONNECT权限、SCHEMA的USAGE权限、表的SELECT权限、序列的SELECT权限,同时需要有查询pg_ls_waldir()函数的权限,具体操作请参见如何增加PostgreSQL、Gaussdb数据源额外权限?
  2. 需要具备远程连接权限。

    远程连接权限的添加方法:

    • 集中式:在源数据库的“gs_hba.conf”配置文件添加配置“host all <user> 0.0.0.0/0 sha256”和“host replication <user> 0.0.0.0/0 sha256”。
    • 分布式:在源数据库CN节点的“gs_hba.conf”配置文件添加配置“host all <user> 0.0.0.0/0 sha256”;在源数据库DN节点的“gs_hba.conf”配置文件添加配置“host all <user> 0.0.0.0/0 sha256”和“host replication <user> 0.0.0.0/0 sha256”。
    • 在源库使用系统管理员用户执行语句“select pg_reload_conf();”生效,或重启数据库实例生效。
  3. 需保证安全组和防火墙策略放开对应连接端口。

    安全组和防火墙开放策略:数据库连接已放开普通连接和复制连接端口。

    集中式:复制连接端口为普通连接端口+1。

    分布式:复制连接端口为CN/DN连接端口+1,在源数据库执行语句“select * from pgxc_node”可查看DN连接端口。

目标数据库连接账号

Kafka开启密文接入场景下,所配置用户需要有发布和订阅Topic的权限,其余场景无特殊权限要求。

  • 建议创建单独用于Migration任务连接的数据库账号,避免因为数据库账号密码修改,导致的任务连接失败。
  • 连接源和目标数据库的账号密码修改后,请同步修改管理中心对应的连接信息,避免任务连接失败后自动重试,导致数据库账号被锁定影响使用。

支持的同步对象范围

在使用Migration进行同步时,不同类型的链路,支持的同步对象范围不同,详细情况可参考下表。

表3 同步对象范围

类型名称

使用须知

同步对象范围

  • 支持同步DML:包括INSERT、UPDATE、DELETE。
  • 不支持同步DDL。
  • 仅支持同步有主键表。
  • 仅支持同步分区键为主键的分区表。
  • 不支持暂停新增表。
  • 不支持同步视图、外键、存储过程、触发器、函数、事件、虚拟列、唯一约束和唯一索引。
  • 不支持同步无日志表(UNLOGGED TABLE)、临时表、系统模式和系统表。
  • 自动建表支持同步表结构、普通索引、约束(主键、空、非空)、注释。

注意事项

除了数据源版本、连接账号权限及同步对象范围外,您还需要注意的事项请参见下表。

表4 注意事项

类型名称

使用和操作限制

数据库限制

  • 源数据库参数要求:
    • 源库的password_encryption_type参数设置为1,修改用户密码使其生效。
    • 源库的wal_level参数设置为logical。
    • 源库的max_replication_slots参数值必须大于当前已使用的复制槽数量,可基于需要创建的实时作业数量评估。
  • 源数据库的分区表触发器不可以设置为disable。
  • 仅支持使用默认证书的DMS Kafka开启SASL_SSL认证进行数据同步;其余类型Kafka不支持开启SASL_SSL认证进行同步。

使用限制

通用:

  • 实时同步过程中,不支持IP、端口、账号、密码修改。
  • 首次启动作业无法指定位点启动,默认会保留6小时日志,即作业首次启动x小时后,可停止,然后指定前x小时内的位点启动(x不大于日志保留时间),日志保留时间可在“任务配置 > 添加自定义属性”使用参数slot.flush.delay.time自行设置,支持的写法有(1d | 1day | 2days | 1h | 1hour | 2hours | 1m | 1minute | 2minutes)。
  • 支持在“任务配置 > 添加自定义属性”使用参数custom.slot.name指定已创建的非活跃复制槽进行增量同步。
  • 不支持在作业停止后自动删除逻辑复制槽。

全量同步阶段:

任务启动和全量数据同步阶段,请不要在源数据库执行DDL操作,否则可能导致任务异常。

增量同步阶段:

  • 请勿修改源数据库表的主键或者唯一键(主键不存在时),否则可能导致增量数据不一致或任务失败。
  • 请勿修改源数据库中表的replica identity属性,否则可能导致增量数据不一致或任务失败。
  • GaussDB数据源复制槽数达到上限时,无法执行新的作业,可以通过设置max_replication_slots的数值提高复制槽的使用上限或手动删除复制槽(Postgres数据源不支持自动删除复制槽)解决,手动删除请参见PostgreSQL数据源如何手动删除复制槽?

常见故障排查:

在任务创建、启动、全量同步、增量同步、结束等过程中,如有遇到问题,可先参考常见问题章节进行排查。

其他限制

  • 启动任务前,请确保源库中未启动长事务,源库启动长事务会阻塞逻辑复制槽的创建,进而引发任务失败。
  • 任务启动后,不支持源库发生主备倒换。

操作步骤

本小节以GaussDB到DMS Kafka的实时同步为示例,介绍如何配置Migration实时集成作业。配置作业前请务必阅读使用前自检概览, 确认已做好所有准备工作。

  1. 参见新建实时集成作业创建一个实时集成作业并进入作业配置界面。
  2. 选择数据连接类型:源端选PostgreSQL,目的端选MRS Kafka。

    图1 选择数据连接类型

  3. 选择集成作业类型:同步类型默认为实时,同步场景包含整库场景。

    图2 选择集成作业类型

    同步场景相关介绍请参见同步场景

  4. 配置网络资源:选择已创建的GaussDB(RDS PostgreSQL)、MRS Kafka数据连接和已配置好网络连接的migration资源组。

    图3 选择数据连接及migration资源组

    无可选数据连接时,可单击“新建”跳转至管理中心数据连接界面,单击“创建数据连接”创建数据连接,详情请参见配置DataArts Studio数据连接参数进行配置。

    无可选migration资源组时,可单击“新建”跳转至购买migration资源组页面创建migration资源组配置,详情请参见购买创建数据集成资源组增量包进行配置。

  5. 检测网络连通性:数据连接和migration资源组配置完成后需要测试整个迁移任务的网络连通性,可通过以下方式进行数据源和migration资源组之间的连通性测试。

    • 单击展开“源端配置”触发连通性测试,会对整个迁移任务的连通性做校验。
    • 单击源端和目的端数据源和migration资源组中的“测试”按钮进行检测。

      网络连通性检测异常可先参考数据源和资源组网络不通如何排查?章节进行排查。

  6. 配置源端参数。

    各同步场景下选择需要同步库表的方式请参考下表。

    表5 选择需要同步的库表

    同步场景

    配置方式

    整库

    选择需要迁移的GaussDB库表。
    图4 选择库表

    库与表均支持自定义选择,即可选择一库一表,也可选择多库多表。

  7. 配置目的端参数。

    图5 Kafka目的端配置项
    • 目标Topic名称规则。

      配置源端库表与目的端Kafka Topic的映射规则,可指定为固定的一个Topic,也可使用内置变量做映射,将不同源表数据同步到不同的Topic中。

      可以使用的内置变量有:

      • 源库名:#{source_db_name}。
      • 源表名:#{source_table_name}。
    • 同步kafka partition策略

      支持以下三种投递策略将源端的数据按规则同步到Kafka Topic的特定Partition:

      • 全部投递到Partition 0。
      • 按库名+表名的hash值投递到不同Partition。
      • 按表的主键值hash值投递到不同的Partition。

        源端无主键情况下,目的端默认投递到partition 0。

    • 新建Topic的Partition数量

      设定目的端Kafka无对应Topic时,Migration自动建Topic的分区数量,默认为3。

    • Kafka目标端属性配置

      支持设置Kafka的配置项,需要增加 properties. 前缀,作业将自动移除前缀并传入底层Kafka客户端,具体参数可参考Kafka官方文档中的配置说明。

    • 高级配置

      支持在作业“任务配置”中添加自定义属性来开启部分高级功能,参数详情可参考GaussDB > Kafka高级参数一览表。

      图6 添加自定义属性
      表6 GaussDB > Kafka高级参数一览表

      参数名

      参数类型

      默认值

      单位

      参数说明

      sink.delivery-guarantee

      string

      at-least-once

      -

      Flink写Kafka时的语义保证机制。

      • at-least-once:在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
      • exactly-once:该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此,如果 consumer 只读取已提交的数据,在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。

  8. 刷新源表和目标表映射,检查映射关系是否正确。

    支持用户根据实际需求修改映射后的目的端Topic名称,可以配置为一对一、多对一的映射关系。
    图7 源表与目标表映射

  9. 配置任务属性。

    表7 任务配置参数说明

    参数

    说明

    默认值

    执行内存

    作业执行分配内存,跟随处理器核数变化而自动变化。

    8GB

    处理器核数

    范围:2-32。

    每增加1处理核数,则自动增加4G执行内存和1并发数。

    2

    并发数

    作业执行支持并发数。该参数无需配置,跟随处理器核数变化而自动变化。

    1

    自动重试

    作业失败时是否开启自动重试。

    最大重试次数

    “自动重试”为是时显示该参数。

    1

    重试间隔时间

    “自动重试”为是时显示该参数。

    120秒

    添加自定义属性

    支持通过自定义属性修改部分作业参数及开启部分高级功能,详情可参见任务性能调优章节。

    -

  10. 提交并运行任务。

    作业配置完毕后,单击作业开发页面左上角“提交”,完成作业提交。

    图8 提交作业

    提交成功后,单击作业开发页面“启动”按钮,在弹出的启动配置对话框按照实际情况配置同步位点参数,单击“确定”启动作业。

    图9 启动配置
    表8 启动配置参数

    参数

    说明

    同步模式

    • 增量同步:从指定时间位点开始同步增量数据。
    • 全量+增量:先同步全量数据,随后实时同步增量数据。

  1. 监控作业。

    通过单击作业开发页面导航栏的“前往监控”按钮,可前往作业监控页面查看运行情况、监控日志等信息,并配置对应的告警规则,详情请参见实时集成任务运维

    图10 前往监控

性能调优

若链路同步速度过慢,可参考参见任务性能调优章节中对应链路文档进行排查及处理。