Updated on 2025-08-05 GMT+08:00

Configuring a Job for Synchronizing Data from DMS for Kafka to OBS

Supported Source and Destination Database Versions

Table 1 Supported database versions

Source Database | Destination Database
Kafka cluster (2.7 and 3.x) | N/A

Database Account Permissions

Before you use DataArts Migration for data synchronization, ensure that the source and destination database accounts meet the requirements in the following table. The required account permissions vary depending on the synchronization task type.

Table 2 Database account permissions

Type | Required Permissions
Source database account | When ciphertext access is enabled for DMS for Kafka, the account must have the permissions to publish and subscribe to topics. In other scenarios, there are no special permission requirements.
Destination database account | You must have the permissions to access, read objects from, and write objects to the destination OBS bucket. For details, see .

  • You are advised to create independent database accounts for DataArts Migration task connections to prevent task failures caused by password modification.
  • After changing the account passwords for the source or destination databases, modify the connection information in Management Center as soon as possible to prevent automatic retries after a task failure. Automatic retries will lock the database accounts.

Supported Synchronization Objects

The following table lists the objects that can be synchronized using different links in DataArts Migration.

Table 3 Synchronization objects

Type | Note
Synchronization objects | All Kafka messages can be synchronized, and message bodies in JSON or CSV format can be parsed.

Important Notes

In addition to the constraints on supported data sources and versions, connection account permissions, and synchronization objects, you also need to pay attention to the notes in the following table.

Table 4 Important notes

Type

Restriction

Database

  • Kafka instances are supported if SASL_PLAINTEXT is enabled, including the SCRAM-SHA-512 and PLAIN authentication mechanisms.
  • Kafka instances for which SASL_SSL is enabled are not supported.

Usage

General:

During real-time synchronization, the IP addresses, ports, accounts, and passwords cannot be changed.

Incremental synchronization phase:

In the entire database migration scenario, you need to increase the number of concurrent jobs based on the number of topic partitions to be synchronized. Otherwise, a memory overflow may occur.

Troubleshooting:

If any problem occurs during task creation, startup, full synchronization, incremental synchronization, or completion, rectify the fault by referring to .

Other

  • Tables in the destination database can contain more columns than those in the source database. The following failures must be avoided:
    • Assume that extra columns in the destination database cannot be null and have no default values. If newly inserted data records are synchronized from the source database to the destination database, the extra columns will become null, which does not meet the requirements of the destination database and will cause the task to fail.
    • Assume that extra columns in the destination database must be fixed at a default value and have a unique constraint. If newly inserted data records are synchronized from the source database to the destination database, every record receives the same default value in the extra columns, which violates the unique constraint and will cause the task to fail.
  • During automatic table creation, the lengths of char, varchar, nvarchar, enum, and set columns in the source database are automatically increased (lengths are counted in bytes) in the destination GaussDB(DWS) database.
  • During full synchronization of timestamp data, the on update current_timestamp syntax in the default value will not be synchronized to the destination GaussDB(DWS) database.
  • Tables can be renamed using DDL operations (for example, RENAME TABLE A TO B) only if the renamed table (B in this example) is within the synchronization scope.

Procedure

This section uses real-time synchronization from DMS for Kafka to OBS as an example to describe how to configure a real-time data migration job. Before that, ensure that you have read the instructions described in Check Before Use and completed all the preparations.

  1. Create a real-time migration job by following the instructions in Creating a Real-Time Migration Job and go to the job configuration page.
  2. Select the data connection type. Select DMS Kafka for Source and OBS for Destination.

    Figure 1 Selecting the data connection type

  3. Select a job type. The default migration type is Real-time. The migration scenarios include Single table and Entire DB.

    Figure 2 Setting the migration job type

    For details about synchronization scenarios, see Synchronization Scenarios.

  4. Configure network resources. Select the created DMS for Kafka and OBS data connections and the migration resource group for which the network connection has been configured.

    Figure 3 Selecting data connections and a migration resource group

    If no data connection is available, click Create to go to the Manage Data Connections page of the Management Center console and click Create Data Connection to create a connection. For details, see Configuring DataArts Studio Data Connection Parameters.

  5. Configure source parameters.

    • Select the Kafka topics to be synchronized in different migration scenarios based on the following table.
      Table 5 Selecting the Kafka topics to be synchronized

      Synchronization Scenario | Configuration Method
      Single table | Enter a Kafka topic to be migrated. (Figure 4 Entering a Kafka topic)
      Entire DB | Select the Kafka topics to be migrated. (Figure 5 Selecting Kafka topics)
    • Data Format
      Format of the message content in the source Kafka topic. DataArts Migration can process the following types of messages:
      • JSON: Messages can be parsed in JSON format.
      • CSV: Messages can be parsed using specified separators in CSV format.
      • TEXT: The entire message is synchronized as text.
    • Consumer Group ID

      A consumer subscribes to a topic. A consumer group consists of one or more consumers. DataArts Migration allows you to specify the Kafka consumer group to which a consumption action belongs.

    • Source Kafka Attributes

      You can set Kafka client attributes by adding the properties. prefix to each attribute name. The job automatically removes the prefix and passes the attributes to the Kafka client. For details about the parameters, see the configuration descriptions in the Kafka documentation.
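
The prefix handling described above can be pictured with a minimal sketch. It is an illustration only, not the product's implementation: the helper name to_kafka_client_config is hypothetical, and the two attributes (max.poll.records and fetch.max.bytes) are ordinary Kafka consumer settings chosen as examples.

```python
# Hypothetical illustration of the "properties." prefix convention.
# This is NOT the DataArts Migration implementation, only a sketch of the idea.
PREFIX = "properties."


def to_kafka_client_config(job_attributes):
    """Keep only keys that carry the prefix and strip the prefix from them."""
    return {
        key[len(PREFIX):]: value
        for key, value in job_attributes.items()
        if key.startswith(PREFIX)
    }


# Attributes as they might be entered in the job (example values are assumptions).
job_attributes = {
    "properties.max.poll.records": "500",
    "properties.fetch.max.bytes": "52428800",
}

client_config = to_kafka_client_config(job_attributes)
print(client_config)
```

In this sketch, the Kafka client would receive max.poll.records and fetch.max.bytes without the prefix.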

  6. Configure destination parameters.

    Figure 6 Configuring destination OBS parameters
    • File Storage Format

      Format of the files to be written to OBS. Parquet, SequenceFile, and TextFile are supported.

    • File Compression Mode
      Compression mode of the data to be written to OBS files. By default, data is not compressed. The following compression modes are supported:
      • Parquet format: UNCOMPRESSED or SNAPPY
      • SequenceFile format: UNCOMPRESSED, SNAPPY, GZIP, LZ4, or BZIP2
      • TextFile format: UNCOMPRESSED
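
The format and compression combinations listed above can be captured in a small lookup, for example to check a planned configuration before entering it on the console. This is a sketch only; the function name is_supported is hypothetical and the check is not something the product exposes.

```python
# Supported compression modes per file storage format, as listed in this section.
SUPPORTED_COMPRESSION = {
    "Parquet": {"UNCOMPRESSED", "SNAPPY"},
    "SequenceFile": {"UNCOMPRESSED", "SNAPPY", "GZIP", "LZ4", "BZIP2"},
    "TextFile": {"UNCOMPRESSED"},
}


def is_supported(file_format, compression="UNCOMPRESSED"):
    """Return True if the compression mode is listed for the file format.

    The default mirrors the documented default (data is not compressed).
    """
    return compression in SUPPORTED_COMPRESSION.get(file_format, set())


print(is_supported("Parquet", "SNAPPY"))
print(is_supported("TextFile", "GZIP"))
```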
    • OBS Path

      Path for storing OBS files. You can enter the #{source_Topic_name} built-in variable so that topics at the source can be written to different paths. An example path is obs://bucket/dir/test.db/prefix_#{source_Topic_name}_suffix/.
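
To illustrate how the built-in variable separates topics into different paths, the following sketch substitutes a topic name into the example path. The function name resolve_obs_path and the topic names orders and payments are assumptions made for illustration; the actual substitution is performed by the job.

```python
# Built-in variable name as documented in this section.
VARIABLE = "#{source_Topic_name}"


def resolve_obs_path(template, topic):
    """Replace the built-in variable with the source topic name."""
    return template.replace(VARIABLE, topic)


template = "obs://bucket/dir/test.db/prefix_#{source_Topic_name}_suffix/"

# Hypothetical topic names, used only to show that each topic gets its own path.
for topic in ["orders", "payments"]:
    print(resolve_obs_path(template, topic))
```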

    • Global Advanced Settings
      You can configure the parameters in the following table to enable some advanced functions.
      Table 6 OBS advanced parameters

      Parameter | Type | Default Value | Unit | Description
      auto-compaction | boolean | false | N/A | Whether to compact (merge) files. Data is written to temporary files first. This parameter specifies whether to compact the generated temporary files after the checkpoint is complete. Enabling this function reduces the number of small files in some scenarios, but greatly slows down the synchronization.

  7. Refresh the mapping between the source table and destination table. Click Edit Destination Field to check the fields to be written to the destination and configure partition fields as needed.

    Figure 7 Mapping between source and destination tables
    • Partition Field

      After you configure partition fields, a partition directory is automatically generated when data is written to OBS. The directory name is in Partition field=Partition value format. In addition, the field selection sequence affects the partition level. For example, if par1 and par2 are selected as partition fields, par1 is a level-1 partition and par2 is a level-2 partition. A maximum of five levels of partitions are supported.
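
The partition naming rule can be sketched as follows. The sketch assumes that each partition level becomes one nested directory under the OBS path; the function name partition_directory, the base path, and the record values are hypothetical.

```python
MAX_PARTITION_LEVELS = 5  # documented maximum number of partition levels


def partition_directory(base_path, partition_fields, record):
    """Build a directory path with one 'field=value' segment per partition level.

    The order of partition_fields decides the level: the first field is the
    level-1 partition, the second is the level-2 partition, and so on.
    """
    if len(partition_fields) > MAX_PARTITION_LEVELS:
        raise ValueError("A maximum of five levels of partitions are supported")
    segments = [base_path.rstrip("/")]
    segments += [f"{field}={record[field]}" for field in partition_fields]
    return "/".join(segments) + "/"


# Hypothetical record; par1 and par2 are the partition fields from the example above.
record = {"par1": "2025", "par2": "08", "col1": "1"}
print(partition_directory("obs://bucket/dir", ["par1", "par2"], record))
```

Swapping the field order to par2, par1 would make par2 the level-1 partition in this sketch.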

    • Edit Destination Field
      DataArts Migration automatically parses source messages based on the selected source message format and generates corresponding fields. You can customize the names, types, and values of the fields.
      • Field Name: name of the field to be written to the destination OBS file. The name must contain at least one letter and can contain underscores (_) and hyphens (-), but cannot contain only digits.
      • Field Type: type of the field to be written to the destination OBS file. The following types are supported: STRING, BOOLEAN, INTEGER, LONG, FLOAT, DOUBLE, SHORT, DECIMAL, DATE, and TIMESTAMP.
      • Field Value: value source of the field to be written to the destination OBS file
        Table 7 Destination field value obtaining mode

        Type

        Value

        Manually assigned value

        Any character

        Built-in variable

        Kafka metadata, including six fields: __key__, __value__, __Topic__, __partition__, __offset__, and __timestamp__.

        Source table field

        Any field parsed from the source Kafka topic message

        NOTE:

        If the source Kafka message is in nested JSON format, this link can parse field values at different levels (including arrays whose subscript indexes start from 1).

        The following is an example of the JSON content:

        {
            "col1": "1",
            "col2": "2",
            "level1": {
                "level2": [
                    {
                        "level3": "test"
                    }
                ]
            }
        }

        You can obtain test using level1.level2[1].level3 and use it as the value of a field at the destination.

        UDF

        Flink built-in function used to transform data. The following are examples:

        • CONCAT(CAST(NOW() as STRING), `col_name`)
        • DATE_FORMAT(NOW(), 'yy')

        Note that the field name must be enclosed in backquotes. For details about the built-in functions of Flink, see the official Flink documentation.
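
The nested JSON lookup described in the note for source table fields uses array indexes that start from 1. The following sketch shows one way such a path could be evaluated; the function get_by_path is hypothetical and is not the parser used by DataArts Migration.

```python
import json
import re

# Matches either a field name or a bracketed array index such as [1].
_TOKEN = re.compile(r"([^.\[\]]+)|\[(\d+)\]")


def get_by_path(document, path):
    """Resolve a path such as level1.level2[1].level3 with 1-based array indexes."""
    current = document
    for name, index in _TOKEN.findall(path):
        if name:
            current = current[name]
        else:
            position = int(index)
            if position < 1:
                raise IndexError("array indexes start from 1")
            current = current[position - 1]  # convert to Python's 0-based index
    return current


message = json.loads("""
{
    "col1": "1",
    "col2": "2",
    "level1": {
        "level2": [
            {"level3": "test"}
        ]
    }
}
""")

print(get_by_path(message, "level1.level2[1].level3"))
```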

  8. Configure task parameters.

    Table 8 Task parameters

    Parameter | Description | Default Value
    Execution Memory | Memory allocated for job execution, which automatically changes with the number of CPU cores. | 8 GB
    CPU Cores | Value range: 2 to 32. For each CPU core added, 4 GB of execution memory and one concurrency are automatically added. | 2
    Maximum Concurrent Requests | Maximum number of jobs that can be concurrently executed. This parameter does not need to be configured and automatically changes with the number of CPU cores. | 1
    Adding custom attributes | You can add custom attributes to modify some job parameters and enable some advanced functions. For details, see Job Performance Optimization. | N/A
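
The relationship between CPU cores, execution memory, and concurrency can be worked out from the defaults in Table 8 (2 cores, 8 GB, concurrency 1) and the stated increment per added core. The sketch below extrapolates from those values; the function name derived_resources is hypothetical, and the console remains the authority on the actual figures.

```python
DEFAULT_CORES = 2
DEFAULT_MEMORY_GB = 8
DEFAULT_CONCURRENCY = 1
MEMORY_PER_EXTRA_CORE_GB = 4


def derived_resources(cpu_cores):
    """Estimate execution memory and concurrency for a given CPU core count."""
    if not 2 <= cpu_cores <= 32:
        raise ValueError("CPU Cores must be in the range 2 to 32")
    extra_cores = cpu_cores - DEFAULT_CORES
    return {
        "execution_memory_gb": DEFAULT_MEMORY_GB + MEMORY_PER_EXTRA_CORE_GB * extra_cores,
        "max_concurrency": DEFAULT_CONCURRENCY + extra_cores,
    }


for cores in (2, 4, 32):
    print(cores, derived_resources(cores))
```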

  9. Submit and run the job.

    After configuring the job, click Submit in the upper left corner to submit the job.

    Figure 8 Submitting the job

    After submitting the job, click Start on the job development page. In the displayed dialog box, set required parameters and click OK.

    Figure 9 Starting the job
    Table 9 Parameters for starting the job

    Parameter

    Description

    Offset Parameter

    • Earliest: Data consumption starts from the earliest offset of the Kafka topic.
    • Latest: Data consumption starts from the latest offset of the Kafka topic.
    • Start/End time: Data consumption starts from the Kafka topic offset that corresponds to the specified time.

    Time

    This parameter is required if Offset Parameter is set to Start/End time. It specifies the start time of synchronization.

    NOTE:

    If you set a time earlier than the earliest offset of Kafka messages, data consumption starts from the earliest offset by default.
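
The time-based start position can be pictured with a small simulation of one partition. The sketch assumes that the job starts at the first message whose timestamp is at or after the specified time, which mirrors how Kafka timestamp lookups generally behave; the function offset_for_time and the sample log are hypothetical, and the case where the time is later than every message is not described in this section.

```python
from bisect import bisect_left


def offset_for_time(partition_log, start_time_ms):
    """Return the offset of the first message at or after start_time_ms.

    partition_log is a list of (offset, timestamp_ms) pairs sorted by offset,
    with non-decreasing timestamps. Returns None if no such message exists.
    """
    timestamps = [timestamp for _, timestamp in partition_log]
    position = bisect_left(timestamps, start_time_ms)
    if position == len(partition_log):
        return None  # time is later than all messages; behavior not documented here
    return partition_log[position][0]


# Hypothetical partition contents: (offset, timestamp in milliseconds).
partition_log = [(100, 1_000), (101, 2_000), (102, 3_000)]

print(offset_for_time(partition_log, 2_500))  # time between two messages
print(offset_for_time(partition_log, 500))    # time earlier than the earliest message
```

In the second call the time precedes every message, so the lookup falls back to the earliest offset, matching the note above.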

  10. Monitor the job.

    On the job development page, click Monitor to go to the Job Monitoring page. You can view the status and log of the job, and configure alarm rules for the job. For details, see Real-Time Migration Job O&M.

    Figure 10 Monitoring the job

Performance Optimization

If the synchronization speed is too slow, optimize the job by referring to Job Performance Optimization.