源端为DMS Kafka,目的端为OBS
目前支持整库场景。
整库场景
- 源端配置。
- Kafka配置。
- 数据格式:源端Kafka Topic中消息内容的格式。
- JSON格式:支持对消息内容以JSON的层级格式进行解析。
- CSV格式:支持对消息内容以CSV格式指定分隔符进行解析。
- TEXT格式:将整条消息内容作为文本直接同步。
- 消费组ID:由用户指定,标识当前实时处理集成作业的消费组。
当迁移作业消费DMS Kafka集群某一Topic的消息后,在Kafka集群的“消费组管理”可以看到此处配置的消费组ID、在“消息查询”可以查到消费属性group.id。因为Kafka把消费消息的一方称为消费者(Consumer),多个消费者组成一个消费组(Consumer Group),消费组是Kafka提供的可扩展且具有容错性的消费者机制,建议配置消费组。
- 字段分隔符:数据格式为CSV格式时支持该参数。
- 记录分隔符:数据格式为CSV格式时支持该参数。
- Kafka源端属性配置:支持设置Kafka的配置项,需要增加 properties. 前缀,作业将自动移除前缀并传入底层Kafka客户端,例如:properties.connections.max.idle.ms=600000。
- 数据格式:源端Kafka Topic中消息内容的格式。
- 添加数据源。
图1 添加数据源
- Kafka配置。
- 目标端配置。
图2 配置目标端参数
- 目标表的基本配置:
- 文件存储格式:Parquet、TextFile、SequenceFile。
- OBS存储路径:指定OBS文件存储的路径。
支持填写#{source_topic_name}内置变量,可将源端不同的topic的数据写入不同的路径下。例如:obs://bucket/dir/test.db/prefix_#{source_topic_name}_suffix/。
- 文件压缩方式:指定写入文件的压缩方式,默认不进行压缩。
Parquet格式:UNCOMPRESSED,SNAPPY。
SequenceFile格式:UNCOMPRESSED,SNAPPY,GZIP,LZ4。
BZIP2TextFile格式:UNCOMPRESSED。
- 全局高级配置:单击查看编辑,配置高级属性。
auto-compaction:数据会先被写入临时文件,当checkpoint完成后,该配置控制检查点产生的临时文件是否被合并。
- 源表与目标表映射:
图3 配置源表与目标表映射
- 单表高级配置:单击“单表高级配置”为列表中的topic配置对应属性。
auto-compaction:数据会先被写入临时文件,当checkpoint完成后,该配置控制检查点产生的临时文件是否被合并。
- 目标字段编辑:单击操作列“目标字段编辑”可为迁移后的目标表中自定义字段。
表1 目标字段值支持情况 类型
示例
源表字段
-
内置变量
__key__
__value__
_topic__
__partition__
__offset__
_timestamp__
手动赋值
-
udf方法
支持填写Flink的内置函数用于数据转换。
例如:CONCAT(CAST(NOW() as STRING), `col_name`)、DATE_FORMAT(NOW(), 'yy')。
注意,其中字段名要用反引号包围起来。
可以自定义字段名(如custom_defined_col)、选择字段类型、填写字段值。
用户可同时添加多个字段。
- 单表高级配置:单击“单表高级配置”为列表中的topic配置对应属性。
- 目标表的基本配置: