DMS Kafka同步到OBS作业配置
支持的源端和目的端数据库版本
源端数据库 |
目的端数据库 |
---|---|
Kafka集群(2.7、3.x版本) |
- |
数据库账号权限要求
在使用Migration进行同步时,源端和目的端所使用的数据库账号需要满足以下权限要求,才能启动实时同步任务。不同类型的同步任务,需要的账号权限也不同,详细可参考下表进行赋权。
类型名称 |
权限要求 |
---|---|
源数据库连接账号 |
Kafka开启密文接入场景下,所配置用户需要有发布和订阅Topic的权限,其余场景无特殊权限要求。 |
目标数据库连接账号 |
需要有目标OBS桶访问权限,且拥有在桶下读写对象的权限,详情可参考OBS权限控制。 |

- 建议创建单独用于Migration任务连接的数据库账号,避免因为数据库账号密码修改,导致的任务连接失败。
- 连接源和目标数据库的账号密码修改后,请同步修改管理中心对应的连接信息,避免任务连接失败后自动重试,导致数据库账号被锁定影响使用。
支持的同步对象范围
在使用Migration进行同步时,不同类型的链路,支持的同步对象范围不同,详细情况可参考下表。
类型名称 |
使用须知 |
---|---|
同步对象范围 |
支持同步所有Kafka消息,其中支持对JSON或CSV格式的消息体进行解析。 |
注意事项
除了数据源版本、连接账号权限及同步对象范围外,您还需要注意的事项请参见下表。
类型名称 |
使用和操作限制 |
---|---|
数据库限制 |
|
使用限制 |
通用: 实时同步过程中,不支持IP、端口、账号、密码修改。 增量同步阶段: 整库场景下需要根据同步的Topic分区数对应增加作业并发数,否则可能导致任务内存溢出。 常见故障排查: 在任务创建、启动、全量同步、增量同步、结束等过程中,如有遇到问题,可先参考常见问题章节进行排查。 |
其他限制 |
|
操作步骤
本小节以DMS Kafka到OBS的实时同步为示例,介绍如何配置Migration实时集成作业。配置作业前请务必阅读使用前自检概览, 确认已做好所有准备工作。
- 参见新建实时集成作业创建一个实时集成作业并进入作业配置界面。
- 选择数据连接类型:源端选DMS Kafka,目的端选OBS。
图1 选择数据连接类型
- 选择集成作业类型:同步类型默认为实时,同步场景包含单表、整库场景。
图2 选择集成作业类型
- 配置网络资源:选择已创建的DMS Kafka、OBS数据连接和已配置好网络连接的migration资源组。
图3 选择数据连接及migration资源组
无可选数据连接时,可单击“新建”跳转至管理中心数据连接界面,单击“创建数据连接”创建数据连接,详情请参见配置DataArts Studio数据连接参数进行配置。
无可选migration资源组时,可单击“新建”跳转至购买migration资源组页面创建migration资源组配置,详情请参见购买创建数据集成资源组增量包进行配置。
- 检测网络连通性:数据连接和migration资源组配置完成后需要测试整个迁移任务的网络连通性,可通过以下方式进行数据源和migration资源组之间的连通性测试。
- 单击展开“源端配置”触发连通性测试,会对整个迁移任务的连通性做校验。
- 单击源端和目的端数据源和migration资源组中的“测试”按钮进行检测。
- 配置源端参数。
- 选择需要同步的Kafka Topic,各同步场景下选择需要同步主题的方式请参考下表。
表5 选择需要同步的主题 同步场景
配置方式
单表
输入一个需要迁移的Kafka Topic。图4 输入Kafka Topic整库
选择需要迁移的Kafka Topic。图5 选择Kafka Topic - 数据格式
- 消费组ID
消费者是从Topic订阅消息的一方,消费组是由一个或多个消费者组成的。Migration支持指定本次消费动作所属的Kafka消费组。
- Kafka源端属性配置
支持设置Kafka的配置项,需要增加 properties. 前缀,作业将自动移除前缀并传入底层Kafka客户端,具体参数可参考Kafka官方文档中的配置说明。
- 选择需要同步的Kafka Topic,各同步场景下选择需要同步主题的方式请参考下表。
- 配置目的端参数。
图6 目的端OBS配置
- 刷新源表和目标表映射,单击“目标字段编辑”检查要写入目的端的字段情况,并根据实际情况选择配置分区字段。
图7 源表与目标表映射
- 分区字段
支持配置分区字段,将在写入OBS时自动生成对应分区目录,目录名为“分区字段=分区值”。同时,字段选择顺序影响分区的层级,例如选择par1、par2作为分区字段,那么par1为一级分区,par2为二级分区,最多支持五级分区。
- 目标字段编辑
Migration会根据选择的源端消息格式自动解析源端消息,生成对应的字段信息,用户可在此基础上进行编辑,自定义字段名、选择字段类型、填写字段值。
- 字段名称:目的端OBS文件中写入字段的名称。字段名称至少包含一个字母,允许下划线、中划线,不支持纯数字。
- 字段类型:目的端OBS文件中写入字段的类型。当前支持STRING、BOOLEAN、INTEGER、LONG、FLOAT、DOUBLE、SHORT、DECIMAL、DATE、TIMESTAMP类型。
- 字段值:目的端OBS文件中写入字段的取值来源。
表7 目标字段取值方式 类型
字段取值
手动赋值
任意字符。
内置变量
Kafka的元数据,包括__key__、__value__、__Topic__、__partition__、__offset__、__timestamp__共6个字段。
源表字段
从源端Kafka Topic消息解析出的任意字段。
说明:如果源端Kafka消息为嵌套JSON的形式,本链路支持解析不同层级的字段值(包含数组,数组下标索引从1开始)。
例如,JSON的内容为:
{ "col1": "1", "col2": "2", "level1": { "level2": [ { "level3": "test" } ] } }
则可以通过level1.level2[1].level3取到数据”test”作为目标端某一个字段的值。
udf方法
支持填写Flink的内置函数用于数据转换,例如:
- CONCAT(CAST(NOW() as STRING), `col_name`)
- DATE_FORMAT(NOW(), 'yy')
注意,其中的字段名要用反引号包围起来。Flink完整内置函数可参考Flink官方文档。
- 分区字段
- 配置任务属性。
表8 任务配置参数说明 参数
说明
默认值
执行内存
作业执行分配内存,跟随处理器核数变化而自动变化。
8GB
处理器核数
范围:2-32。
每增加1处理核数,则自动增加4G执行内存和1并发数。
2
并发数
作业执行支持并发数。该参数无需配置,跟随处理器核数变化而自动变化。
1
自动重试
作业失败时是否开启自动重试。
否
最大重试次数
“自动重试”为是时显示该参数。
1
重试间隔时间
“自动重试”为是时显示该参数。
120秒
添加自定义属性
支持通过自定义属性修改部分作业参数及开启部分高级功能,详情可参见任务性能调优章节。
-
- 提交并运行任务。
作业配置完毕后,单击作业开发页面左上角“提交”,完成作业提交。
图8 提交作业提交成功后,单击作业开发页面“启动”按钮,在弹出的启动配置对话框按照实际情况配置同步位点参数,单击“确定”启动作业。
图9 启动配置表9 启动配置参数 参数
说明
偏移量参数
- 最早:从Kafka Topic最早偏移量开始消费数据。
- 最新:从Kafka Topic最新偏移量开始消费数据。
- 起止时间:根据时间获取Kafka Topic对应的偏移量,并从该偏移量开始消费数据。
时间
起止时间需要设置该参数,指示同步起始的时间位点。
说明:配置的位点时间早于Kafka消息最早偏移量时,默认会从最早偏移量开始消费。
- 监控作业。
通过单击作业开发页面导航栏的“前往监控”按钮,可前往作业监控页面查看运行情况、监控日志等信息,并配置对应的告警规则,详情请参见实时集成任务运维。
图10 前往监控
性能调优
若链路同步速度过慢,可参考参见任务性能调优章节中对应链路文档进行排查及处理。