Updated on 2024-11-29 GMT+08:00

Creating a CDL Data Synchronization Job

Scenario

The CDLService web UI provides a visualized page for users to quickly create CDL jobs and import real-time data into the data lake.

Prerequisites

  • A user with CDL management operation permissions has been created for the cluster with Kerberos authentication enabled.
  • A heartbeat topic has been created manually in advance if the ThirdKafka link uses DRS as the source (see the sketch after this list).
    • When DRS synchronizes data from an openGauss database, the heartbeat topic name is in the format openGauss database name-cdc_cdl-cdc_heartbeat.
    • When DRS synchronizes data from an Oracle database, the heartbeat topic name is fixed to null-cdc_cdl-cdc_heartbeat.
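
  The heartbeat topic can be pre-created with any standard Kafka admin tool. Below is a minimal sketch using the kafka-python library; the broker address, the source database name (testdb), and the partition and replication settings are illustrative assumptions, and a Kerberos-enabled cluster additionally needs the matching security/SASL options.

  ```python
  # Minimal sketch: pre-create the DRS heartbeat topic for an openGauss source.
  # Assumed: kafka-python is installed, a broker is reachable at broker-1:9092,
  # and the source openGauss database is named "testdb", so the topic follows
  # the <openGauss database name>-cdc_cdl-cdc_heartbeat convention.
  from kafka.admin import KafkaAdminClient, NewTopic

  admin = KafkaAdminClient(bootstrap_servers="broker-1:9092")
  heartbeat = NewTopic(
      name="testdb-cdc_cdl-cdc_heartbeat",  # Oracle source: "null-cdc_cdl-cdc_heartbeat"
      num_partitions=1,
      replication_factor=1,
  )
  admin.create_topics([heartbeat])
  admin.close()
  ```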

Procedure

  1. Log in to the CDLService web UI as a user with CDL management operation permissions or as user admin (for clusters with Kerberos authentication disabled) by referring to Logging In to the CDLService Web UI.
  2. Choose Job Management > Data synchronization task and click Add Job. In the displayed dialog box, set related job parameters and click Next.

    | Parameter | Description | Example Value |
    | --- | --- | --- |
    | Name | Job name | job_pgsqltokafka |
    | Desc | Job description | xxx |

  3. On the Job Management page, select the target source and sink elements under Source and Sink and drag them to the canvas on the right.

    Double-click the two elements to connect them and set related parameters as required.

    To delete a data connection element, click the delete icon in the upper right corner of the element.

    Table 1 MySQL job parameters

    | Parameter | Description | Example Value |
    | --- | --- | --- |
    | Link | Created MySQL link | mysqllink |
    | Tasks Max | Maximum number of tasks that can be created by the connector. The value is 1. | 1 |
    | Mode | Type of the CDC event to be captured by the job. Value options: insert, update, delete. | insert, update, and delete |
    | DB Name | MySQL database name | cdl-test |
    | Schema Auto Create | Whether to create table schemas after the job is started | No |
    | Connect With Hudi | Whether to connect to Hudi | Yes |
    | DBZ Snapshot Locking Mode | Lock mode used when a task starts to execute a snapshot. Value options:<br>• minimal: A global read lock is held only while the database schema and other metadata are obtained.<br>• extend: A global read lock is held during the entire snapshot, blocking all write operations.<br>• none: No lock. The schema cannot be changed while a CDL task is being started.<br>(Optional) Displayed only after you click the expand icon. | none |
    | WhiteList | Whitelisted tables that will be captured. Separate multiple tables with commas (,); wildcards are supported.<br>(Optional) Displayed only after you click the expand icon.<br>NOTE: If this parameter is not set and the source MySQL database contains multiple tables, the global topic task creates one partition per source table. If this parameter is set, the global topic task creates at most topics.max.partitions partitions; the default value is 5 and can be changed on the CDL service configuration page. | testtable |
    | BlackList | Blacklisted tables that will not be captured. Separate multiple tables with commas (,); wildcards are supported.<br>(Optional) Displayed only after you click the expand icon. | - |
    | Multi Partition | Whether to enable multi-partition mode for topics. If enabled, set Topic Table Mapping and specify the number of topic partitions; the data of a single table is then scattered across multiple partitions.<br>(Optional) Displayed only after you click the expand icon.<br>NOTE:<br>• The data receiving order is not guaranteed. Exercise caution when enabling this option.<br>• The default number of partitions is 5. To change it, log in to FusionInsight Manager, choose Cluster > Services > CDL, click Configurations, search for topics.max.partitions, set the new value (for example, 10), save the configuration, and restart the CDL service.<br>• If the source table is a partitioned table and this parameter is set to No, the number of topic partitions created by CDL is the number of source table partitions plus 1. | No |
    | Enable Data Encryption | Whether to encrypt data written to Kafka. If this parameter is set to Yes, configure data encryption by referring to Encrypting Data. | No |
    | Key Name | Name of the encryption key. Displayed only when Enable Data Encryption is set to Yes. | test_key |
    | Topic Table Mapping | Mapping between topics and tables. If configured, table data is sent to the specified topic. If multi-partitioning is enabled, you must set the number of partitions, which must be greater than 1. Source data whose time is earlier than the specified data filtering time is discarded; data whose time is later is sent downstream.<br>Displayed only after you click the expand icon. Mandatory if Connect With Hudi is set to Yes. | testtable<br>testtable_topic<br>2023/03/10 11:33:37 |

    Table 2 PgSQL job parameters

    | Parameter | Description | Example Value |
    | --- | --- | --- |
    | Link | Created PgSQL link | pgsqllink |
    | Tasks Max | Maximum number of tasks that can be created by the connector. The value is 1. | 1 |
    | Mode | Type of the CDC event to be captured by the job. Value options: insert, update, delete. | insert, update, and delete |
    | dbName Alias | Database name | test |
    | Schema | Schema of the database to be connected to | public |
    | Slot Name | Name of the PostgreSQL logical replication slot. The value can contain lowercase letters and underscores (_) and must not duplicate the slot name of any other job. | test_slot |
    | Enable FailOver Slot | Whether to enable the failover slot function. When enabled, information about logical replication slots designated as failover slots is synchronized from the active instance to the standby instance, so that logical subscription can continue after an active/standby switchover, implementing failover of the logical replication slot. | No |
    | Slot Drop | Whether to delete the slot when the task is stopped | No |
    | Connect With Hudi | Whether to connect to Hudi | Yes |
    | Use Exist Publication | Whether to use an existing publication | Yes |
    | Publication Name | Name of the existing publication. Available only when Use Exist Publication is set to Yes. | test |
    | Kafka Message Format | Message format. Value options: CDL Json, Debezium Json. | CDL Json |
    | Data Filter Time | Start time of data filtering | 2022/03/16 11:33:37 |
    | WhiteList | Whitelisted tables that will be captured. Separate multiple tables with commas (,); wildcards are supported.<br>(Optional) Displayed only after you click the expand icon. | testtable |
    | BlackList | Blacklisted tables that will not be captured. Separate multiple tables with commas (,); wildcards are supported.<br>(Optional) Displayed only after you click the expand icon. | - |
    | Multi Partition | Whether to enable multi-partition mode for topics. If enabled, set Topic Table Mapping and specify the number of topic partitions; the data of a single table is then scattered across multiple partitions.<br>(Optional) Displayed only after you click the expand icon.<br>NOTE:<br>• The data receiving order is not guaranteed. Exercise caution when enabling this option.<br>• The default number of partitions is 5. To change it, log in to FusionInsight Manager, choose Cluster > Services > CDL, click Configurations, search for topics.max.partitions, set the new value (for example, 10), save the configuration, and restart the CDL service.<br>• If the source table is a partitioned table and this parameter is set to No, the number of topic partitions created by CDL is the number of source table partitions plus 1. | No |
    | Enable Data Encryption | Whether to encrypt data written to Kafka. If this parameter is set to Yes, configure data encryption by referring to Encrypting Data. | No |
    | Key Name | Name of the encryption key. Displayed only when Enable Data Encryption is set to Yes. | test_key |
    | Topic Table Mapping | Mapping between topics and tables. If configured, table data is sent to the specified topic. If multi-partitioning is enabled, you must set the number of partitions, which must be greater than 1. Source data whose time is earlier than the specified data filtering time is discarded; data whose time is later is sent downstream.<br>Displayed only after you click the expand icon. Mandatory if Connect With Hudi is set to Yes. | testtable<br>testtable_topic<br>2023/03/10 11:33:37 |
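
    Because Slot Name must be unique across jobs, it can help to check which logical replication slots already exist on the source PostgreSQL instance before saving the job. Below is a minimal sketch using psycopg2; the connection parameters are placeholder assumptions for your environment.

    ```python
    # Minimal sketch: list existing logical replication slots so the job's
    # Slot Name does not collide with one that is already in use.
    import psycopg2

    conn = psycopg2.connect(host="pg-host", port=5432, dbname="test",
                            user="cdluser", password="***")  # assumed credentials
    with conn.cursor() as cur:
        cur.execute("SELECT slot_name, plugin, active FROM pg_replication_slots;")
        for slot_name, plugin, active in cur.fetchall():
            print(slot_name, plugin, active)
    conn.close()
    ```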

    Table 3 Oracle job parameters

    | Parameter | Description | Example Value |
    | --- | --- | --- |
    | Link | Created Oracle link | oraclelink |
    | Tasks Max | Maximum number of tasks that can be created by the connector. The value is 1. | 1 |
    | Mode | Type of the CDC event to be captured by the job. Value options: insert, update, delete. | insert, update, and delete |
    | dbName Alias | Name of the database to be connected to | oracledb |
    | Schema | Schema of the database to be connected to | oracleschema |
    | Connect With Hudi | Whether to connect to Hudi | Yes |
    | DB Fetch Size | Maximum number of records fetched from the database at a time. The value must be no less than 0; 0 indicates that the number is not limited.<br>(Optional) Displayed only after you click the expand icon. | - |
    | WhiteList | Whitelisted tables that will be captured. Separate multiple tables with commas (,); wildcards are supported.<br>(Optional) Displayed only after you click the expand icon. | testtable |
    | BlackList | Blacklisted tables that will not be captured. Separate multiple tables with commas (,); wildcards are supported.<br>(Optional) Displayed only after you click the expand icon. | - |
    | Multi Partition | Whether to enable multi-partition mode for topics. If enabled, set Topic Table Mapping and specify the number of topic partitions; the data of a single table is then scattered across multiple partitions.<br>(Optional) Displayed only after you click the expand icon.<br>NOTE:<br>• The data receiving order is not guaranteed. Exercise caution when enabling this option.<br>• The default number of partitions is 5. To change it, log in to FusionInsight Manager, choose Cluster > Services > CDL, click Configurations, search for topics.max.partitions, set the new value (for example, 10), save the configuration, and restart the CDL service.<br>• If the source table is a partitioned table and this parameter is set to No, the number of topic partitions created by CDL is the number of source table partitions plus 1. | No |
    | Enable Data Encryption | Whether to encrypt data written to Kafka. If this parameter is set to Yes, configure data encryption by referring to Encrypting Data. | No |
    | Key Name | Name of the encryption key. Displayed only when Enable Data Encryption is set to Yes. | test_key |
    | Topic Table Mapping | Mapping between topics and tables. If configured, table data is sent to the specified topic. If multi-partitioning is enabled, you must set the number of partitions, which must be greater than 1. Source data whose time is earlier than the specified data filtering time is discarded; data whose time is later is sent downstream.<br>Displayed only after you click the expand icon. Mandatory if Connect With Hudi or Multi Partition is set to Yes. | testtable<br>testtable_topic<br>2023/03/10 11:33:37 |

    Table 4 Source Hudi job parameters

    | Parameter | Description | Example Value |
    | --- | --- | --- |
    | Link | Link used by the Hudi app | hudilink |
    | Interval | Interval for synchronizing the Hudi table, in seconds | 10 |
    | Data Filter Time | Data filtering time | 2023/08/16 11:40:52 |
    | Max Commit Number | Maximum number of commits that can be pulled from an incremental view at a time | 10 |
    | Configuring Hudi Table Attributes | View for configuring attributes of the Hudi table. Value options: Visual View, JSON View. | Visual View |
    | Hudi Custom Config | Custom configuration related to Hudi | - |
    | Table Info | Detailed configuration of the synchronization table. Hudi and GaussDB(DWS) must use the same table names and field types. | {"table1":[{"source.database":"base1","source.tablename":"table1"}],"table2":[{"source.database":"base2","source.tablename":"table2"}],"table3":[{"source.database":"base3","source.tablename":"table3"}]} |
    | Table Info-Section Name | Label name of a single table. Only digits, letters, and underscores (_) are allowed. | - |
    | Table Info-Source DataBase | Name of the Hudi database to be synchronized | base1 |
    | Table Info-Source TableName | Name of the Hudi table to be synchronized | table1 |
    | Table Info-Target SchemaName | Schema of the target database to which data is written | - |
    | Table Info-Target TableName | Table of the target database to which data is written | - |
    | Table Info-Enable Sink Precombine | Whether to enable pre-combine for the target database. Currently, pre-combine is supported only when the target database is GaussDB(DWS). When enabled, new data overwrites the existing data at the target if the new pre-combine field value is greater than the existing one; otherwise, the new data is discarded. | Yes |
    | Table Info-Custom Config | Hudi custom configuration | - |
    | Execution Env | Environment variable required for running the Hudi App. If no ENV is available, manually create one. | defaultEnv |
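
    The Table Info value is entered as a single line of JSON. Expanded for readability, the example above maps each section label to the source database and source table of one synchronization table; the snippet below only pretty-prints that structure with Python's json module.

    ```python
    # Pretty-print the Table Info example from Table 4: each section label
    # maps to a list holding one {source.database, source.tablename} entry.
    import json

    table_info = {
        "table1": [{"source.database": "base1", "source.tablename": "table1"}],
        "table2": [{"source.database": "base2", "source.tablename": "table2"}],
        "table3": [{"source.database": "base3", "source.tablename": "table3"}],
    }
    print(json.dumps(table_info, indent=2))
    ```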

    Table 5 thirdparty-kafka job parameters

    | Parameter | Description | Example Value |
    | --- | --- | --- |
    | Link | Created thirdparty-kafka link | thirdparty-kafkalink |
    | DB Name | Name of the database to be connected to. The name can contain only letters, digits, underscores (_), and hyphens (-), and must start with a letter.<br>NOTE: This parameter is unavailable when Datastore Type is set to debezium-json. | opengaussdb |
    | Schema | Schema of the database to be connected to.<br>NOTE: This parameter is unavailable when Datastore Type is set to debezium-json. | opengaussschema |
    | Datastore Type | Type of the upper-layer source. Value options: drs-opengauss-json, ogg-oracle-avro, drs-oracle-json, drs-oracle-avro, debezium-json. | drs-opengauss-json |
    | Avro Schema Topic | Schema topic used by OGG Kafka to store table schemas in JSON format.<br>NOTE: This parameter is available only when Datastore Type is set to ogg-oracle-avro. | ogg_topic |
    | Source Topics | Source data topics. A topic name can contain letters, digits, hyphens (-), and underscores (_). Separate multiple topics with commas (,). | topic1 |
    | Tasks Max | Maximum number of tasks that can be created by a connector. For a connector of the database type, this parameter must be set to 1. | 10 |
    | Tolerance | Fault tolerance policy. Value options:<br>• none: low tolerance; the connector task fails if an error occurs.<br>• all: high tolerance; all failed records are ignored if an error occurs. | all |
    | Data Filter Time | End time of data filtering.<br>NOTE: This parameter is unavailable when Datastore Type is set to debezium-json. | 2022/03/16 14:14:50 |
    | Kafka Message Format | Message format. Value options: Debezium Json, CDL Json.<br>NOTE: This parameter is available only when Datastore Type is set to drs-opengauss-json. | CDL Json |
    | Multi Partition | Whether to enable multi-partitioning for topics. If enabled, set Topic Table Mapping and specify the number of topic partitions; the data of a single table is then scattered across multiple partitions. Not available when Datastore Type is set to ogg-oracle-avro, drs-oracle-avro, or drs-oracle-json.<br>NOTE: The default number of partitions is 5. To change it, log in to FusionInsight Manager, choose Cluster > Services > CDL, click Configurations, search for topics.max.partitions, set the new value (for example, 10), save the configuration, and restart the CDL service. | No |
    | Enable Data Encryption | Whether to encrypt data written to Kafka. If this parameter is set to Yes, configure data encryption by referring to Encrypting Data.<br>NOTE: This parameter is unavailable when Datastore Type is set to debezium-json. | No |
    | Key Name | Name of the encryption key. Displayed only when Enable Data Encryption is set to Yes. | test_key |
    | Topic Table Mapping | Mapping between topics and tables. If configured, table data is sent to the specified topic, which belongs to the Kafka on which CDL depends. If multi-partitioning is enabled, you must set the number of partitions, which must be greater than 1. If the Kafka to which the source topics belong is the same as the Kafka on which CDL depends, topic names in Topic Table Mapping must differ from those configured in Source Topics.<br>Source data whose time is earlier than the specified data filtering time is discarded; data whose time is later is sent downstream. When Datastore Type is debezium-json, the data filtering time cannot be configured. | testtable<br>testtable_topic<br>2023/03/10 11:33:37 |
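
    When a thirdparty-kafka job misbehaves, a quick way to confirm that the source topic actually carries messages in the configured Datastore Type format is to read a few raw records. Below is a minimal sketch with kafka-python; the broker address and topic name are assumptions, and a Kerberos-enabled cluster needs additional security settings.

    ```python
    # Minimal sketch: inspect the first few records of a source topic to check
    # that the payload matches the configured Datastore Type (for example,
    # drs-opengauss-json). Broker and topic names are placeholders.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer("topic1",
                             bootstrap_servers="broker-1:9092",
                             auto_offset_reset="earliest",
                             consumer_timeout_ms=5000)  # stop when drained
    for i, record in enumerate(consumer):
        print(record.partition, record.offset, record.value[:200])
        if i >= 4:  # look at only the first five records
            break
    consumer.close()
    ```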

    Table 6 openGauss job parameters

    | Parameter | Description | Example Value |
    | --- | --- | --- |
    | Link | Created openGauss link | opengausslink |
    | Tasks Max | Maximum number of tasks that can be created by the connector. The value is 1. | 1 |
    | Mode | Type of the CDC event to be captured by the job. Value options: insert, update, delete. | insert, update, and delete |
    | dbName Alias | Database name | test |
    | Slot Name | Name of the openGauss logical replication slot. The value can contain lowercase letters and underscores (_) and must not duplicate the slot name of any other job. | test_slot |
    | Slot Drop | Whether to delete the slot when the task is stopped | No |
    | Connect With Hudi | Whether to connect to Hudi | Yes |
    | WhiteList | Whitelisted tables that will be captured. Separate multiple tables with commas (,); wildcards are supported. | testtable |
    | Data Filter Time | Data filtering time | - |
    | Multi Partition | Whether to enable multi-partition mode for topics. If enabled, set Topic Table Mapping and specify the number of topic partitions; the data of a single table is then scattered across multiple partitions.<br>NOTE:<br>• The data receiving order is not guaranteed if this function is enabled. Exercise caution when enabling this option.<br>• The default number of partitions is 5. To change it, log in to FusionInsight Manager, choose Cluster > Services > CDL, click Configurations, search for topics.max.partitions, set the new value (for example, 10), save the configuration, and restart the CDL service. | No |
    | Kafka Message Format | Message format. Value options: CDL Json, Debezium Json. | CDL Json |
    | Key Management Tool | Key management tool. Currently, only his_kms is supported. | his_kms |
    | Key Environment Information | Key information. Available only when Key Management Tool is configured. | hisIamUrl=https://iam.his-op.xxx.com/iam/auth/token, hisAccount={Account name}, hisSecret={Key management->Program key}, hisAppid={Enterprise application ID}, hisEnterprise={Enterprise ID} |
    | Enable Data Encryption | Whether to encrypt data written to Kafka. If this parameter is set to Yes, configure data encryption by referring to Encrypting Data. | No |
    | Key Name | Name of the encryption key. Displayed only when Enable Data Encryption is set to Yes. | test_key |
    | Custom Config | Custom decoding configuration | - |
    | Topic Table Mapping | Mapping between topics and tables. Table names are in the format Schema name.Table name. If configured, table data is sent to the specified topic. If multi-partitioning is enabled, you must set the number of partitions, which must be greater than 1. Source data whose time is earlier than the specified data filtering time is discarded; data whose time is later is sent downstream.<br>Mandatory if Connect With Hudi is set to Yes. | cdlschema.testtable<br>testtable_topic<br>2023/03/10 11:33:37 |

    Table 7 Sink Hudi job parameters

    | Parameter | Description | Example Value |
    | --- | --- | --- |
    | Link | Created Hudi link | hudilink |
    | Path | Path for storing data | /cdldata |
    | Interval | Spark RDD execution interval, in seconds | 1 |
    | Max Rate Per Partition | Maximum rate, in records per second, at which data is read from each Kafka partition using the Kafka direct stream API. 0 indicates that the rate is not limited. | 0 |
    | Parallelism | Parallelism for writing data to Hudi | 100 |
    | Target Hive Database | Database of the target Hive | default |
    | Configuring Hudi Table Attributes | View for configuring attributes of the Hudi table. Value options: Visual View, JSON View. | Visual View |
    | Global Configuration of Hudi Table Attributes | Global Hudi parameters | - |
    | Configuring the Attributes of the Hudi Table | Configuration of the Hudi table attributes | - |
    | Configuring the Attributes of the Hudi Table: Source Table Name | Source table name | - |
    | Configuring the Attributes of the Hudi Table: Table Type Opt Key | Hudi table type. Value options: COPY_ON_WRITE, MERGE_ON_READ. | MERGE_ON_READ |
    | Configuring the Attributes of the Hudi Table: Hudi TableName Mapping | Hudi table name. If this parameter is not set, the Hudi table uses the same name as the source table. | - |
    | Configuring the Attributes of the Hudi Table: Hive TableName Mapping | Mapping between Hudi tables and Hive tables | - |
    | Configuring the Attributes of the Hudi Table: Table Primarykey Mapping | Primary key mapping of the Hudi table | id |
    | Configuring the Attributes of the Hudi Table: Table Hudi Partition Type | Mapping between the Hudi table and its partition fields. If the Hudi table is partitioned, configure the mapping between the table name and the partition fields. The value can be time or customized. | time |
    | Configuring the Attributes of the Hudi Table: Custom Config | Custom configuration.<br>NOTE: When data is synchronized from MySQL to Hudi, specify hoodie.datasource.write.precombine.field. Because the MySQL timestamp and datetime types support only second-level precision, do not specify a field of the timestamp or datetime type, or _hoodie_event_time, as the pre-combine field. New data overwrites the existing Hudi data when the new pre-combine field value is greater than the existing one; otherwise, the new data is discarded. | - |
    | Execution Env | Environment variable required for running the Hudi App. If no ENV is available, create one by referring to Managing ENV. | defaultEnv |
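
    After a sink Hudi job has run, the written table can be read back from Spark to verify the output. Below is a minimal PySpark sketch, assuming the Path configured above (/cdldata), a table directory named after the source table, and a Spark session launched with the Hudi bundle on the classpath; the table and column names are illustrative.

    ```python
    # Minimal sketch: read back a table written by the sink Hudi job and show
    # the latest records. The path /cdldata/testtable and the "id" column are
    # assumptions based on the example values in this section.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("cdl-hudi-check").getOrCreate()
    df = spark.read.format("hudi").load("/cdldata/testtable")
    df.select("_hoodie_commit_time", "id").show(10, truncate=False)
    ```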

    Table 8 Sink Kafka job parameters

    | Parameter | Description | Example Value |
    | --- | --- | --- |
    | Link | Created Kafka link | kafkalink |

    Table 9 GaussDB(DWS) job parameters

    | Parameter | Description | Example Value |
    | --- | --- | --- |
    | Link | Link used by the connector | dwslink |
    | Query Timeout | Timeout interval for connecting to GaussDB(DWS), in milliseconds | 180000 |
    | Batch Size | Number of data records written to GaussDB(DWS) in each batch | 50 |
    | Sink Task Number | Maximum number of concurrent jobs when a table is written to GaussDB(DWS) | - |
    | DWS Custom Config | Custom configuration | - |

    Table 10 ClickHouse job parameters

    | Parameter | Description | Example Value |
    | --- | --- | --- |
    | Link | Link used by the connector | clickhouselink |
    | Query Timeout | Timeout interval for connecting to ClickHouse, in milliseconds | 60000 |
    | Batch Size | Number of data records written to ClickHouse in each batch.<br>NOTE: Set this parameter to a large value; the recommended range is 10000 to 100000. | 100000 |

  4. After the job parameters are configured, drag the two icons to associate the job parameters and click Save. The job configuration is complete.

  5. In the job list on the Job Management page, locate the created jobs, click Start in the Operation column, and wait until the jobs are started.

    Check whether data transmission takes effect, for example, by inserting data into the source table in the MySQL database and viewing the content of the files imported to Hudi (see the sketch below).
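
    For the MySQL-to-Hudi case, a concrete smoke test is to insert a row at the source while the job is running and then query the Hudi table (for example, with the PySpark sketch after Table 7). Below is a minimal sketch using PyMySQL; the host, credentials, and table columns (id, name) are placeholder assumptions.

    ```python
    # Minimal sketch: write a test row to the whitelisted source table so the
    # running CDL job has a change event to capture and forward to Hudi.
    import pymysql

    conn = pymysql.connect(host="mysql-host", user="cdluser",
                           password="***", database="cdl-test")
    try:
        with conn.cursor() as cur:
            cur.execute("INSERT INTO testtable (id, name) VALUES (%s, %s)",
                        (1, "cdl-smoke-test"))
        conn.commit()
    finally:
        conn.close()
    ```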