更新时间:2024-11-29 GMT+08:00
从PgSQL同步数据到Kafka
操作场景
本章节指导用户通过开启Kerberos认证的集群的CDLService WebUI界面从PgSQL导入数据到Kafka。
前提条件
- 集群已安装CDL、Kafka服务且运行正常。
- PostgreSQL数据库需要修改预写日志的策略,操作步骤请参考PostgreSQL数据库修改预写日志的策略。
- 在FusionInsight Manager中创建一个人机用户,例如“cdluser”,加入用户组cdladmin、hadoop、kafka,主组选择“cdladmin”组,关联角色“System_administrator”。
操作步骤
- 使用cdluser用户登录FusionInsight Manager(首次登录需要修改密码),选择“集群 > 服务 > CDL”,在CDL“概览”界面单击“CDLService UI”右侧的超链接,进入CDL原生界面。
- 选择“连接管理 > 新增连接”,进入“新增连接”参数配置窗口,参考下表,分别新增“pgsql”和“kafka”连接,相关数据连接参数介绍请参见创建数据库连接。
表1 PgSQL数据连接配置参数 参数名称
示例
Link Type
pgsql
Name
pgsqllink
Host
10.10.10.10
Port
5432
DB Name
testDB
User
user
Password
user用户密码
Description
-
表2 Kafka数据连接配置参数 参数名称
示例
Link Type
kafka
Name
kafkalink
Description
-
- 参数配置完成后,单击“测试连接”,检查数据连通是否正常。
连接校验通过后,单击“确定”完成数据连接创建。
- 选择“作业管理 > 数据同步任务 > 新建作业”,在“新建作业”窗口中填写配置。单击“下一步”,进入作业参数配置页面。
其中:
参数名称
示例
Name
job_pgsqltokafka
Desc
xxx
- 配置PgSQL作业参数。
- 在作业参数配置页面,选取左侧“pgsql”图标拖入右侧编辑区域,然后双击此图标进入PgSQL作业参数配置窗口,相关作业参数介绍请参见创建CDL数据同步任务作业。
表3 PgSQL作业参数 参数名称
示例
Link
pgsqllink
Tasks Max
1
Mode
insert、update、delete
Schema
public
dbName Alias
cdc
Slot Name
test_slot
Slot Drop
否
Connect With Hudi
否
Use Exist Publication
是
Kafka Message Format
CDL Json
Publication Name
test
- 单击“+”按钮展开更多选项。
- “WhiteList”:输入数据库中的表(如myclass)
- “Topic Table Mapping”:第一个框输入表名(例如“test”)。 第二个框输入Topic名(例如“test_topic”,该值与第一个框的表名只能是一对一的关系)。第三个框输入数据过滤时间,用于过滤源端数据。
- 单击“确定”,PgSQL作业参数配置完成。
- 在作业参数配置页面,选取左侧“pgsql”图标拖入右侧编辑区域,然后双击此图标进入PgSQL作业参数配置窗口,相关作业参数介绍请参见创建CDL数据同步任务作业。
- 配置Kafka作业参数。
- 在作业参数配置页面,选取左侧“kafka”图标拖入右侧编辑区域,然后双击此图标进入Kafka作业参数配置窗口。参考表4进行参数配置。
- 单击“确定”,完成Kafka作业参数配置。
- 作业参数配置完成后,拖拽图标将作业进行关联,然后单击“保存”,作业配置完成。
- 在“作业管理”的作业列表中,找到创建的作业名称,单击操作列的“启动”,等待作业启动。
观察数据传输是否生效,例如在PgSQL数据库中对表进行插入数据操作,然后参考使用KafkaUI管理Topic进入KafkaUI界面查看Kafka的Topic中是否有数据生成。
父主题: 常见CDL作业示例