更新时间:2024-11-29 GMT+08:00

从ThirdKafka同步开源debezium-json数据到Hudi

操作场景

本章节指导用户通过开启Kerberos认证的集群的CDLService WebUI界面从ThirdKafka同步开源debezium-json格式数据到Hudi,CDL目前支持1.4.0版本MySQL、PostgreSQL和Oracle数据库Debezium连接器捕获的JSON格式的事件消息。

前提条件

  • 集群已安装CDL、Hudi服务且运行正常。
  • ThirdKafka的Topic需要能被MRS集群消费,操作步骤请参考ThirdPartyKafka前置准备
  • 已配置好数据库对应的1.4.0版本的Debezium连接器,能正常捕获数据库变更事件,并以JSON格式写入ThirdKafka。
  • key相同或者key为null的Debezium JSON格式Kafka消息写入ThirdKafka Topic的同一Partition中。
  • 在FusionInsight Manager中创建一个人机用户,例如“cdluser”,加入用户组cdladmin、hadoop、kafkasupergroup,主组选择“cdladmin”组,关联角色“System_administrator”

操作步骤

  1. 使用cdluser用户登录FusionInsight Manager(首次登录需要修改密码),选择“集群 > 服务 > CDL”,单击“CDLService UI”右侧的超链接,进入CDLService WebUI界面。
  2. 选择“连接管理 > 新增连接”,进入“新增连接”参数配置窗口,参考下表,分别新增“thirdparty-kafka”、“hudi”连接,相关数据连接参数介绍请参见创建数据库连接

    表1 thirdparty-kafka数据连接配置参数

    参数名称

    示例

    Name

    debezium_link

    Link Type

    thirdparty-kafka

    Bootstrap Servers

    10.10.10.10:9093

    Security Protocol

    SASL_SSL

    Username

    testuser

    Password

    testuser用户密码

    SSL Truststore Location

    单击“上传文件”,上传认证文件

    SSL Truststore Password

    -

    Datastore Type

    debezium-json

    Description

    thirdparty-kafka Link

    thirdparty-kafka也可以使用MRS Kafka作为源端,若使用用户名(Username)密码(Password)进行登录认证,则需先登录Manager界面,选择“集群 > 服务 > Kafka > 配置”,在搜索框中搜索“sasl.enabled.mechanisms”,为该参数值增加“PLAIN”,单击“保存”保存配置,并重启Kafka服务使配置生效:

    再在CDL WebUI界面配置使用MRS Kafka作为源端的thirdparty-kafka连接,例如相关数据连接配置如下:

    表2 Hudi数据连接配置参数

    参数名称

    示例

    Link Type

    hudi

    Name

    hudilink

    Storage Type

    hdfs

    Auth KeytabFile

    /opt/Bigdata/third_lib/CDL/user_libs/cdluser.keytab

    Principal

    cdluser

    Description

    -

  1. 参数配置完成后,单击“测试连接”,检查数据连通是否正常。

    连接校验通过后,单击“确定”完成数据连接创建。

  2. (可选)选择“ENV管理 > 新建ENV”,进入“新建ENV”参数配置窗口,参考下表进行参数配置。

    表3 新建ENV配置参数

    参数名称

    示例

    Name

    test-env

    Driver Memory

    1GB

    Type

    spark

    Executor Memory

    1GB

    Executor Cores

    1

    Number Executors

    1

    Queue

    -

    Description

    -

    参数配置完成后,单击“确定”创建ENV。

  1. 选择“作业管理 > 数据同步任务 > 新建作业”,在“新建作业”窗口中填写配置。单击“下一步”,进入作业参数配置页面。

    其中:

    参数名称

    示例

    Name

    job_debeziumtohudi

    Desc

    New CDL Job

  2. 配置ThirdKafka作业参数。

    1. 在作业参数配置页面,选取左侧“thirdparty-kafka”图标拖入右侧编辑区域,然后双击此图标进入ThirdpartyKafka作业参数配置窗口。参考下表进行参数配置,相关作业参数介绍请参见创建CDL数据同步任务作业
      表4 thirdparty-kafka作业参数

      参数名称

      示例

      Link

      debezium_link

      Datastore Type

      debezium-json

      Source Topics

      source_topic

      Tasks Max

      1

      Tolerance

      none

      Multi Partition

      Topic Table Mapping

      test/hudi_topic

    2. 单击“确定”,ThirdpartyKafka作业参数配置完成。

  1. 配置Hudi作业参数。

    1. 在作业参数配置页面,选取左侧Sink区域的“hudi”图标拖入右侧编辑区域,然后双击此图标进入Hudi作业参数配置窗口。参考下表进行参数配置,相关作业参数介绍请参见创建CDL数据同步任务作业
      表5 Sink Hudi作业参数

      参数名称

      示例

      Link

      hudilink

      Path

      /cdl/test

      Interval

      10

      Max Rate Per Partition

      0

      Parallelism

      10

      Target Hive Database

      default

      Hudi表属性配置方式

      可视化视图

      Hudi表属性全局配置

      -

      Hudi表属性配置-Table Name

      test

      Hudi表属性配置-Table Type Opt Key

      COPY_ON_WRITE

      Hudi表属性配置-Hudi TableName Mapping

      -

      Hudi表属性配置-Hive TableName Mapping

      -

      Hudi表属性配置-Table Primarykey Mapping

      id

      Hudi表属性配置-Table Hudi Partition Type

      -

      Hudi表属性配置-Custom Config

      -

      Execution Env

      Execution Env

    2. 单击“确定”,完成Hudi作业参数配置。

  2. 作业参数配置完成后,拖拽图标将作业进行关联,然后单击“保存”,作业配置完成。

  1. 在“作业管理”的作业列表中,找到创建的作业名称,单击操作列的“启动”,等待作业启动。

    观察数据传输是否生效,例如往Thirdparty-kafka的source_topic中发送数据库debezium json格式的事件变更数据,查看Hudi导入的文件内容。