更新时间:2024-10-22 GMT+08:00

源端为DMS Kafka,目的端为OBS

目前支持整库场景。

整库场景

  1. 源端配置。
    • Kafka配置。
      • 数据格式:源端Kafka Topic中消息内容的格式。

        目前支持JSON、CSV、TEXT格式。

        • JSON格式:支持对消息内容以JSON的层级格式进行解析。
        • CSV格式:支持对消息内容以CSV格式指定分隔符进行解析。
        • TEXT格式:将整条消息内容作为文本直接同步。
      • 消费组ID:由用户指定,标识当前实时处理集成作业的消费组。

        当迁移作业消费DMS Kafka集群某一Topic的消息后,在Kafka集群的“消费组管理”可以看到此处配置的消费组ID、在“消息查询”可以查到消费属性group.id。因为Kafka把消费消息的一方称为消费者(Consumer),多个消费者组成一个消费组(Consumer Group),消费组是Kafka提供的可扩展且具有容错性的消费者机制,建议配置消费组。

      • 字段分隔符:数据格式为CSV格式时支持该参数。

        使用该字符进行CSV消息的列分割,仅支持单个字符,默认为逗号。

      • 记录分隔符:数据格式为CSV格式时支持该参数。

        使用该字符串进行CSV消息的行分割,默认为换行符:\n。

      • Kafka源端属性配置:支持设置Kafka的配置项,需要增加 properties. 前缀,作业将自动移除前缀并传入底层Kafka客户端,例如:properties.connections.max.idle.ms=600000。
    • 添加数据源。
      图1 添加数据源
  2. 目标端配置。
    图2 配置目标端参数
    • 目标表的基本配置:
      • 文件存储格式:Parquet、TextFile、SequenceFile。
      • OBS存储路径:指定OBS文件存储的路径。

        支持填写#{source_topic_name}内置变量,可将源端不同的topic的数据写入不同的路径下。例如:obs://bucket/dir/test.db/prefix_#{source_topic_name}_suffix/。

      • 文件压缩方式:指定写入文件的压缩方式,默认不进行压缩。

        Parquet格式:UNCOMPRESSED,SNAPPY。

        SequenceFile格式:UNCOMPRESSED,SNAPPY,GZIP,LZ4。

        BZIP2TextFile格式:UNCOMPRESSED。

      • 全局高级配置:单击查看编辑,配置高级属性。

        auto-compaction:数据会先被写入临时文件,当checkpoint完成后,该配置控制检查点产生的临时文件是否被合并。

    • 源表与目标表映射:
      图3 配置源表与目标表映射
      • 单表高级配置:单击“单表高级配置”为列表中的topic配置对应属性。

        auto-compaction:数据会先被写入临时文件,当checkpoint完成后,该配置控制检查点产生的临时文件是否被合并。

      • 目标字段编辑:单击操作列“目标字段编辑”可为迁移后的目标表中自定义字段。
        表1 目标字段值支持情况

        类型

        示例

        源表字段

        -

        内置变量

        __key__

        __value__

        _topic__

        __partition__

        __offset__

        _timestamp__

        手动赋值

        -

        udf方法

        支持填写Flink的内置函数用于数据转换。

        例如:CONCAT(CAST(NOW() as STRING), `col_name`)、DATE_FORMAT(NOW(), 'yy')。

        注意,其中字段名要用反引号包围起来。

        可以自定义字段名(如custom_defined_col)、选择字段类型、填写字段值。

        用户可同时添加多个字段。