
Configuring a Job for Synchronizing Data from DMS for Kafka to Hudi

Supported Source and Destination Database Versions

Table 1 Supported database versions

  Source Database: Kafka cluster (2.7 and 3.x)

  Destination Database: Hudi 0.11.0 on an MRS cluster (3.2.0-LTS.x or 3.3.x-LTS)

Database Account Permissions

Before you use DataArts Migration for data synchronization, ensure that the source and destination database accounts meet the requirements in the following table. The required account permissions vary depending on the synchronization task type.

Table 2 Database account permissions

  Source database connection account: When ciphertext access is enabled for Kafka, the account must have the permissions to publish and subscribe to topics. In other scenarios, there are no special permission requirements.

  Destination database connection account: The MRS user must have read and write permissions on the Hadoop and Hive components. You are advised to assign the MRS user the roles and user groups shown in the following figure.

  Figure 1 Minimal permissions for MRS Hudi

  For details, see MRS Cluster User Permission Model.

  • You are advised to create independent database accounts for DataArts Migration task connections to prevent task failures caused by password changes.
  • If you change the account password of the source or destination database, update the connection information in Management Center as soon as possible. Otherwise, automatic retries after a task failure may lock the database account.

Supported Synchronization Objects

The following table lists the objects that can be synchronized using different links in DataArts Migration.

Table 3 Synchronization objects

  Synchronization objects: All Kafka messages can be synchronized, and message bodies in JSON or CSV format can be parsed.
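
A minimal Python sketch of how a JSON or CSV message body maps to fields. The sample record and the comma separator are illustrative assumptions; DataArts Migration performs this parsing internally based on the Data Format you select for the job.

  import csv
  import io
  import json

  # A hypothetical JSON message body: each top-level key becomes a candidate field.
  json_body = '{"id": 1, "name": "Alice", "update_time": "2024-01-01 00:00:00"}'
  print(json.loads(json_body))  # {'id': 1, 'name': 'Alice', 'update_time': '2024-01-01 00:00:00'}

  # The same record as a CSV message body, parsed with a comma separator.
  csv_body = "1,Alice,2024-01-01 00:00:00"
  print(next(csv.reader(io.StringIO(csv_body), delimiter=",")))  # ['1', 'Alice', '2024-01-01 00:00:00']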

Important Notes

In addition to the constraints on supported data sources and versions, connection account permissions, and synchronization objects, you also need to pay attention to the notes in the following table.

Table 4 Important notes

  Database:

  • MRS Kafka for which Kerberos authentication is enabled is supported.
  • Kafka instances using SASL_PLAINTEXT are supported, including the SCRAM-SHA-512 and PLAIN authentication mechanisms.
  • During data synchronization, SASL_SSL authentication can be enabled only for DMS for Kafka instances that use the default certificate. It cannot be enabled for other types of Kafka.
  • The names of the destination Hudi databases, tables, and fields can contain only digits, letters, and underscores (_). Field names must start with a letter or an underscore (_). You are advised to use common characters in names. (See the name-check sketch after this table.)

  Usage:

  General:

  • During real-time synchronization, the IP addresses, ports, accounts, and passwords cannot be changed.
  • If a Hudi table uses bucket indexes, the partition key cannot be updated. Otherwise, duplicate data may be generated.
  • If a Hudi table uses bucket indexes, ensure that the primary key is unique within a single partition.

  Incremental synchronization phase:

  After incremental data is synchronized to Hudi MOR tables in MRS clusters earlier than 3.2.0-LTS1.5, CDM or Spark SQL cannot be used to write data. You need to perform compaction before writing data.

  Troubleshooting:

  If any problem occurs during task creation, startup, full synchronization, incremental synchronization, or completion, rectify the fault by referring to FAQs.

  Other: N/A
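
To pre-check destination object names against the naming rule above, you can use a simple pattern check such as the following. The regular expressions are an illustration of the stated rule, not an official validator, and the sample names are hypothetical.

  import re

  # Database and table names: digits, letters, and underscores only.
  OBJECT_NAME = re.compile(r"^[A-Za-z0-9_]+$")
  # Field names: must additionally start with a letter or an underscore.
  FIELD_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

  def check_hudi_names(database, table, fields):
      """Return the names that violate the naming rule in Table 4."""
      invalid = [n for n in (database, table) if not OBJECT_NAME.match(n)]
      invalid += [f for f in fields if not FIELD_NAME.match(f)]
      return invalid

  print(check_hudi_names("demo_db", "orders_2024", ["id", "_ts", "1st_col"]))  # ['1st_col']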

Procedure

This section uses real-time synchronization from Kafka to MRS Hudi as an example to describe how to configure a real-time data migration job. Before that, ensure that you have read the instructions described in Check Before Use and completed all the preparations.

  1. Create a real-time migration job by following the instructions in Creating a Real-Time Migration Job and go to the job configuration page.
  2. Select the data connection type. Select Kafka for Source and Hudi for Destination.

    Figure 2 Selecting the data connection type

  3. Select a job type. The default migration type is Real-time. The migration scenario is Single table.

    Figure 3 Setting the job type

    For details about synchronization scenarios, see Synchronization Scenarios.

  4. Configure network resources. Select the created DMS for Kafka and MRS Hudi data connections and the migration resource group for which the network connection has been configured.

    Figure 4 Selecting data connections and a migration resource group

    If no data connection is available, click Create to go to the Manage Data Connections page of the Management Center console and click Create Data Connection to create a connection. For details, see Configuring DataArts Studio Data Connection Parameters.

    If no migration resource group is available, click Create to create one. For details, see Buying a DataArts Migration Resource Group Incremental Package.

  5. Check the network connectivity. After the data connections and migration resource group are configured, check the connectivity between the data sources and the migration resource group.

  6. Configure source parameters.

    • Topic
      Enter a Kafka topic to be migrated.
      Figure 5 Entering a Kafka topic
    • Data Format
      Format of the message content in the source Kafka topic. DataArts Migration can process the following types of messages:
      • JSON: Messages can be parsed in JSON format.
      • CSV: Messages can be parsed using specified separators in CSV format.
      • TEXT: The entire message is synchronized as text.
      • DEBEZIUM_JSON: CDC JSON data in Debezium format can be parsed and synchronized to the destination table.
      • CANAL_JSON: CDC JSON data in Canal format can be parsed and synchronized to the destination table.
    • Consumer Group ID

      A consumer subscribes to a topic. A consumer group consists of one or more consumers. DataArts Migration allows you to specify the Kafka consumer group to which a consumption action belongs.

    • Source Kafka Attributes

      You can set Kafka attributes by adding the properties. prefix to the attribute names. The job automatically removes the prefix and passes the attributes to the Kafka client, as illustrated in the sketch after this list. For details about the parameters, see the configuration descriptions in the Kafka documentation.

    • Upstream Data Source Type

      This parameter is mandatory only when Data Format is set to DEBEZIUM_JSON or CANAL_JSON. Currently, only TiDB is supported, indicating that CDC JSON data is generated by TiDB.

    • Database Table Names

      This parameter is mandatory only when Data Format is set to DEBEZIUM_JSON or CANAL_JSON. It indicates the database tables that Kafka messages come from. The task parses CDC JSON data and synchronizes only the data for which database tables have been configured.
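
    The sketch below illustrates the properties. prefix convention for Source Kafka Attributes: attribute names entered with the prefix are handed to the Kafka client with the prefix removed. The attribute names and values are examples of common Kafka consumer settings, not required values.

      # Attributes as they might be entered under Source Kafka Attributes (illustrative values).
      source_kafka_attributes = {
          "properties.max.poll.records": "500",
          "properties.session.timeout.ms": "30000",
          "properties.security.protocol": "SASL_PLAINTEXT",
          "properties.sasl.mechanism": "SCRAM-SHA-512",
      }

      # What the job conceptually passes to the Kafka client after stripping the prefix.
      client_config = {
          key[len("properties."):]: value
          for key, value in source_kafka_attributes.items()
          if key.startswith("properties.")
      }
      print(client_config)  # {'max.poll.records': '500', 'session.timeout.ms': '30000', ...}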

  7. Configure destination parameters.

    Figure 6 Hudi destination parameters
    • Basic Configuration

      Set the database to which the destination Hudi table belongs.

    • Destination Table

      Set the name of the destination Hudi table.

    • Data Storage Path

      Warehouse path when tables are automatically created in Hudi. A subdirectory is created in the warehouse path for each table. You can enter an HDFS or OBS path. The path format is as follows:

      • OBS path: obs://bucket/warehouse
      • HDFS path: /tmp/warehouse
    • Global Hudi Table Attribute Configuration

      Some advanced functions can be configured using parameters. For details, see Hudi advanced parameters.

      Table 5 Hudi advanced parameters

      write.precombine.field (string; default: -; unit: -)
        Pre-aggregation key of the Hudi table, which is used to combine records when data is written. It is usually used for update and deletion operations. This parameter is mandatory. Enter write.precombine.field and set it to a field in the Hudi table.

      index.type (string; default: BLOOM; unit: N/A)
        Index type of the Hudi table. BLOOM and BUCKET indexes are supported. If a large amount of data needs to be migrated, BUCKET indexes are recommended for better performance.

      hoodie.bucket.index.num.buckets (int; default: 256; unit: count)
        Number of buckets within a Hudi table partition.
        NOTE:
        When using Hudi BUCKET tables, you need to set the number of buckets for a table partition. The number of buckets affects table performance.
        • Number of buckets for a non-partitioned table = MAX(CEIL(Data volume of a single table (GB)/1 GB), 4)
        • Number of buckets for a partitioned table = MAX(CEIL(Data volume of a single partition (GB)/1 GB), 1)
        Pay attention to the following:
        • The total data volume of the table, not its compressed size, is used.
        • An even number of buckets is recommended. The minimum number of buckets is 4 for a non-partitioned table and 1 for a partitioned table.

      changelog.enabled (boolean; default: false; unit: N/A)
        Whether to enable the Hudi ChangeLog function. If this function is enabled, the migration job can output DELETE and UPDATE BEFORE data.

      logical.delete.enabled (boolean; default: true; unit: N/A)
        Whether to enable logical deletion. If the ChangeLog function is enabled, logical deletion must be disabled.

      hoodie.write.liststatus.optimized (boolean; default: true; unit: N/A)
        Whether to enable liststatus optimization when log files are written. If the migration job involves large tables or a large amount of partitioned data, the list operation is time-consuming during startup and may cause a job startup timeout. In that case, you are advised to disable this function.

      hoodie.index.liststatus.optimized (boolean; default: false; unit: N/A)
        Whether to enable liststatus optimization during data locating. If the migration job involves large tables or a large amount of partitioned data, the list operation is time-consuming during startup and may cause a job startup timeout. In that case, you are advised to disable this function.

      compaction.async.enabled (boolean; default: true; unit: N/A)
        Whether to enable asynchronous compaction. The compaction operation affects the write performance of real-time jobs. If you use an external compaction operation, you can set this parameter to false to disable compaction for real-time processing migration jobs.

      compaction.schedule.enabled (boolean; default: true; unit: N/A)
        Whether to generate compaction plans. Compaction plans must be generated by this service and can be executed by Spark.

      compaction.delta_commits (int; default: 5; unit: count)
        Frequency at which compaction requests are generated. Lowering the compaction request generation frequency reduces the compaction frequency and improves job performance. If there is a small volume of incremental data to be synchronized to Hudi, you can set a larger value for this parameter.
        NOTE:
        For example, if this parameter is set to 40, a compaction request is generated every 40 commits. Because DataArts Migration generates a commit every minute, the interval between compaction requests is 40 minutes.

      clean.async.enabled (boolean; default: true; unit: N/A)
        Whether to clean data files of historical versions.

      clean.retain_commits (int; default: 30; unit: count)
        Number of commits to retain. Data files related to these commits are retained for a period calculated by multiplying the number of specified commits by the interval between commits. You are advised to set this parameter to twice the value of compaction.delta_commits.
        NOTE:
        For example, if this parameter is set to 80, because DataArts Migration generates a commit every minute, data files related to commits generated more than 80 minutes earlier are cleaned, and data files related to the most recent 80 commits are retained.

      hoodie.archive.automatic (boolean; default: true; unit: N/A)
        Whether to age Hudi commit files.

      archive.min_commits (int; default: 40; unit: count)
        Number of recent commits to keep when historical commits are archived to log files. You are advised to set this parameter to one greater than clean.retain_commits.
        NOTE:
        For example, if this parameter is set to 81, the files related to the most recent 81 commits are retained when an archive operation is triggered.

      archive.max_commits (int; default: 50; unit: count)
        Number of commits that triggers an archive operation. You are advised to set this parameter to 20 greater than archive.min_commits.
        NOTE:
        For example, if this parameter is set to 101, an archive operation is triggered when the files of 101 commits are generated.

      • To achieve optimal performance for the migration job, you are advised to use an MOR table with Hudi BUCKET indexes and configure the number of buckets based on the actual data volume (see the sizing sketch after these notes).
      • To ensure the stability of the migration job, you are advised to split the Hudi compaction work into Spark jobs executed by MRS, while keeping compaction plan generation enabled for this migration job. For details, see How Do I Configure a Spark Periodic Task for Hudi Compaction?
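
      The following Python sketch applies the sizing guidance above. It computes the bucket count using the formulas from Table 5 and derives the recommended compaction, clean, and archive settings from a chosen compaction.delta_commits value; the sample data volume and delta-commit count are assumptions for illustration.

        import math

        def bucket_count(data_volume_gb, partitioned):
            """Bucket count per Table 5: MAX(CEIL(volume/1 GB), 4) for a non-partitioned
            table, or MAX(CEIL(partition volume/1 GB), 1) for a partitioned table.
            Use the uncompressed data volume; an even number of buckets is recommended."""
            minimum = 1 if partitioned else 4
            return max(math.ceil(data_volume_gb / 1.0), minimum)

        print(bucket_count(500, partitioned=False))  # 500 buckets for a 500 GB non-partitioned table

        # Derived compaction/clean/archive settings for a chosen delta-commit count
        # (DataArts Migration generates a commit every minute).
        compaction_delta_commits = 40                            # compaction request every ~40 minutes
        clean_retain_commits = 2 * compaction_delta_commits      # 80
        archive_min_commits = clean_retain_commits + 1           # 81
        archive_max_commits = archive_min_commits + 20           # 101
        print(clean_retain_commits, archive_min_commits, archive_max_commits)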

  8. Refresh and check the mapping between the source and destination tables. In addition, you can modify table attributes, add additional fields, and use the automatic table creation capability to create tables in the destination Hudi database.

    If the source is CDC JSON (DEBEZIUM_JSON or CANAL_JSON) data, automatic table creation is not supported. You must manually create a Hudi table.
    Figure 7 Mapping between source and destination tables
    • Synchronization Primary Key

      The primary key must be set for Hudi tables. If the source table has no primary key, you must manually select the primary key during field mapping.

    • Partition Field

      If no Hudi table is available at the destination, you can select a maximum of five mapped destination fields as partition fields. DataArts Migration automatically specifies partitions for a Hudi table when creating the table, and directories are automatically generated for the partitions when data is written to the Hudi table.

      The field selection sequence affects the partition level. For example, if par1 and par2 are selected as partition fields, par1 is a level-1 partition and par2 is a level-2 partition. A maximum of five levels of partitions are supported.

    • Edit Destination Field
      DataArts Migration automatically parses source messages based on the selected source message format and generates corresponding fields. You can customize the names, types, and values of the fields.
      • Field Name: name of the new field in the destination Hudi table
      • Field Type: Type of the new field in the destination Hudi table
      • Field Value: Value source of the new field in the destination Hudi table
        Table 6 Destination field value obtaining mode

          Manually assigned value: Any character
          Built-in variable: Kafka metadata, including six fields: __key__, __value__, __Topic__, __partition__, __offset__, and __timestamp__
          Field variable: Any field parsed from the source Kafka topic message
          UDF: Flink built-in function used to transform data. The following are examples:
          • CONCAT(CAST(NOW() as STRING), `col_name`)
          • DATE_FORMAT(NOW(), 'yy')
          The field name must be enclosed in backquotes. For details about the built-in functions of Flink, see the official Flink documentation.
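
      As an illustration of how these value modes can combine, the following sketch describes a set of additional destination fields as plain data; the field names, types, and values are hypothetical examples, one per mode in Table 6.

        # Illustrative destination field definitions, one per value-obtaining mode.
        extra_fields = [
            {"name": "source_system", "type": "string", "value": "demo_kafka"},               # manually assigned value
            {"name": "ingest_offset", "type": "bigint", "value": "__offset__"},               # built-in variable
            {"name": "event_name",    "type": "string", "value": "name"},                     # field variable parsed from the message
            {"name": "load_year",     "type": "string", "value": "DATE_FORMAT(NOW(), 'yy')"}, # UDF (Flink built-in function)
        ]
        for field in extra_fields:
            print(field["name"], field["type"], field["value"])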

  9. Configure task parameters.

    Table 7 Task parameters

    Execution Memory (default: 8 GB)
      Memory allocated for job execution, which automatically changes with the number of CPU cores.

    CPU Cores (default: 2)
      Value range: 2 to 32. For each CPU core added, 4 GB of execution memory and one concurrency are automatically added (see the sketch after this table).

    Maximum Concurrent Requests (default: 1)
      Maximum number of jobs that can be executed concurrently. This parameter does not need to be configured; it automatically changes with the number of CPU cores.

    Auto Retry (default: No)
      Whether to enable automatic retry upon a job failure.

    Maximum Retries (default: 1)
      This parameter is displayed when Auto Retry is set to Yes.

    Retry Interval (Seconds) (default: 120)
      This parameter is displayed when Auto Retry is set to Yes.

    Write Dirty Data (default: No)
      Whether to record dirty data. By default, dirty data is not recorded. If there is a large amount of dirty data, the synchronization speed of the task is affected.
      • No: Dirty data is not recorded. This is the default value.
        Dirty data is not allowed. If dirty data is generated during synchronization, the task fails and exits.
      • Yes: Dirty data is allowed, that is, dirty data does not affect task execution.
        When dirty data is allowed and a threshold is set:
        • If the amount of dirty data generated is within the threshold, the synchronization task ignores the dirty data (that is, the dirty data is not written to the destination) and runs normally.
        • If the amount of dirty data generated exceeds the threshold, the synchronization task fails and exits.
        NOTE:
        Criteria for determining dirty data: Dirty data is data that is meaningless to services, is in an invalid format, or is generated when the synchronization task encounters an error. If an exception occurs when a piece of data is written to the destination, that piece of data is dirty data. Therefore, any data that fails to be written is classified as dirty data.
        For example, if data of the VARCHAR type at the source is written to a destination column of the INT type, the data cannot be written to the destination because the conversion is invalid. When configuring a synchronization task, you can choose whether to write dirty data during synchronization and set the maximum number of error records allowed in a single partition to keep the task running. If the number of dirty data records exceeds the threshold, the task fails and exits.

    Dirty Data Policy (default: Do not archive)
      This parameter is displayed when Write Dirty Data is set to Yes. The following policies are supported:
      • Do not archive: Dirty data is only recorded in job logs, but not stored.
      • Archive to OBS: Dirty data is stored in OBS and printed in job logs.

    Write Dirty Data Link (default: N/A)
      This parameter is displayed when Dirty Data Policy is set to Archive to OBS. Only links to OBS support dirty data writes.

    Dirty Data Directory (default: N/A)
      OBS directory to which dirty data will be written.

    Dirty Data Threshold (default: 100)
      This parameter is displayed only when Write Dirty Data is set to Yes. You can set the dirty data threshold as required.
      NOTE:
      • The dirty data threshold takes effect for each concurrency. For example, if the threshold is 100 and the concurrency is 3, the maximum number of dirty data records allowed by the job is 300.
      • The value -1 indicates that the number of dirty data records is not limited.

    Custom attributes (default: N/A)
      You can add custom attributes to modify some job parameters and enable some advanced functions. For details, see Job Performance Optimization.
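
    The relationships between CPU cores, execution memory, concurrency, and the dirty data threshold can be summarized as follows. The formulas are derived from the defaults in Table 7 (2 cores, 8 GB, concurrency 1) and the stated per-core increments, and are an illustration rather than product code.

      def job_resources(cpu_cores):
          """Execution memory (GB) and concurrency derived from the CPU core count
          (baseline 2 cores / 8 GB / concurrency 1; +4 GB and +1 concurrency per extra core)."""
          assert 2 <= cpu_cores <= 32
          memory_gb = 8 + 4 * (cpu_cores - 2)
          concurrency = 1 + (cpu_cores - 2)
          return memory_gb, concurrency

      def max_dirty_records(threshold, concurrency):
          """Job-wide dirty data limit: the threshold applies per concurrency; -1 means unlimited."""
          return float("inf") if threshold == -1 else threshold * concurrency

      memory_gb, concurrency = job_resources(4)
      print(memory_gb, concurrency)               # 16 GB of execution memory, concurrency 3
      print(max_dirty_records(100, concurrency))  # up to 300 dirty data records for the job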

  10. Submit and run the job.

    After configuring the job, click Submit in the upper left corner to submit the job.

    Figure 8 Submitting the job

    After submitting the job, click Start on the job development page. In the displayed dialog box, set required parameters and click OK.

    Figure 9 Starting the job
    Table 8 Parameters for starting the job

    Offset Parameter
      • Earliest: Data consumption starts from the earliest offset of the Kafka topic.
      • Latest: Data consumption starts from the latest offset of the Kafka topic.
      • Time Range: Data consumption starts from the Kafka topic offset obtained based on the specified time.

    Minimum Timestamp
      This parameter is required if Offset Parameter is set to Time Range. It specifies the start time of synchronization.
      NOTE:
      If you set a time earlier than the earliest offset of the Kafka messages, data consumption starts from the earliest offset by default.
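
    To see how a Time Range start point maps to Kafka offsets, the following sketch uses the kafka-python client to look up, for each partition, the earliest offset whose timestamp is at or after a given time. It only illustrates the underlying Kafka mechanism; DataArts Migration performs this lookup for you when the job starts, and the broker address, topic name, and timestamp are hypothetical.

      from datetime import datetime, timezone
      from kafka import KafkaConsumer, TopicPartition  # pip install kafka-python

      topic = "demo_topic"                                        # hypothetical topic
      consumer = KafkaConsumer(bootstrap_servers="broker1:9092")  # hypothetical broker address
      start_ms = int(datetime(2024, 1, 1, tzinfo=timezone.utc).timestamp() * 1000)

      partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
      # For each partition, the first offset whose timestamp is >= start_ms
      # (None if no message at or after start_ms exists in that partition).
      offsets = consumer.offsets_for_times({tp: start_ms for tp in partitions})
      for tp, result in offsets.items():
          print(tp.partition, result.offset if result else "no message at or after start_ms")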

  11. Monitor the job.

    On the job development page, click Monitor to go to the Job Monitoring page. You can view the status and log of the job, and configure alarm rules for the job. For details, see Real-Time Migration Job O&M.

    Figure 10 Monitoring the job

Performance Optimization

If the synchronization speed is too slow, rectify the fault by referring to Job Performance Optimization.