更新时间:2025-09-09 GMT+08:00

PostgreSQL同步到MRS Hudi作业配置

支持的源端和目的端数据库版本

表1 支持的数据库版本

源端数据库

目的端数据库

PostgreSQL数据库(PostgreSQL 9.4、9.5、9.6、10、11、12、13、14、15、16版本)

MRS集群(3.2.0-LTS.x、3.3.x-LTS)

Hudi版本(0.11.0)

数据库账号权限要求

在使用Migration进行同步时,源端和目的端所使用的数据库账号需要满足以下权限要求,才能启动实时同步任务。不同类型的同步任务,需要的账号权限也不同,详细可参考下表进行赋权。

表2 数据库账号权限

类型名称

权限要求

源数据库连接账号

  • 数据库的CONNECT权限,模式的USAGE权限,表的SELECT权限,序列的SELECT权限,REPLICATION连接权限,同时需要有查询pg_ls_waldir()函数的权限,具体操作请参见如何增加PostgreSQL、Gaussdb数据源额外权限?
    说明:

    REPLICATION连接权限的添加方法:

    • 在源数据库的“pg_hba.conf”配置文件的所有配置前增加一行配置“host replication <src_user_name> <drs_instance_ip>/32 <认证方式>”;认证方式可参考PostgreSQL官方文档pg_hba.conf文件配置,常见的认证方式有scram-sha-256等。
    • 在源库使用SUPERUSER用户执行语句“select pg_reload_conf();”生效,或重启数据库实例生效。

目标数据库连接账号

目标数据库的每张表必须具有如下权限:INSERT、SELECT、UPDATE、DELETE、CONNECT、CREATE。

  • 建议创建单独用于Migration任务连接的数据库账号,避免因为数据库账号密码修改,导致的任务连接失败。
  • 连接源和目标数据库的账号密码修改后,请同步修改管理中心对应的连接信息,避免任务连接失败后自动重试,导致数据库账号被锁定影响使用。

支持的同步对象范围

在使用Migration进行同步时,不同类型的链路,支持的同步对象范围不同,详细情况可参考下表。

表3 同步对象范围

类型名称

使用须知

同步对象范围

  • 支持同步DML:包括INSERT、UPDATE、DELETE。
  • 不支持同步DDL。
  • 仅支持同步有主键表。
  • 仅支持同步分区键为主键的分区表。
  • 不支持暂停新增表。
  • 不支持同步视图、外键、存储过程、触发器、函数、事件、虚拟列、唯一约束和唯一索引。
  • 不支持同步无日志表(UNLOGGED TABLE)、临时表、系统模式和系统表。
  • 自动建表支持同步表结构、普通索引、约束(主键、空、非空)、注释。

注意事项

除了数据源版本、连接账号权限及同步对象范围外,您还需要注意的事项请参见下表。

表4 注意事项

类型名称

使用和操作限制

数据库限制

  • 目的端数据库中的库名、表名、字段名仅支持数字、字母和下划线,且字段名必须以字母或下划线开头,建议尽量使用常规字符避免任务失败。
  • 源数据库的分区表触发器不可以设置为disable。
  • 如果做增量同步:源数据库的“pg_hba.conf” 文件中包含如下的配置:
    host replication all 0.0.0.0/0 md5

使用限制

通用:

  • 实时同步过程中,不支持IP、端口、账号、密码修改。
  • 首次启动作业无法指定位点启动,默认会保留6小时日志,即作业首次启动x小时后,可停止,然后指定前x小时内的位点启动(x不大于日志保留时间),日志保留时间可在“任务配置 > 添加自定义属性”使用参数slot.flush.delay.time自行设置,支持的写法有(1d | 1day | 2days | 1h | 1hour | 2hours | 1m | 1minute | 2minutes)。
  • 支持在“任务配置 > 添加自定义属性”使用参数custom.slot.name指定已创建的非活跃复制槽进行增量同步。
  • 只支持使用插件为pgoutput的逻辑复制槽,建槽命令:SELECT * FROM pg_create_logical_replication_slot('your_slot_name', 'pgoutput');
  • 源表需要发布在dbz_publiction中。使用 "select * from pg_publication_tables;" 查询所有表的PUBLICATION,使用 "ALTER PUBLICATION name ADD TABLE [ ONLY ] table_name [ * ] [, ...]" 将表加入发布中。

全量同步阶段:

任务启动和全量数据同步阶段,请不要在源数据库执行DDL操作,否则可能导致任务异常。

增量同步阶段:

  • 请勿修改源数据库表的主键或者唯一键(主键不存在时),否则可能导致增量数据不一致或任务失败。
  • 请勿修改源数据库中表的replica identity属性,否则可能导致增量数据不一致或任务失败。
  • Postgres数据源复制槽数达到上限时,无法执行新的作业,可以通过设置max_replication_slots的数值提高复制槽的使用上限或手动删除复制槽(Postgres数据源不支持自动删除复制槽)解决,手动删除请参见PostgreSQL数据源如何手动删除复制槽?

常见故障排查:

在任务创建、启动、全量同步、增量同步、结束等过程中,如有遇到问题,可先参考常见问题章节进行排查。

其他限制

  • 启动任务前,请确保源库中未启动长事务,源库启动长事务会阻塞逻辑复制槽的创建,进而引发任务失败。
  • 任务启动后,不支持源库发生主备倒换。
  • 支持目标数据库中的表比源数据库多列场景,但是需要避免以下场景可能导致的任务失败。

    目标数据库多的列要求非空且没有默认值,源数据库insert数据,同步到目标数据库后多的列为null,不符合目标数据库要求。

操作步骤

本小节以PostgreSQL到Hudi的实时同步为示例,介绍如何配置Migration实时集成作业。配置作业前请务必阅读使用前自检概览, 确认已做好所有准备工作。

  1. 参见新建实时集成作业创建一个实时集成作业并进入作业配置界面。
  2. 选择数据连接类型:源端选PostgreSQL,目的端选Hudi。

    图1 选择数据连接类型

  3. 选择集成作业类型:同步类型默认为实时,同步场景包含整库场景。

    图2 选择集成作业类型

    同步场景相关介绍请参见同步场景

  4. 配置网络资源:选择已创建的PostgreSQL、Hudi数据连接和已配置好网络连接的migration资源组。

    图3 选择数据连接及migration资源组

    无可选数据连接时,可单击“新建”跳转至管理中心数据连接界面,单击“创建数据连接”创建数据连接,详情请参见配置DataArts Studio数据连接参数进行配置。

    无可选migration资源组时,可单击“新建”跳转至购买migration资源组页面创建migration资源组配置,详情请参见购买创建数据集成资源组增量包进行配置。

  5. 检测网络连通性:数据连接和migration资源组配置完成后需要测试整个迁移任务的网络连通性,可通过以下方式进行数据源和migration资源组之间的连通性测试。

    • 单击展开“源端配置”触发连通性测试,会对整个迁移任务的连通性做校验。
    • 单击源端和目的端数据源和migration资源组中的“测试”按钮进行检测。

      网络连通性检测异常可先参考数据源和资源组网络不通如何排查?章节进行排查。

  6. 配置源端参数。

    各同步场景下选择需要同步库表的方式请参考下表。

    表5 选择需要同步的库表

    同步场景

    配置方式

    整库

    选择需要迁移的PostgreSQL库表。
    图4 选择库表

    库与表均支持自定义选择,即可选择一库一表,也可选择多库多表。

  7. 配置目的端参数。

    • 源库表和目标匹配策略。

      各同步场景下源端库表和目标端库表的匹配策略请参考下表。

      表6 源库表和目标匹配策略

      同步场景

      配置方式

      整库

      • 库匹配策略。
        • 与来源库同名:数据将同步至与来源PostgreSQL 库名相同的Hudi库中。
        • 自定义:数据将同步至自行指定的Hudi库中。
      • 表匹配策略。
        • 与来源表同名:数据将同步至与来源PostgreSQL 库名相同的Hudi表中。
        • 自定义:数据将同步至自行指定的Hudi表中。
          图5 整库场景下源库表和目标匹配策略
          说明:

          自定义匹配策略时,支持用内置变量#{source_db_name}和#{source_table_name}标志来源的库名和表名,其中表匹配策略必须包含#{source_table_name}。

    • Hudi参数配置。

      其余Hudi目的端参数说明请参考下表。

      图6 Hudi目的端配置项
      表7 Hudi目的端配置项

      配置项

      默认值

      单位

      配置说明

      数据存储路径

      -

      -

      Hudi自动建表时的warehouse路径,每张表会在warehouse路径下创建子目录。支持填写HDFS和OBS路径,路径格式参考:

      • OBS路径:obs://bucket/warehouse。
      • HDFS路径:/tmp/warehouse。

      Hudi表属性全局配置

      -

      -

      支持通过参数配置部分高级功能,参数详情可参考Hudi高级配置一览表。

      表8 Hudi高级配置一览表

      参数名

      参数类型

      默认值

      单位

      参数说明

      index.type

      string

      BLOOM

      -

      Hudi表索引类型。

      支持BLOOM和BUCKET索引,数据量较大场景下强烈建议使用BUCKET索引性能更好。

      hoodie.bucket.index.num.buckets

      int

      256

      Hudi表单分区下Bucket桶数。

      说明:

      使用Hudi BUCKET表时需要设置Bucket桶数,桶数设置关系到表的性能,需要格外引起注意。

      • 非分区表桶数 = MAX(CEIL(单表数据量大小(GB)/1GB), 4)。
      • 分区表桶数 = MAX(CEIL(单分区数据量大小(GB)/1GB), 1)。

      其中,要注意的是:

      • 需要使用的是表的总数据大小,而不是压缩以后的文件大小。
      • 桶的设置以偶数最佳,非分区表最小桶数请设置4个,分区表最小桶数请设置1个。

      changelog.enabled

      boolean

      false

      -

      Hudi changelog功能开关,开启后Migration作业可输出DELETE和UPDATE BEFORE数据。

      logical.delete.enabled

      boolean

      true

      -

      逻辑删除开关,changelog开启时必须关闭逻辑删除。

      hoodie.write.liststatus.optimized

      boolean

      true

      -

      写log文件时是否开启liststatus优化。涉及到大表和分区数据量多的作业,在启动时list会非常耗时,可能导致作业启动超时,建议关闭。

      hoodie.index.liststatus.optimized

      boolean

      false

      -

      定位数据时是否开启liststatus优化。涉及到大表和分区数据量多的作业,在启动时list会非常耗时,可能导致作业启动超时,建议关闭。

      compaction.async.enabled

      boolean

      true

      -

      异步compaction开关。compaction操作一定程度会影响实时任务的写入性能,如果用户使用外置的compaction操作对hudi进行compaction,可以考虑设置为false关闭实时处理集成作业的compaction操作。

      compaction.schedule.enabled

      boolean

      true

      -

      生成compaction计划的开关。compaction计划必须由本服务生成,计划的执行可以交给Spark。

      compaction.delta_commits

      int

      5

      生成compaction request的频率。compaction request生成频率降低可以使得compaction频率降低从而提升作业性能。如果hudi增量数据较小。可以考虑增大该值。

      说明:

      例如配置为40,即每40次commit生成一个compaction request,因为Migration每分钟生成1个commit,那么每个compaction request将间隔40分钟。

      clean.async.enabled

      boolean

      true

      -

      做历史版本数据文件清理的开关。

      clean.retain_commits

      int

      30

      要保留的commit数。这些commit关联的数据文件版本将被保留 num_of_commits * time_between_commits 这么长的时间,建议配置为2倍的compaction.delta_commits。

      说明:

      例如配置为80,因为Migration每分钟生成1个commit,那么超过80分钟后如果有旧版本数据文件 ,则会生成clean request,且在执行clean时保留最近80个commit。

      hoodie.archive.automatic

      boolean

      true

      -

      Hudi commit文件老化开关。

      archive.min_commits

      int

      40

      将旧版commit归档到日志文件中时要保留不归档的最小commit数。建议配置成clean.retain_commits + 1。

      说明:

      例如配置成81,那么在触发归档动作时,将会保留最近81次commit文件。

      archive.max_commits

      int

      50

      触发归档动作的commit数。建议配置成archive.min_commits + 20。

      说明:

      例如配置成101,那么将在生成101个commit文件后触发归档commit文件动作。

      • 为了达到Migration作业性能最优,建议使用Hudi Bucket索引的MOR表,并根据实际数据量配置Bucket桶数。
      • 为了保证Migration作业的稳定性,建议将Hudi Compaction单独拆成Spark作业交由MRS执行,在Migration任务里仅开启生成compaction计划,具体可以参考如何配置Hudi Compaction的Spark周期任务?

  8. 刷新源表和目标表映射,检查映射关系是否正确,同时可根据需求修改表属性、添加附加字段,并通过“自动建表”能力在目的端Hudi数据库中建出相应的表。

    图7 源表与目标表映射
    • 同步主键

      Hudi表必须设置“同步主键”,在源端为非主键表时,表映射会失败。

    • 表属性编辑

      单击操作列“表属性编辑”可配置Hudi表属性,包含表类型,分区类型及表自定义属性。

      图8 Hudi表属性配置
      • 表类型:Hudi的表类型,可选MERGE_ON_READ和COPY_ON_WRITE。
      • 分区类型:Hudi表分区类型,可选无分区、时间分区、自定义分区。
        • 其中时间分区需要用户指定一个源端字段名,选择一个时间转换格式。

          比如时间分区用户指定一个源端字段名src_col_1,选择一个时间转换格式,日(yyyyMMdd)、月(yyyyMM)、年(yyyy),自动建表时会在Hudi表默认创建一个cdc_partition_key的字段,系统会根据配置的时间转换格式将源端字段(src_col_1)的值格式化后写入cdc_partition_key中。

        • 自定义分区不支持timestamp类型的字段,使用timestamp类型的字段会导致作业失败。
      • 表自定义属性:支持通过参数配置单表的部分高级功能,参数详情可参考Hudi高级配置一览表。
    • 附加字段编辑:单击操作列“附加字段编辑”可为目的端的Hudi表中增加自定义字段,同时附加字段也会额外加入到Hudi表的建表中。用户可以在已有的源表字段基础上添加多个附加字段,并自定义字段名、选择字段类型、填写字段值。
      • 字段名称:目的端Hudi表新增字段的名称。
      • 字段类型:目的端Hudi表新增字段的类型。
      • 字段值:目的端Hudi表新增字段的取值来源。
        表9 附加字段取值方式

        类型

        示例

        常量

        支持数字、字母、中文、特殊字符。彩色表情字符可能导致作业提交失败,需谨慎使用。

        内置变量

        • 源端host ip地址:source.host。
        • 源端schema名称:source.schema。
        • 源端table名称:source.table。
        • 目的端schema名称:target.schema。
        • 目的端table名称:target.table。

        源表字段

        源表中的任一字段。

        配置附加字段的取值来源于源表字段时,请注意任务运行过程中不能修改对应源表字段的名称,否则可能导致作业异常。

        udf方法

        • substring(#col, pos[, len]): 截取源端col列的子串, 范围在[pos, pos+len)。
        • date_format(#col, time_format[, src_tz, dst_tz]): 将源端col列按time_format格式化, 可选转换时区。
        • now([tz]): 获取指定时区的当前时间。
        • if(cond_exp, str1, str2): 满足条件表达式cond_exp时返回str1, 否则返回str2。
        • concat(#col[, #str, ...]): 拼接多个参数, 可为源端列或字符串。
        • from_unixtime(#col[, time_format]): 将unix时间戳按time_format格式化。
        • unix_timestamp(#col[, precision, time_format]): 将时间转成unix时间戳,可显式定义时间格式及转换后精度,time_format时间格式要与源端保持一致。
    • 自动建表:单击“自动建表”可按照已配置映射规则在目的端数据库自动建表,成功后表建立方式会显示为使用已有表。
      图9 自动建表
      • Migration仅支持自动建表,不支持自动建库和模式,需用户自行在目的端手动建出库和模式后再使用本功能建表。
      • 自动建表时对应的字段类型映射关系请参见字段映射关系章节。
      • 自动建出的Hudi表会带有3个审计字段,分别是cdc_last_update_date、logical_is_deleted、_hoodie_event_time,并会以_hoodie_event_time作为Hudi表的预聚合键。

  9. 配置任务属性。

    表10 任务配置参数说明

    参数

    说明

    默认值

    执行内存

    作业执行分配内存,跟随处理器核数变化而自动变化。

    8GB

    处理器核数

    范围:2-32。

    每增加1处理核数,则自动增加4G执行内存和1并发数。

    2

    并发数

    作业执行支持并发数。该参数无需配置,跟随处理器核数变化而自动变化。

    1

    自动重试

    作业失败时是否开启自动重试。

    最大重试次数

    “自动重试”为是时显示该参数。

    1

    重试间隔时间

    “自动重试”为是时显示该参数。

    120秒

    是否写入脏数据

    选择是否记录脏数据,默认不记录脏数据,当脏数据过多时,会影响同步任务的整体同步速度。

    • 否:默认为否,不记录脏数据。

      表示不允许脏数据存在。如果同步过程中产生脏数据,任务将失败退出。

    • 是:允许脏数据,即任务产生脏数据时不影响任务执行。
      允许脏数据并设置其阈值时:
      • 若产生的脏数据在阈值范围内,同步任务将忽略脏数据(即不会写入目标端),并正常执行。
      • 若产生的脏数据超出阈值范围,同步任务将失败退出。
        说明:

        脏数据认定标准:脏数据是对业务没有意义,格式非法或者同步过程中出现问题的数据;单条数据写入目标数据源过程中发生了异常,则此条数据为脏数据。 因此只要是写入失败的数据均被归类于脏数据。

        例如,源端是VARCHAR类型的数据写到INT类型的目标列中,则会因为转换不合理导致脏数据不会成功写入目的端。用户可以在同步任务配置时,配置同步过程中是否写入脏数据,配置脏数据条数(单个分片的最大错误记录数)保证任务运行,即当脏数据超过指定条数时,任务失败退出。

    脏数据策略

    “是否写入脏数据”为是时显示该参数,当前支持以下策略:

    • 不归档:不对脏数据进行存储,仅记录到任务日志中。
    • 归档到OBS:将脏数据存储到OBS中,并打印到任务日志中。

    不归档

    脏数据写入连接

    “脏数据策略”选择归档到OBS时显示该参数。

    脏数据要写入的连接,目前只支持写入到OBS连接。

    -

    脏数据目录

    脏数据写入的OBS目录。

    -

    脏数据阈值

    是否写入脏数据为是时显示该参数。

    用户根据实际设置脏数据阈值。

    说明:
    • 脏数据阈值仅针对每个并发生效。比如阈值为100,并发为3,则该作业可容忍的脏数据条数最多为300。
    • 输入-1表示不限制脏数据条数。

    100

    添加自定义属性

    支持通过自定义属性修改部分作业参数及开启部分高级功能,详情可参见任务性能调优章节。

    -

  10. 提交并运行任务。

    作业配置完毕后,单击作业开发页面左上角“提交”,完成作业提交。

    图10 提交作业

    提交成功后,单击作业开发页面“启动”按钮,在弹出的启动配置对话框按照实际情况配置同步位点参数,单击“确定”启动作业。

    图11 启动配置
    表11 启动配置参数

    参数

    说明

    同步模式

    • 增量同步:从指定时间位点开始同步增量数据。
    • 全量+增量:先同步全量数据,随后实时同步增量数据。

  11. 监控作业。

    通过单击作业开发页面导航栏的“前往监控”按钮,可前往作业监控页面查看运行情况、监控日志等信息,并配置对应的告警规则,详情请参见实时集成任务运维

    图12 前往监控

性能调优

若链路同步速度过慢,可参考参见任务性能调优章节中对应链路文档进行排查及处理。