创建CDL数据同步任务作业
操作场景
CDLService WebUI提供可视化的作业编排页面,用户可快速创建CDL作业,实现实时数据入湖。
前提条件
开启Kerberos认证的集群需已创建具有CDL管理操作权限的用户。
操作步骤
- 使用具有CDL管理操作权限的用户或admin用户(未开启Kerberos认证的集群)登录CDLService WebUI界面,请参考登录CDLService WebUI界面。
- 选择“作业管理 > 数据同步任务 > 新建作业”,在弹出的窗口中输入作业相关信息,然后单击“下一步”。
参数名称
描述
示例
Name
作业名称。
job_pgsqltokafka
Desc
描述信息。
xxx
- 在“作业管理”界面,根据业务数据流向,从界面左侧列表中分别选择“Source”和“Sink”中数据连接元素并将其拖到右侧的操作界面中。
MRS 3.2.0版本:
MRS 3.3.0及之后版本:
双击数据连接元素,并配置对应参数。
如需删除数据连接元素,选择该目标并单击界面右下角的“删除”。
表1 MySQL作业参数 参数名称
描述
示例
Link
已创建的MySQL连接。
mysqllink
Tasks Max
允许Connector创建的最大Task的数量,数据库类型的Connector只允许配置为1。
1
Mode
任务需要抓取的CDC事件类型。
- insert:插入操作
- update:更新操作
- delete:删除操作
insert、update、delete
DB Name
MySQL数据库名称。
cdl-test
Schema Auto Create
是否在启动任务时抓取表的Schema信息。
否
Connect With Hudi
是否对接Hudi。
是
DBZ Snapshot Locking Mode
任务启动执行快照时的锁模式。
none
WhiteList
待抓取表的白名单。
配置需要抓取的表的名单列表,多个表可以用英文逗号分隔,支持通配符。
可选参数,单击显示该参数。
testtable
BlackList
表的黑名单。
配置不需要抓取的表的名单列表,多个表可以用英文逗号分隔,支持通配符。
可选参数,单击显示该参数。
-
Multi Partition
是否开启Topic的多分区。
开启之后需要配置“Topic TableMapping”并指定Topic的分区数量, 单表数据将分散在多个分区中。
可选参数,单击显示该参数。
说明:- 此配置项为高危配置, 开启后无法保证数据的时间顺序。
- 默认分区数为“5”,如果需修改则需登录FusionInsight Manager,选择“集群 > 服务 > CDL > 配置”,在搜索框中搜索“topics.max.partitions”并修改该值为需要修改的分区数,例如,修改值为“10”,保存配置并重启CDL服务。
- MRS 3.3.0及之后版本,当源端表为分区表且该参数为否时,CDL创建的Topic分区表数量为源端表分区数量+1。
否
Topic Table Mapping
Topic与表的映射关系。
用于指定某个表的数据发送到指定的Topic中,开启多分区功能后需要配置Topic的分区数,分区数必须大于1。MRS 3.3.0及之后版本,数据过滤时间用于过滤数据,当源端数据的时间小于设定时间时,该数据将会被丢弃,当源端数据的时间大于设定时间时,该数据发送到下游。
单击显示该参数。如果“Connect With Hudi”选择“是”,则该参数为必填项。
testtable
testtable_topic
2023/03/10 11:33:37(MRS 3.3.0及之后版本支持)
表2 PgSQL作业参数 参数名称
描述
示例
Link
已创建的PgSQL连接。
pgsqllink
Tasks Max
允许Connector创建的最大Task的数量,数据库类型的Connector只允许配置为1。
1
Mode
任务需要抓取的CDC事件类型。
- insert:插入操作
- update:更新操作
- delete:删除操作
insert、update、delete
dbName Alias
数据库名称。
test
Schema
待连接的数据库的Schema名称。
public
Slot Name
PgSQL逻辑复制槽的名称。
不同任务之间槽名不能重名,支持小写字母和下划线。
test_solt
Enable FailOver Slot
开启Failover Slot功能,将指定为Failover Slot的逻辑复制槽信息从主实例同步到备实例,当主备切换之后逻辑订阅能够继续进行,实现逻辑复制槽的故障转移。
否
Slot Drop
任务停止时是否删除Slot。
否
Connect With Hudi
是否对接Hudi。
是
Use Exist Publication
使用已创建的publication。
是
Publication Name
已创建的publication名称。
“Use Exist Publication”选择“是”时显示该参数。
test
Start Time(MRS 3.2.0版本)
同步表的起始时间。
2022/03/16 11:33:37
Data Filter Time(MRS 3.3.0及之后版本)
数据过滤的起始时间。
2022/03/16 11:33:37
WhiteList
待抓取表的白名单。
配置需要抓取的表的名单列表,多个表可以用英文逗号分隔,支持通配符。
可选参数,单击显示该参数。
testtable
BlackList
表的黑名单。
配置不需要抓取的表的名单列表,多个表可以用英文逗号分隔,支持通配符。
可选参数,单击显示该参数。
-
Start Position
任务抓取数据的起始LSN位置。
仅MRS 3.2.0版本支持。
-
Start Txid
任务抓取数据的起始TXID位置。
仅MRS 3.2.0版本支持。
-
Multi Partition
是否开启Topic的多分区。
开启之后需要配置“Topic TableMapping”并指定Topic的分区数量, 单表数据将分散在多个分区中。
可选参数,单击显示该参数。
说明:- 此配置项为高危配置, 开启后无法保证数据的时间顺序。
- 默认分区数为“5”,如果需修改则需登录FusionInsight Manager,选择“集群 > 服务 > CDL > 配置”,在搜索框中搜索“topics.max.partitions”并修改该值为需要修改的分区数,例如,修改值为“10”,保存配置并重启CDL服务。
- MRS 3.3.0及之后版本,当源端表为分区表且该参数为否时,CDL创建的Topic分区表数量为源端表分区数量+1。
否
Topic Table Mapping
Topic与表的映射关系。
用于指定某个表的数据发送到指定的Topic中,开启多分区功能后需要配置Topic的分区数,分区数必须大于1。MRS 3.3.0及之后版本,数据过滤时间用于过滤数据,当源端数据的时间小于设定时间时,该数据将会被丢弃,当源端数据的时间大于设定时间时,该数据发送到下游。
单击显示该参数。如果“Connect With Hudi”选择“是”,则该参数为必填项。
testtable
testtable_topic
2023/03/10 11:33:37(MRS 3.3.0及之后版本)
表3 Source Hudi作业参数(MRS 3.2.0版本) 参数名称
描述
示例
Link
Hudi App使用的Link。
hudilink
Interval
同步Hudi表的时间间隔,单位:秒。
10
Start Time
同步表的起始时间。
2022/03/16 11:40:52
Max Commit Number
单次增量视图拉取Commit的最大数量。
10
Hudi Custom Config
Hudi相关的自定义配置。
-
Table Info
同步表的详细配置信息。要求Hudi与DWS的表名一致,且字段类型相同。
{"table1":[{"source.database":"base1","source.tablename":"table1"}],"table2":[{"source.database":"base2","source.tablename":"table2"}],"table3":[{"source.database":"base3","source.tablename":"table3"}]}
Execution Env
Hudi App运行时需要的环境变量,当前如果无可用的ENV,则需先手动创建ENV。
defaultEnv
表4 Source Hudi作业参数(MRS 3.3.0及之后版本) 参数名称
描述
示例
Link
Hudi App使用的Link。
hudilink
Interval
同步Hudi表的时间间隔,单位:秒。
10
Data Filter Time
数据过滤时间。
2023/08/16 11:40:52
Max Commit Number
单次增量视图拉取Commit的最大数量。
10
Hudi表属性配置方式
Hudi表属性配置方式,包括:
- 可视化视图。
- JSON视图。
可视化视图
Hudi Custom Config
Hudi相关的自定义配置。
-
Table Info
同步表的详细配置信息。要求Hudi与DWS的表名一致,且字段类型相同。
{"table1":[{"source.database":"base1","source.tablename":"table1"}],"table2":[{"source.database":"base2","source.tablename":"table2"}],"table3":[{"source.database":"base3","source.tablename":"table3"}]}
Table Info-Section Name
单表标签名称,仅支持数字、字母、下划线。
-
Table Info-Source DataBase
需要同步的Hudi数据库名称。
base1
Table Info-Source TableName
需要同步的Hudi表名。
table1
Table Info-Target SchemaName
数据写入目标数据库的Schema名称。
-
Table Info-Target TableName
数据写入目标数据库的表名称。
-
Table Info-Enable Sink Precombine
目标数据库是否启用预合并,当前仅支持目标库为DWS时启用预合并功能。
该功能用于当新值预合并字段比目标端预合并字段大时,则覆盖目标端已有数据;当新值预合并字段比目标端预合并字段小时,则丢弃新数据。
是
Table Info-Custom Config
Hudi自定义配置。
-
Execution Env
Hudi App运行时需要的环境变量,当前如果无可用的ENV,则需先手动创建ENV。
defaultEnv
表5 Source Kafka作业参数(仅适用于MRS 3.2.0版本) 参数名称
描述
示例
Link
已创建的kafka连接。
kafkalink
表6 thirdparty-kafka作业参数 参数名称
描述
示例
Link
已创建的thirdparty-kafka连接。
thirdparty-kafkalink
DB Name
待连接的数据库名称,名称只能由英文字母、数字、下划线和中划线组成,且必须以英文字母开头。
opengaussdb
Schema
待检测数据库的Schema名称。
oprngaussschema
Datastore Type
上层源的类型。
- MRS 3.2.0版本:
- opengauss
- ogg
- oracle
- drs-avro-oracle
- MRS 3.3.0及之后版本:
- drs-opengauss-json
- ogg-oracle-avro
- drs-oracle-json
- drs-oracle-avro
- opengauss
- drs-opengauss-json
Avro Schema Topic
Ogg Kafka使用的Schema Topic以JSON格式存储表的Schema。
说明:“Datastore Type”为“ogg”(MRS 3.3.0及之后版本为“ogg-oracle-avro”)显示该参数。
ogg_topic
Source Topics
源端Topic可以包含英文字母、数字、特殊字符(-,_),各Topic应该以英文逗号分隔。
topic1
Tasks Max
允许Connector创建的最大Task的数量,数据库类型的Connector只允许配置为1。
10
Tolerance
容灾策略。
- none表示低容错,即出错后导致Connector任务失败。
- all表示高容错,出错后忽略失败的记录并继续运行。
all
Start Time(MRS 3.2.0版本)
同步表的起始时间。
2022/03/16 14:14:50
Data Filter Time(MRS 3.3.0及之后版本)
数据过滤的起始时间。
2022/03/16 14:14:50
Kaka Message Format
Kafka中的消息格式,包括:
- Debezium Json
- CDL Json
说明:- 仅MRS 3.3.0及之后版本支持该参数。
- 仅“Datastore Type”为“drs-opengauss-json”时支持该参数。
CDL Json
Multi Partition
是否开启Topic多分区功能,开启之后需要配置Topic TableMapping并指定Topic的分区数量, 单表数据将分散在多个分区中。其中:
- MRS 3.2.0版本,“Datastore Type”参数值为“ogg”或“drs-avro-oracle”或“oracle”时不支持开启Topic多分区功能。
- MRS 3.3.0及之后版本,当“Datastore Type”参数值为“ogg-oracle-avro”或“drs-oracle-avro”或“drs-oracle-json”时不支持开启Topic多分区功能。
说明:默认分区数为“5”,如果需修改则需登录FusionInsight Manager,选择“集群 > 服务 > CDL > 配置”,在搜索框中搜索“topics.max.partitions”并修改该值为需要修改的分区数,例如,修改值为“10”,保存配置并重启CDL服务。
否
Topic Table Mapping
Topic与表的映射关系。
用于指定某个表的数据发送到指定的Topic中,开启多分区功能后需要配置Topic的分区数,分区数必须大于1。MRS 3.3.0及之后版本,数据过滤时间用于过滤数据,当源端数据的时间小于设定时间时,该数据将会被丢弃,当源端数据的时间大于设定时间时,该数据发送到下游。
testtable
testtable_topic
2023/03/10 11:33:37(MRS 3.3.0及之后版本)
表7 opengauss作业参数(仅适用于MRS 3.3.0及之后版本) 参数名称
描述
示例
Link
已创建的opengauss连接。
opengausslink
Tasks Max
允许Connector创建的最大Task的数量,值为“1”。
1
Mode
任务需要抓取的CDC事件类型。
- insert:插入操作
- update:更新操作
- delete:删除操作
insert、update、delete
dbName Alias
数据库名称。
test
Slot Name
opengauss逻辑复制槽的名称。
不同任务之间槽名不能重名,支持小写字母和下划线。
test_solt
Slot Drop
任务停止时是否删除Slot。
否
Connect With Hudi
是否对接Hudi。
是
WhiteList
待抓取表的白名单。
配置需要抓取的表的名单列表,多个表可以用英文逗号分隔,支持通配符。
testtable
Data Filter Time
数据过滤时间。
-
Multi Partition
是否开启Topic的多分区。
开启之后需要配置“Topic TableMapping”并指定Topic的分区数量, 单表数据将分散在多个分区中。
说明:- 此配置项为高危配置, 开启后无法保证数据的时间顺序。
- 默认分区数为“5”,如果需修改则需登录FusionInsight Manager,选择“集群 > 服务 > CDL > 配置”,在搜索框中搜索“topics.max.partitions”并修改该值为需要修改的分区数,例如,修改值为“10”,保存配置并重启CDL服务。
否
Key Management Tool
密钥管理工具。当前仅支持“his_kms”密钥管理工具。
his_kms
Key Environment Information
密钥信息。仅配置了“Key Management Tool”密钥管理工具才支持该参数。
-
Custom Config
自定义解码配置。
-
Topic Table Mapping
Topic与表的映射关系,表名格式为:Schema名.表名。
用于指定某个表的数据发送到指定的Topic中,开启多分区功能后需要配置Topic的分区数,分区数必须大于1。数据过滤时间用于过滤数据,当源端数据的时间小于设定时间时,该数据将会被丢弃,当源端数据的时间大于设定时间时,该数据发送到下游。
如果“Connect With Hudi”选择“是”,则该参数为必填项。
cdlschema.testtable
testtable_topic
2023/03/10 11:33:37
表8 Sink Hudi作业参数 参数名称
描述
示例
Link
已创建的Hudi连接。
hudilink
Path
数据存储路径。
/cdldata
Interval
Spark RDD的执行周期,单位:秒。
1
Max Rate Per Partition
使用Kafka direct stram API时,从每个Kafka分区读取数据的最大速率限制,单位:个/秒, 0表示无限制。
0
Parallelism
写Hudi时的并发数。
100
Target Hive Database
目标Hive的数据库名称。
default
Hudi表属性配置方式
Hudi表属性配置方式,包括:
- 可视化视图
- JSON视图
可视化视图
Hudi表属性全局配置
Hudi侧的全局参数。
-
Hudi表属性配置
Hudi表属性配置。
-
Hudi表属性配置-Table Name/Source Table Name(MRS 3.3.0及之后版本)
源端表名。
-
Hudi表属性配置-Table Type Opt Key
Hudi表类型,包括:
- COPY_ON_WRITE
- MERGE_ON_READ
MERGE_ON_READ
Hudi表属性配置-Hudi TableName Mapping
Hudi表名称,如果不设置,则默认与源表名相同。
-
Hudi表属性配置-Hive TableName Mapping
Hudi表同步到Hive的表名映射关系,自定义表名。
-
Hudi表属性配置-Table Primarykey Mapping
Hudi表的主键对应关系。
id
Hudi表属性配置-Table Hudi Partition Type
Hudi表和分区字段映射关系,如果Hudi表采用分区表,则需要配置表名和分区字段的对应关系,包括“time”和“customized”。
time
Hudi表属性配置-Custom Config
自定义配置。
说明:MRS 3.3.0及之后版本,从MySQL同步数据到Hudi时,需要指定“hoodie.datasource.write.precombine.field”参数,如果MySQL的timestamp、datetime类型精度只支持秒级,则不建议指定timestamp、datetime类型及_hoodie_event_time字段作为precombine字段。该功能用于当新值预合并字段比Hudi端预合并字段大时,则覆盖Hudi端已有数据;当新值预合并字段比Hudi端预合并字段小时,则丢弃新数据。
-
Execution Env
Hudi App运行时需要的环境变量,当前如果无可用的ENV,则需先参考管理CDL ENV变量进行创建。
defaultEnv
表9 Sink Kafka作业参数 参数名称
描述
示例
Link
已创建的kafka连接。
kafkalink
表10 DWS作业参数 参数名称
描述
示例
Link
Connector使用的连接。
dwslink
Query Timeout
连接DWS的超时时间,单位:毫秒。
180000
Batch Size
批次写入DWS的数据量。
50
Sink Task Number
单个表写入DWS时的最大并行作业数。
-
DWS Custom Config
自定义配置。
-
表11 ClickHouse作业参数 参数名称
描述
示例
Link
Connector使用的连接。
dwslink
Query Timeout
连接ClickHouse的超时时间,单位:毫秒。
60000
Batch Size
批次写入ClickHouse的数据量。
说明:设置单批次写入ClickHouse的数据量时数值尽量大,建议取值范围为:10000~100000。
100000
- 作业参数配置完成后,拖拽图标将作业进行关联,然后单击“保存”,作业配置完成。
- 在“作业管理”的作业列表中,找到创建的作业名称,单击操作列的“启动”,等待作业启动。
观察数据传输是否生效,例如在MySQL数据库中对作业中指定的表进行插入数据操作,查看Hudi导入的文件内容是否正常。