更新时间:2025-09-04 GMT+08:00
分享

DMS Kafka同步到Doris作业配置

支持的源端和目的端数据库版本

表1 支持的数据库版本

源端数据库

目的端数据库

Kafka集群(2.7、3.x版本)

Doris(Doris1.2、Doris2.0)

数据库账号权限要求

在使用Migration进行同步时,源端和目的端所使用的数据库账号需要满足以下权限要求,才能启动实时同步任务。不同类型的同步任务,需要的账号权限也不同,详细可参考下表进行赋权。

表2 数据库账号权限

类型名称

权限要求

源数据库连接账号

Kafka开启密文接入场景下,所配置用户需要有发布和订阅Topic的权限,其余场景无特殊权限要求。

目标数据库连接账号

目标数据库的每张表必须具有如下权限:LOAD 、SELECT、CREATE、ALTER、DROP。

  • 建议创建单独用于Migration任务连接的数据库账号,避免因为数据库账号密码修改,导致的任务连接失败。
  • 连接源和目标数据库的账号密码修改后,请同步修改管理中心对应的连接信息,避免任务连接失败后自动重试,导致数据库账号被锁定影响使用。

支持的同步对象范围

在使用Migration进行同步时,不同类型的链路,支持的同步对象范围不同,详细情况可参考下表。

表3 同步对象范围

类型名称

使用须知

同步对象范围

支持同步所有Kafka消息,其中支持对CDC JSON格式的消息体进行解析,且仅同步上游指定库表对应的消息。

注意事项

除了数据源版本、连接账号权限及同步对象范围外,您还需要注意的事项请参见下表。

表4 注意事项

类型名称

使用和操作限制

数据库限制

  • 支持开启SASL_PLAINTEXT的Kafka实例,包括SCRAM-SHA-512及PLAIN认证机制。
  • 仅支持使用默认证书的DMS Kafka开启SASL_SSL认证进行数据同步;其余类型Kafka不支持开启SASL_SSL认证进行同步。
  • 支持开启Kerberos认证的MRS Kafka。
  • 目的端数据库中的对象名需要满足约束:
    • 表名长度不超过64个字符,以字母开头,中间字符可以是字母、数字、下划线、中划线。
    • 字段名长度不超过255个字符,建议使用常规字符,不能包含中文等特殊字符。

使用限制

通用:

  • 实时同步过程中,不支持IP、端口、账号、密码修改。
  • 自动建表不支持创建聚合模型doris表,需手动创建。
  • MRS Doris支持HTTP、HTTPS,Cloud table Doris当前仅支持HTTP。
  • 当前仅支持StreamLoad方式导入。

常见故障排查:

在任务创建、启动、全量同步、增量同步、结束等过程中,如有遇到问题,可先参考常见问题章节进行排查。

其他限制

  • Doris不能用字符串类型作为主键,包括联合主键其中一个字段为字符串类型也不行。
  • 不支持在多并发场景下同步DDL。

操作步骤

本小节以Kafka到Doris的实时同步为示例,介绍如何配置Migration实时集成作业。配置作业前请务必阅读使用前自检概览, 确认已做好所有准备工作。

  1. 参见新建实时集成作业创建一个实时集成作业并进入作业配置界面。
  2. 选择数据连接类型:源端选Kafka,目的端选Doris。

    图1 选择数据连接类型

  3. 选择集成作业类型:同步类型默认为实时,同步场景包含单表场景。

    图2 选择集成作业类型

    同步场景相关介绍请参见同步场景

  4. 配置网络资源:选择已创建的Kafka、Doris数据连接和已配置好网络连接的migration资源组。

    图3 选择数据连接及migration资源组

    无可选数据连接时,可单击“新建”跳转至管理中心数据连接界面,单击“创建数据连接”创建数据连接,详情请参见配置DataArts Studio数据连接参数进行配置。

    无可选migration资源组时,可单击“新建”跳转至购买migration资源组页面创建migration资源组配置,详情请参见购买创建数据集成资源组增量包进行配置。

  5. 检测网络连通性:数据连接和migration资源组配置完成后需要测试整个迁移任务的网络连通性,可通过以下方式进行数据源和migration资源组之间的连通性测试。

    • 单击展开“源端配置”触发连通性测试,会对整个迁移任务的连通性做校验。
    • 单击源端和目的端数据源和migration资源组中的“测试”按钮进行检测。

      网络连通性检测异常可先参考数据源和资源组网络不通如何排查?章节进行排查。

  6. 配置源端参数。

    选择需要同步的Kafka Topic。
    图4 输入Kafka Topic
    • 主题

      输入一个需要迁移的Kafka Topic。

    • 数据格式

      源端Kafka Topic中消息内容的格式,Migration当前支持对如下消息进行处理:

      DEBEZIUM_JSON:支持解析Debezium格式的CDC JSON,同步数据到目的端对应的库表中。

    • 消费组ID

      消费者是从Topic订阅消息的一方,消费组是由一个或多个消费者组成的。Migration支持指定本次消费动作所属的Kafka消费组。

    • Kafka源端属性配置

      支持设置Kafka的配置项,需要增加 properties. 前缀,作业将自动移除前缀并传入底层Kafka客户端,具体参数可参考Kafka官方文档中的配置说明。

    • 上游数据源类型

      仅当数据格式选择DEBEZIUM_JSON时需要配置,当前仅支持选择MySQL,标志CDC JSON为MySQL产生的。

    • 库表名称列表

      仅当数据格式选择DEBEZIUM_JSON时需要配置,标志Kafka消息来源的数据库表,任务会解析CDC JSON并且只同步已配置库表的数据。

  7. 配置目的端参数。

    源端Kafka为CDC_JSON数据格式时,可能是来源于上游数据库中的多个库表,Migration支持进行上游库表和目的端库表的匹配。
    • 源库表和目标匹配策略。
      • 库匹配策略:
        • 与来源库同名:数据将同步至与来源库名相同的Doris 库中。
        • 自定义:数据将同步至自行指定的Doris 库中。
      • 表匹配策略:
        • 与来源表同名:数据将同步至与来源表名相同的Doris表中。
        • 自定义:数据将同步至自行指定的Doris表中。
      图5 分库分表场景下源库表和目标匹配策略

      自定义匹配策略时,支持用内置变量#{source_db_name}和#{source_table_name}标志来源的库名和表名,其中表匹配策略必须包含#{source_table_name}。

    • Doris参数配置。

      高级配置:支持通过参数配置部分高级功能,参数详情可参考下表。

      表5 Doris高级配置一览表

      参数名

      参数类型

      默认值

      单位

      配置说明

      doris.request.connect.timeout.ms

      int

      30000

      ms

      Doris连接超时时间。

      doris.request.read.timeout.ms

      int

      30000

      ms

      Doris读取超时时间。

      doris.request.retries

      int

      3

      -

      Doris请求失败重试次数。

      sink.max-retries

      int

      3

      -

      数据库写入失败时的最大重试次数。

      sink.batch.interval

      string

      1s

      h/min/s

      异步线程写入数据的时间间隔。

      sink.enable-delete

      boolean

      true

      -

      是否启用删除功能,如果关闭,源端删除数据,目的端不会删除。

      sink.batch.size

      int

      20000

      -

      单次写(插入、更新、删除)数据的最大行数。

      sink.batch.bytes

      long

      10485760

      bytes

      单次写(插入、更新、删除)数据的最大字节数。

      logical.delete.enabled

      boolean

      false

      -

      逻辑删除开关,开启本开关后,需要目的端包含删除标记列,在源端删除数据,不会将目的端对应的数据删除,而是将删除标记列设为true,表示该数据在源端已不存在。

      logical.delete.column

      string

      logical_is_deleted

      -

      逻辑删除标记列名称,默认为logical_is_deleted,支持用户自定义。

      sink.keyby.mode

      string

      pk

      -

      doris多并发写时的分区方式,默认为pk(主键),源端为kafka类型没有主键时,应选择table,以库表名进行分区。

      doris.sink.flush.tasks

      int

      1

      -

      单个taskmanager的flush并发数。

      sink.properties.format

      string

      json

      -

      Stream Load 使用的数据格式,可选择json/csv。

      sink.properties.Content-Encoding

      string

      -

      -

      HTTP头部消息体压缩格式,目前只支持 CSV 文件的压缩,支持gzip。

      sink.properties.compress_type

      string

      -

      -

      文件的压缩格式,目前只支持 CSV 文件的压缩。支持 gz, lzo, bz2, lz4, lzop, deflate 压缩格式。

  8. 刷新源表和目标表映射。

    图6 源表与目标表映射
    • 同步主键

      查询到的目的端Doris表的主键。

    • 表建立方式

      本链路不支持自动建表,用户需手动创建Doris表。

  9. 配置DDL消息处理规则。

    实时集成作业除了能够同步对数据的增删改等DML操作外,也支持对部分表结构变化(DDL)进行同步。针对支持的DDL操作,用户可根据实际需求配置为正常处理/忽略/出错。

    • 正常处理:Migration识别到源端库表出现该DDL动作时,作业自动同步到目的端执行该DDL操作。
    • 忽略:Migration识别到源端库表出现该DDL动作时,作业忽略该DDL,不同步到目的端表中。
    • 出错:Migration识别到源端库表出现该DDL动作时,作业抛出异常。
      图7 DDL配置

  10. 配置任务属性。

    表6 任务配置参数说明

    参数

    说明

    默认值

    执行内存

    作业执行分配内存,跟随处理器核数变化而自动变化。

    8GB

    处理器核数

    范围:2-32。

    每增加1处理核数,则自动增加4G执行内存和1并发数。

    2

    并发数

    作业执行支持并发数。该参数无需配置,跟随处理器核数变化而自动变化。

    1

    自动重试

    作业失败时是否开启自动重试。

    最大重试次数

    “自动重试”为是时显示该参数。

    1

    重试间隔时间

    “自动重试”为是时显示该参数。

    120秒

    是否写入脏数据

    选择是否记录脏数据,默认不记录脏数据,当脏数据过多时,会影响同步任务的整体同步速度。

    • 否:默认为否,不记录脏数据。

      表示不允许脏数据存在。如果同步过程中产生脏数据,任务将失败退出。

    • 是:允许脏数据,即任务产生脏数据时不影响任务执行。
      允许脏数据并设置其阈值时:
      • 若产生的脏数据在阈值范围内,同步任务将忽略脏数据(即不会写入目标端),并正常执行。
      • 若产生的脏数据超出阈值范围,同步任务将失败退出。
        说明:

        脏数据认定标准:脏数据是对业务没有意义,格式非法或者同步过程中出现问题的数据;单条数据写入目标数据源过程中发生了异常,则此条数据为脏数据。 因此只要是写入失败的数据均被归类于脏数据。

        例如,源端是VARCHAR类型的数据写到INT类型的目标列中,则会因为转换不合理导致脏数据不会成功写入目的端。用户可以在同步任务配置时,配置同步过程中是否写入脏数据,配置脏数据条数(单个分片的最大错误记录数)保证任务运行,即当脏数据超过指定条数时,任务失败退出。

    脏数据策略

    “是否写入脏数据”为是时显示该参数,当前支持以下策略:

    • 不归档:不对脏数据进行存储,仅记录到任务日志中。
    • 归档到OBS:将脏数据存储到OBS中,并打印到任务日志中。

    不归档

    脏数据写入连接

    “脏数据策略”选择归档到OBS时显示该参数。

    脏数据要写入的连接,目前只支持写入到OBS连接。

    -

    脏数据目录

    脏数据写入的OBS目录。

    -

    脏数据阈值

    是否写入脏数据为是时显示该参数。

    用户根据实际设置脏数据阈值。

    说明:
    • 脏数据阈值仅针对每个并发生效。比如阈值为100,并发为3,则该作业可容忍的脏数据条数最多为300。
    • 输入-1表示不限制脏数据条数。

    100

    添加自定义属性

    支持通过自定义属性修改部分作业参数及开启部分高级功能,详情可参见任务性能调优章节。

    -

  11. 提交并运行任务。

    作业配置完毕后,单击作业开发页面左上角“提交”,完成作业提交。

    图8 提交作业

    提交成功后,单击作业开发页面“启动”按钮,在弹出的启动配置对话框按照实际情况配置同步位点参数,单击“确定”启动作业。

    图9 启动配置
    表7 启动配置参数

    参数

    说明

    偏移量参数

    • 最早:从Kafka Topic最早偏移量开始消费数据。
    • 最新:从Kafka Topic最新偏移量开始消费数据。
    • 起止时间:根据时间获取Kafka Topic对应的偏移量,并从该偏移量开始消费数据。

    时间

    起止时间需要设置该参数,指示同步起始的时间位点。

    说明:

    配置的位点时间早于Kafka消息最早偏移量时,默认会从最早偏移量开始消费。

  12. 监控作业。

    通过单击作业开发页面导航栏的“前往监控”按钮,可前往作业监控页面查看运行情况、监控日志等信息,并配置对应的告警规则,详情请参见实时集成任务运维

    图10 前往监控

性能调优

若链路同步速度过慢,可参考参见任务性能调优章节中对应链路文档进行排查及处理。

相关文档