更新时间:2024-11-29 GMT+08:00

从PgSQL同步数据到Kafka

操作场景

本章节指导用户通过开启Kerberos认证的集群的CDLService WebUI界面从PgSQL导入数据到Kafka。

前提条件

  • 集群已安装CDL、Kafka服务且运行正常。
  • PostgreSQL数据库需要修改预写日志的策略,操作步骤请参考PostgreSQL数据库修改预写日志的策略
  • 在FusionInsight Manager中创建一个人机用户,例如“cdluser”,加入用户组cdladmin、hadoop、kafka,主组选择“cdladmin”组,关联角色“System_administrator”

操作步骤

  1. 使用cdluser用户登录FusionInsight Manager(首次登录需要修改密码),选择“集群 > 服务 > CDL”,在CDL“概览”界面单击“CDLService UI”右侧的超链接,进入CDL原生界面。
  2. 选择“连接管理 > 新增连接”,进入“新增连接”参数配置窗口,参考下表,分别新增“pgsql”和“kafka”连接,相关数据连接参数介绍请参见创建数据库连接

    表1 PgSQL数据连接配置参数

    参数名称

    示例

    Link Type

    pgsql

    Name

    pgsqllink

    Host

    10.10.10.10

    Port

    5432

    DB Name

    testDB

    User

    user

    Password

    user用户密码

    Description

    -

    表2 Kafka数据连接配置参数

    参数名称

    示例

    Link Type

    kafka

    Name

    kafkalink

    Description

    -

  3. 参数配置完成后,单击“测试连接”,检查数据连通是否正常。

    连接校验通过后,单击“确定”完成数据连接创建。

  4. 选择“作业管理 > 数据同步任务 > 新建作业”,在“新建作业”窗口中填写配置。单击“下一步”,进入作业参数配置页面。

    其中:

    参数名称

    示例

    Name

    job_pgsqltokafka

    Desc

    xxx

  5. 配置PgSQL作业参数。

    1. 在作业参数配置页面,选取左侧“pgsql”图标拖入右侧编辑区域,然后双击此图标进入PgSQL作业参数配置窗口,相关作业参数介绍请参见创建CDL数据同步任务作业
      表3 PgSQL作业参数

      参数名称

      示例

      Link

      pgsqllink

      Tasks Max

      1

      Mode

      insert、update、delete

      Schema

      public

      dbName Alias

      cdc

      Slot Name

      test_slot

      Slot Drop

      Connect With Hudi

      Use Exist Publication

      Kafka Message Format

      CDL Json

      Publication Name

      test

    2. 单击“+”按钮展开更多选项。

      • “WhiteList”:输入数据库中的表(如myclass)
      • “Topic Table Mapping”:第一个框输入表名(例如“test”)。 第二个框输入Topic名(例如“test_topic”,该值与第一个框的表名只能是一对一的关系)。第三个框输入数据过滤时间,用于过滤源端数据。
    3. 单击“确定”,PgSQL作业参数配置完成。

  6. 配置Kafka作业参数。

    1. 在作业参数配置页面,选取左侧“kafka”图标拖入右侧编辑区域,然后双击此图标进入Kafka作业参数配置窗口。参考表4进行参数配置。
      表4 Kafka作业参数

      参数名称

      示例

      Link

      kafkalink

    2. 单击“确定”,完成Kafka作业参数配置。

  7. 作业参数配置完成后,拖拽图标将作业进行关联,然后单击“保存”,作业配置完成。

  8. 在“作业管理”的作业列表中,找到创建的作业名称,单击操作列的“启动”,等待作业启动。

    观察数据传输是否生效,例如在PgSQL数据库中对表进行插入数据操作,然后参考使用KafkaUI管理Topic进入KafkaUI界面查看Kafka的Topic中是否有数据生成。