Creating a CDL Data Synchronization Job
Scenario
The CDLService web UI provides a visual page on which users can quickly create CDL jobs and import real-time data into the data lake.
Prerequisites
For a cluster with Kerberos authentication enabled, a user with CDL management permissions has been created.
Procedure
- Log in to the CDLService web UI as a user with CDL management permissions, or as the admin user if Kerberos authentication is not enabled for the cluster. For details, see Logging In to the CDLService WebUI.
- Choose Job Management > Data synchronization task and click Add Job. In the displayed dialog box, set related job parameters and click Next.
Parameter
Description
Example Value
Name
Job name
job_pgsqltokafka
Desc
Job description
xxx
- On the Job Management page, select and drag the target element from Source and Sink to the GUI on the right.
[Figure: MRS 3.2.0]
[Figure: MRS 3.3.0 and later versions]
Double-click the two elements to connect them and set related parameters as required.
To delete an element, select the element to be deleted and click Delete in the lower right corner of the page.
Table 1 MySQL job parameters
Parameter
Description
Example Value
Link
Created MySQL link
mysqllink
Tasks Max
Maximum number of tasks that can be created by a connector. For a connector of the database type, this parameter must be set to 1.
1
Mode
Type of the CDC event to be captured by the job. Value options are as follows:
- insert
- update
- delete
insert, update, and delete
DB Name
MySQL database name
cdl-test
Schema Auto Create
Whether to create table schemas after the job is started
No
Connect With Hudi
Whether to connect to Hudi
Yes
DBZ Snapshot Locking Mode
Lock mode used when a task starts to execute a snapshot. Value options are as follows:
- minimal: A global read lock is held only when the database schema and other metadata are obtained.
- extend: A global read lock is held during the entire snapshot execution process, blocking all write operations.
- none: No lock mode. The schema cannot be changed when a CDL task is started.
Optional. Click to display this parameter.
none
WhiteList
Whitelisted tables to be captured.
Separate multiple tables using commas (,). Wildcards are supported.
(Optional) This parameter is displayed when you click .
testtable
BlackList
Blacklisted tables that are not to be captured.
Separate multiple tables using commas (,). Wildcards are supported.
(Optional) This parameter is displayed when you click .
-
Multi Partition
Whether to enable multi-partition mode for topics.
If enabled, you need to set Topic Table Mapping and specify the number of topic partitions, and the data of a single table will be scattered in multiple partitions.
(Optional) This parameter is displayed when you click .
NOTE:
- The order in which data is received cannot be guaranteed. Exercise caution when setting this parameter.
- The default number of partitions is 5. To change it, log in to FusionInsight Manager, choose Cluster > Services > CDL, click Configurations, search for topics.max.partitions in the search box, and set it to the required number of partitions. For example, change the value to 10, save the configuration, and restart the CDL service.
- In MRS 3.3.0 and later versions, if the source table is a partitioned table and this parameter is set to No, the number of topic partitions created by CDL is the number of source table partitions plus 1.
No
Topic Table Mapping
Mapping between topics and tables.
If configured, table data can be sent to the specified topic. If multi-partitioning is enabled, you need to set the number of partitions, which must be greater than 1. In MRS 3.3.0 and later versions, when the time of the source data is earlier than the specified data filtering time, the data is discarded. When the time of the source data is later than the specified data filtering time, the data is sent to the downstream.
This parameter is displayed when you click . This parameter is mandatory if Connect With Hudi is set to Yes.
testtable
testtable_topic
2023/03/10 11:33:37 (supported in MRS 3.3.0 and later versions)
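The data filtering rule described for Topic Table Mapping can be sketched as follows. This is only an illustration of the documented behavior, not CDL code. The function name passes_filter and the record layout are hypothetical, and keeping a record whose time equals the filter time is an assumption, because the description only covers earlier and later records.

```python
from datetime import datetime

# Timestamp layout used by the example values in the table above.
TIME_FORMAT = "%Y/%m/%d %H:%M:%S"


def passes_filter(record_time: str, filter_time: str) -> bool:
    """Return True if a record should be sent downstream.

    Records earlier than the filter time are discarded. A record whose
    time equals the filter time is kept here; that choice is an assumption.
    """
    record_dt = datetime.strptime(record_time, TIME_FORMAT)
    filter_dt = datetime.strptime(filter_time, TIME_FORMAT)
    return record_dt >= filter_dt


# Hypothetical records used only for illustration.
records = [
    {"id": 1, "time": "2023/03/10 11:00:00"},  # earlier: discarded
    {"id": 2, "time": "2023/03/10 12:00:00"},  # later: sent downstream
]
kept = [r["id"] for r in records if passes_filter(r["time"], "2023/03/10 11:33:37")]
print(kept)
```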
Table 2 PgSQL job parameters
Parameter
Description
Example Value
Link
Created PgSQL link.
pgsqllink
Tasks Max
Maximum number of tasks that can be created by a connector. For a connector of the database type, this parameter must be set to 1.
1
Mode
Type of the CDC event to be captured by the job. The options are as follows:
- insert
- update
- delete
insert, update, and delete
dbName Alias
Database name.
test
Schema
Schema of the database to be connected to.
public
Slot Name
Name of the PostgreSQL logical replication slot.
The value can contain lowercase letters and underscores (_), and cannot be the same in any other job.
test_slot
Enable FailOver Slot
Whether to enable the failover slot function. After it is enabled, the information about the logical replication slot specified as the failover slot is synchronized from the active instance to the standby instance. In this manner, logical subscription can continue even upon an active/standby switchover, implementing the failover of the logical replication slot.
No
Slot Drop
Whether to delete the slot when a task is stopped
No
Connect With Hudi
Whether to connect to Hudi.
Yes
Use Exist Publication
Whether to use an existing publication
Yes
Publication Name
Name of a created publication
This parameter is available when Use Exist Publication is set to Yes.
test
Start Time (MRS 3.2.0)
Start time for synchronizing tables
2022/03/16 11:33:37
Data Filter Time (MRS 3.3.0 and later versions)
Start time of data filtering
2022/03/16 11:33:37
WhiteList
Whitelisted tables to be captured.
Separate multiple tables using commas (,). Wildcards are supported.
(Optional) This parameter is displayed when you click .
testtable
BlackList
Blacklisted tables that are not to be captured.
Separate multiple tables using commas (,). Wildcards are supported.
(Optional) This parameter is displayed when you click .
-
Start Position
Start LSN of the data captured by a task
-
Start Txid
Start TXID of the data captured by a task
-
Multi Partition
Whether to enable multi-partition mode for topics.
If enabled, you need to set Topic Table Mapping and specify the number of topic partitions, and the data of a single table will be scattered in multiple partitions.
(Optional) This parameter is displayed when you click .
NOTE:
- The order in which data is received cannot be guaranteed. Exercise caution when setting this parameter.
- The default number of partitions is 5. To change it, log in to FusionInsight Manager, choose Cluster > Services > CDL, click Configurations, search for topics.max.partitions in the search box, and set it to the required number of partitions. For example, change the value to 10, save the configuration, and restart the CDL service.
- In MRS 3.3.0 and later versions, if the source table is a partitioned table and this parameter is set to No, the number of topic partitions created by CDL is the number of source table partitions plus 1.
No
Topic Table Mapping
Mapping between topics and tables.
If configured, table data can be sent to the specified topic. If multi-partitioning is enabled, you need to set the number of partitions, which must be greater than 1.
In MRS 3.3.0 and later versions, when the time of the source data is earlier than the specified data filtering time, the data is discarded. When the time of the source data is later than the specified data filtering time, the data is sent to the downstream.
This parameter is displayed when you click . This parameter is mandatory if Connect With Hudi is set to Yes.
testtable
testtable_topic
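The Slot Name rule in the table above (lowercase letters and underscores only, and unique across jobs) can be checked before a job is submitted. The following is a minimal sketch, not part of CDL. The function name check_slot_name and the set of existing names are hypothetical, and the way slot names of other jobs are collected is left to the caller.

```python
import re

# Lowercase letters and underscores only, as stated for Slot Name.
SLOT_NAME_PATTERN = re.compile(r"[a-z_]+")


def check_slot_name(name, existing_names):
    """Return a list of problems found with a slot name (empty if none)."""
    problems = []
    if not SLOT_NAME_PATTERN.fullmatch(name):
        problems.append("contains characters other than lowercase letters and underscores")
    if name in existing_names:
        problems.append("is already used by another job")
    return problems


# Hypothetical slot names already used by other jobs.
used = {"orders_slot"}
print(check_slot_name("cdl_slot", used))
print(check_slot_name("Orders-Slot", used))
print(check_slot_name("orders_slot", used))
```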
Table 3 Source Hudi job parameters (MRS 3.2.0)
Parameter
Description
Example Value
Link
Link used by the Hudi app
hudilink
Interval
Interval for synchronizing the Hudi table, in seconds
10
Start Time
Start time for synchronizing tables
2022/03/16 11:40:52
Max Commit Number
Maximum number of commits that can be pulled from an incremental view at a time.
10
Hudi Custom Config
Customized configuration related to Hudi.
-
Table Info
Detailed configuration information about the synchronization table. Hudi and DWS must have the same table names and field types.
{"table1":[{"source.database":"base1","source.tablename":"table1"}],"table2":[{"source.database":"base2","source.tablename":"table2"}],"table3":[{"source.database":"base3","source.tablename":"table3"}]}
Execution Env
Environment variable required for running the Hudi App. If no ENV is available, manually create one.
defaultEnv
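Writing the Table Info value by hand is error-prone when many tables are synchronized. The sketch below generates a string in the same shape as the example value above. The helper name build_table_info is hypothetical, and treating the outer key as a per-table label is an assumption based on the example.

```python
import json


def build_table_info(tables):
    """Build a Table Info string from (label, database, table) tuples."""
    info = {
        label: [{"source.database": database, "source.tablename": table}]
        for label, database, table in tables
    }
    # Compact separators reproduce the single-line form shown in the table.
    return json.dumps(info, separators=(",", ":"))


table_info = build_table_info([
    ("table1", "base1", "table1"),
    ("table2", "base2", "table2"),
    ("table3", "base3", "table3"),
])
print(table_info)
```

The printed string can be pasted into the Table Info field.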
Table 4 Source Hudi job parameters (MRS 3.3.0 and later versions)
Parameter
Description
Example Value
Link
Link used by the Hudi app
hudilink
Interval
Interval for synchronizing the Hudi table, in seconds
10
Data Filter Time
Data filtering time
2023/08/16 11:40:52
Max Commit Number
Maximum number of commits that can be pulled from an incremental view at a time.
10
Configuring Hudi Table Attributes
View for configuring attributes of the Hudi table. The value can be:
- Visual View
- JSON View
Visual View
Hudi Custom Config
Customized configuration related to Hudi.
-
Table Info
Detailed configuration information about the synchronization table. Hudi and DWS must have the same table names and field types.
{"table1":[{"source.database":"base1","source.tablename":"table1"}],"table2":[{"source.database":"base2","source.tablename":"table2"}],"table3":[{"source.database":"base3","source.tablename":"table3"}]}
Table Info-Section Name
Label name of a single table. Only digits, letters, and underscores (_) are supported.
-
Table Info-Source DataBase
Name of the Hudi database to be synchronized
base1
Table Info-Source TableName
Name of the Hudi table to be synchronized
table1
Table Info-Target SchemaName
Schema name of the target database to which data is written
-
Table Info-Target TableName
Table name of the target database to which data is written
-
Table Info-Enable Sink Precombine
Whether to enable pre-combine for the target database. Currently, the pre-combine function can be enabled only when the target database is DWS.
When this function is enabled, existing data at the target is overwritten if the pre-combine field value of the new data is greater than that of the target data. If the pre-combine field value of the new data is less than that of the target data, the new data is discarded.
Yes
Table Info-Custom Config
Hudi custom configuration
-
Execution Env
Environment variable required for running the Hudi App. If no ENV is available, manually create one.
defaultEnv
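The pre-combine behavior described for Table Info-Enable Sink Precombine can be illustrated with a small sketch. It reads the comparison as a comparison of pre-combine field values and is not the DWS or CDL implementation. The function name apply_precombine, the row layout, and the field name ts are hypothetical. The source does not say what happens when the two values are equal, so keeping the new row in that case is an assumption.

```python
def apply_precombine(target_row, new_row, precombine_field):
    """Return the row that remains at the target after the comparison."""
    if target_row is None:
        return new_row  # nothing to compare against
    if new_row[precombine_field] > target_row[precombine_field]:
        return new_row  # newer data overwrites the existing row
    if new_row[precombine_field] < target_row[precombine_field]:
        return target_row  # older data is discarded
    # Equal values are not covered by the description; keeping the new
    # row here is an assumption.
    return new_row


existing = {"id": 1, "value": "a", "ts": 100}
late = {"id": 1, "value": "b", "ts": 90}
fresh = {"id": 1, "value": "c", "ts": 110}

print(apply_precombine(existing, late, "ts"))
print(apply_precombine(existing, fresh, "ts"))
```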
Table 5 Source Kafka job parameters (applicable only to MRS 3.2.0)
Parameter
Description
Example Value
Link
Created Kafka link
kafkalink
Table 6 thirdparty-kafka job parameters
Parameter
Description
Example Value
Link
Created thirdparty-kafka link
thirdparty-kafkalink
DB Name
Name of the database to be connected to. The name can contain only letters, digits, underscores (_), and hyphens (-), and must start with a letter.
opengaussdb
Schema
Schema of the database to be checked
opengaussschema
Datastore Type
Type of the upper-layer source. Value options are as follows:
- MRS 3.2.0:
  - opengauss
  - ogg
  - oracle
  - drs-avro-oracle
- MRS 3.3.0 and later versions:
  - drs-opengauss-json
  - ogg-oracle-avro
  - drs-oracle-json
  - drs-oracle-avro
- opengauss
- drs-opengauss-json
Avro Schema Topic
Schema topic used by OGG Kafka to store table schemas in JSON format.
NOTE: This parameter is available when Datastore Type is set to ogg.
ogg_topic
Source Topics
Source topics can contain letters, digits, hyphens (-), and underscores (_). Separate multiple topics using commas (,).
topic1
Tasks Max
Maximum number of tasks that can be created by a connector. For a connector of the database type, this parameter must be set to 1.
10
Tolerance
Fault tolerance policy.
- none: indicates low tolerance and the Connector task will fail if an error occurs.
- all: indicates high tolerance and all failed records will be ignored if an error occurs.
all
Start Time (MRS 3.2.0)
Start time for synchronizing tables
2022/03/16 14:14:50
Data Filter Time (MRS 3.3.0 and later versions)
Start time of data filtering
2022/03/16 11:33:37
Kafka Message Format
Supported formats. The options are as follows:
- Debezium Json
- CDL Json
NOTE:
- Only MRS 3.3.0 and later versions support this parameter.
- This parameter is available only when Datastore Type is set to drs-opengauss-json.
CDL Json
Multi Partition
Whether to enable multi-partitioning for topics. If it is enabled, you need to set Topic Table Mapping and specify the number of topic partitions, and the data of a single table will be scattered in multiple partitions.
- In MRS 3.2.0, if Datastore Type is set to ogg, drs-avro-oracle, or oracle, the topic multi-partition function cannot be enabled.
- In MRS 3.3.0 and later versions, if Datastore Type is set to ogg-oracle-avro, drs-oracle-avro, or drs-oracle-json, the topic multi-partition function cannot be enabled.
NOTE: The default number of partitions is 5. To change it, log in to FusionInsight Manager, choose Cluster > Services > CDL, click Configurations, search for topics.max.partitions in the search box, and set it to the required number of partitions. For example, change the value to 10, save the configuration, and restart the CDL service.
No
Topic Table Mapping
Mapping between topics and tables.
If configured, table data can be sent to the specified topic. If multi-partitioning is enabled, you need to set the number of partitions, which must be greater than 1.
In MRS 3.3.0 and later versions, when the time of the source data is earlier than the specified data filtering time, the data is discarded. When the time of the source data is later than the specified data filtering time, the data is sent to the downstream.
testtable
testtable_topic
2023/03/10 11:33:37 (MRS 3.3.0 and later versions)
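The naming rules given for DB Name and Source Topics in the table above can be expressed as regular expressions. The following is a minimal sketch under stated assumptions: the helper names are hypothetical, "letters" is taken to mean ASCII letters, and spaces around the commas are stripped for convenience. CDL itself may validate these values differently.

```python
import re

# DB Name: letters, digits, underscores, hyphens; must start with a letter.
DB_NAME_PATTERN = re.compile(r"[A-Za-z][A-Za-z0-9_-]*")
# A single topic: letters, digits, hyphens, underscores.
TOPIC_PATTERN = re.compile(r"[A-Za-z0-9_-]+")


def is_valid_db_name(name):
    """Check a DB Name value against the documented character rules."""
    return DB_NAME_PATTERN.fullmatch(name) is not None


def split_source_topics(value):
    """Split a comma-separated Source Topics value and validate each topic."""
    topics = [t.strip() for t in value.split(",")]
    invalid = [t for t in topics if not TOPIC_PATTERN.fullmatch(t)]
    if invalid:
        raise ValueError(f"invalid topic names: {invalid}")
    return topics


print(is_valid_db_name("opengaussdb"))
print(is_valid_db_name("1db"))
print(split_source_topics("topic1,topic-2,topic_3"))
```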
Table 7 opengauss job parameters (applicable only to MRS 3.3.0 and later versions)
Parameter
Description
Example Value
Link
Created openGauss link
opengausslink
Tasks Max
Maximum number of tasks that can be created by the connector. The value is 1.
1
Mode
Type of the CDC event to be captured by the job. Value options are as follows:
- insert
- update
- delete
insert, update, and delete
dbName Alias
Database name.
test
Slot Name
Name of the openGauss logical replication slot
The value can contain lowercase letters and underscores (_), and cannot be the same in any other job.
test_slot
Slot Drop
Whether to delete the slot when a task is stopped
No
Connect With Hudi
Whether to connect to Hudi
Yes
WhiteList
Whitelisted tables that will be captured.
Separate multiple tables using commas (,). Wildcards are supported.
testtable
Data Filter Time
Data filtering time
-
Multi Partition
Whether to enable multi-partition mode for topics.
If enabled, you need to set Topic Table Mapping and specify the number of topic partitions, and the data of a single table will be scattered in multiple partitions.
NOTE:
- If this function is enabled, the order in which data is received cannot be guaranteed. Exercise caution when setting this parameter.
- The default number of partitions is 5. To change it, log in to FusionInsight Manager, choose Cluster > Services > CDL, click Configurations, search for topics.max.partitions in the search box, and set it to the required number of partitions. For example, change the value to 10, save the configuration, and restart the CDL service.
No
Key Management Tool
Key management tool. Currently, only his_kms is supported.
his_kms
Key Environment Information
Key information. This parameter is available only when Key Management Tool is configured.
-
Custom Config
Custom decoding configuration
-
Topic Table Mapping
Mapping between topics and tables. The format of the table name is Schema name.Table name.
If configured, table data can be sent to the specified topic. If multi-partitioning is enabled, you need to set the number of partitions, which must be greater than 1. When the time of the source data is earlier than the specified data filtering time, the data is discarded. When the time of the source data is later than the specified data filtering time, the data is sent to the downstream.
This parameter is mandatory if Connect With Hudi is set to Yes.
cdlschema.testtable
testtable_topic
2023/03/10 11:33:37
Table 8 Sink Hudi job parameters
Parameter
Description
Example Value
Link
Created Hudi link.
hudilink
Path
Path for storing data.
/cdldata
Interval
Spark RDD execution interval, in seconds.
1
Max Rate Per Partition
Maximum rate for reading data from each Kafka partition using the Kafka direct stream API. It is the number of records per second. 0 indicates that the rate is not limited.
0
Parallelism
Parallelism for writing data to Hudi.
100
Target Hive Database
Database of the target Hive
default
Configuring Hudi Table Attributes
View for configuring attributes of the Hudi table. The value can be:
- Visual View
- JSON View
Visual View
Global Configuration of Hudi Table Attributes
Global parameters on Hudi.
-
Configuring the Attributes of the Hudi Table
Configuration of the Hudi table attributes.
-
Configuring the Attributes of the Hudi Table: Table Name/Source Table Name (MRS 3.3.0 and later versions)
Source table name
-
Configuring the Attributes of the Hudi Table: Table Type Opt Key
Hudi table type. The options are as follows:
- COPY_ON_WRITE
- MERGE_ON_READ
MERGE_ON_READ
Configuring the Attributes of the Hudi Table: Hudi TableName Mapping
Hudi table name. If this parameter is not set, the name of the Hudi table is the same as that of the source table by default.
-
Configuring the Attributes of the Hudi Table: Hive TableName Mapping
Mapping between Hudi tables and Hive tables.
-
Configuring the Attributes of the Hudi Table: Table Primarykey Mapping
Primary key mapping of the Hudi table
id
Configuring the Attributes of the Hudi Table: Table Hudi Partition Type
Mapping between the Hudi table and partition fields. If the Hudi table uses partitioned tables, you need to configure the mapping between the table name and partition fields. The value can be time or customized.
time
Configuring the Attributes of the Hudi Table: Custom Config
Custom configuration
NOTE: In MRS 3.3.0 and later versions, when data is synchronized from MySQL to Hudi, you need to specify the hoodie.datasource.write.precombine.field parameter. If the timestamp and datetime fields of MySQL support only second-level precision, do not specify fields of the timestamp or datetime type, or _hoodie_event_time, as the pre-combine field. Existing data in Hudi is overwritten if the pre-combine field value of the new data is greater than that of the existing Hudi data. If the pre-combine field value of the new data is less than that of the existing Hudi data, the new data is discarded.
-
Execution Env
Environment variable required for running the Hudi App. If no ENV is available, create one by referring to Configuring CDL ENV Variables.
defaultEnv
Table 9 Sink Kafka job parameters
Parameter
Description
Example Value
Link
Created Kafka link
kafkalink
Table 10 DWS job parameters
Parameter
Description
Example Value
Link
Link used by Connector
dwslink
Query Timeout
Timeout interval for connecting to DWS, in milliseconds
180000
Batch Size
Amount of data written to DWS in each batch
50
Sink Task Number
Maximum number of concurrent jobs when a table is written to DWS.
-
DWS Custom Config
Custom configuration
-
Table 11 ClickHouse job parameters
Parameter
Description
Example Value
Link
Link used by Connector
clickhouselink
Query Timeout
Timeout interval for connecting to ClickHouse, in milliseconds
60000
Batch Size
Amount of data written to ClickHouse in each batch
NOTE: Set this parameter to a large value. The recommended value range is 10000 to 100000.
100000
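To make the effect of Batch Size concrete, the sketch below groups records into batches of a given size and checks a value against the recommended range for ClickHouse stated above. It is a generic illustration of batching, not the connector's code. The names batches and in_recommended_range are hypothetical, and reading Batch Size as a number of records per batch is an assumption.

```python
from itertools import islice

# Recommended Batch Size range for ClickHouse from the table above.
RECOMMENDED_MIN = 10_000
RECOMMENDED_MAX = 100_000


def in_recommended_range(batch_size):
    """Check whether a batch size falls in the recommended range."""
    return RECOMMENDED_MIN <= batch_size <= RECOMMENDED_MAX


def batches(records, batch_size):
    """Yield consecutive lists of at most batch_size records."""
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# A small batch size is used here only to keep the example readable.
sizes = [len(b) for b in batches(range(25), 10)]
print(sizes)
print(in_recommended_range(100_000), in_recommended_range(50))
```

A larger batch size means fewer write requests for the same amount of data.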
- After the job parameters are configured, drag the two icons to associate the job parameters and click Save. The job configuration is complete.
- In the job list on the Job Management page, locate the created jobs, click Start in the Operation column, and wait until the jobs are started.
Check whether data is transmitted as expected. For example, insert data into a table in the MySQL database and view the content of the file imported to Hudi.