Updated on 2025-08-05 GMT+08:00

Configuring a Job for Synchronizing Data from MySQL to MRS Hudi

Supported Source and Destination Database Versions

Table 1 Supported database versions

Source: MySQL database (5.6, 5.7, and 8.x)

Destination:

  • MRS cluster (3.2.0-LTS.x and 3.5.x)
  • Hudi (0.11.0)

Database Account Permissions

Before you use DataArts Migration for data synchronization, ensure that the source and destination database accounts meet the requirements in the following table. Different types of synchronization tasks require different permissions. For details, see Table 2.

Table 2 Database account permissions

Account

Required Permissions

Source database account

The source database account must have the following minimum permissions required for running SQL statements: SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT.

GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'Username'@'%';
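For example, the following is a minimal sketch for creating a dedicated synchronization account and granting these permissions. The user name and password are placeholders; adjust the host pattern to match your network environment.

CREATE USER 'dataarts_sync'@'%' IDENTIFIED BY '<password>';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dataarts_sync'@'%';
FLUSH PRIVILEGES;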

Destination database account

The MRS user must have read and write permissions for the Hadoop and Hive components. You are advised to assign the roles and user groups shown in the following figure to the MRS user.

Figure 1 Minimal permissions for MRS Hudi

For details, see .

  • You are advised to create independent database accounts for DataArts Migration task connections to prevent task failures caused by password modification.
  • After changing the account passwords for the source or destination databases, modify the connection information in Management Center as soon as possible to prevent automatic retries after a task failure. Automatic retries will lock the database accounts.

Supported Synchronization Objects

Table 3 lists the objects that can be synchronized in different scenarios.

Table 3 Synchronization objects

Type

Note

Synchronization objects

  • DML operations INSERT, UPDATE, and DELETE can be synchronized.
  • The DDL operation of adding columns can be synchronized.
  • Only primary key tables can be synchronized.
  • Only MyISAM and InnoDB tables can be synchronized.
  • Views, foreign keys, stored procedures, triggers, functions, events, virtual columns, unique constraints, and unique indexes cannot be synchronized.
  • Table structures, common indexes, constraints (primary key, null, and non-null), and comments cannot be synchronized during automatic table creation.
  • Foreign keys that contain reference operations such as CASCADE, SET NULL, and SET DEFAULT cannot be synchronized. These operations will cause the update or deletion of rows in parent tables and affect records in child tables. Also, operations related to child tables are not recorded in Binlogs.

Notes

In addition to the constraints on supported data sources and versions, connection account permissions, and synchronization objects, you also need to pay attention to the notes in Table 4.

Table 4 Notes

Type

Restriction

Database

  • The names of the source databases, tables, and fields cannot contain non-ASCII characters or the following characters: .<'>/\" (You are advised to use common characters to avoid a failure.)
  • The names of the destination databases, tables, and fields can only contain digits, letters, and underscores (_). Field names must start with a letter or an underscore (_). You are advised to use common characters in names.

Usage

General:

  • During real-time synchronization, the IP addresses, ports, accounts, and passwords cannot be changed.
  • The source database cannot be restored.
  • Binlogs must be enabled for the source database, and binlog_row_image must be set to full.
  • It is recommended that MySQL binlogs be retained for more than three days. Binlogs cannot be forcibly cleared. (A verification example follows this list.)

    When a job is restored after an exception, the recorded binlog location may expire. As a result, the job fails to be restored. It is important to monitor the duration of job exceptions and the binlog retention period.

  • During real-time synchronization, the source MySQL database cannot be upgraded across major versions. Otherwise, data may become inconsistent or the synchronization task may fail (data, table structures, and keywords may cause compatibility changes after the cross-version upgrade). You are advised to create a synchronization task again if the source MySQL database is upgraded across major versions.
  • If a Hudi table uses bucket indexes, the partition key cannot be updated. Otherwise, duplicate data may be generated.
  • If a Hudi table uses bucket indexes, ensure that the primary key is unique in a single partition.
  • Every Hudi table in this task must contain three audit fields: cdc_last_update_date, logical_is_deleted, and _hoodie_event_time. The _hoodie_event_time field is used as the pre-aggregation key of the Hudi tables. If an existing table is used, these three audit fields must also be configured for it. Otherwise, the task may fail.
    • cdc_last_update_date: time when the migration task processes the CDC data
    • logical_is_deleted: logical deletion flag
    • _hoodie_event_time: timestamp of the data in MySQL binlogs
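To verify the binlog requirements listed above, you can run queries like the following on the source MySQL instance. This is a minimal sketch; the retention variable is binlog_expire_logs_seconds on MySQL 8.x and expire_logs_days on MySQL 5.6/5.7.

SHOW VARIABLES LIKE 'log_bin';                     -- expected: ON (binlog enabled)
SHOW VARIABLES LIKE 'binlog_row_image';            -- expected: FULL
SHOW VARIABLES LIKE 'binlog_expire_logs_seconds';  -- MySQL 8.x retention; 259200 (3 days) or more is recommended
SHOW VARIABLES LIKE 'expire_logs_days';            -- MySQL 5.6/5.7 retention; 3 or more is recommended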

Full synchronization phase:

  • During task startup and full data synchronization, do not perform DDL operations on the source database. Otherwise, the task may fail.
  • Any existing data in Hudi tables cannot be overwritten during full synchronization. You are advised to clear the Hudi tables in advance.
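If you need to clear an existing Hudi table before full synchronization, a minimal sketch using the Spark SQL client is shown below. The database and table names are placeholders; confirm that TRUNCATE TABLE is supported by your Hudi and Spark versions before relying on it, or drop and recreate the table instead.

TRUNCATE TABLE hudi_db.hudi_table;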

Incremental synchronization phase:

  • During incremental synchronization, DDL operations (for example, ALTER TABLE ddl_test ADD COLUMN c2 AFTER/FIRST c1;) for adding columns to a specified position are not supported. The AFTER/FIRST attribute will be deleted, which may cause column sequence inconsistency.
  • During incremental synchronization, executing non-idempotent DDL statements (for example, ALTER TABLE ddl_test ADD COLUMN c3 timestamp default now();) may cause data inconsistency.
  • During incremental synchronization, the following DDL operations can be identified: creating a table, deleting a table, adding a column, deleting a column, renaming a table, renaming a column, modifying a column type, and clearing a table. Only column adding operations can be synchronized to the destination Hudi database. You can set the processing policy for the other DDL operations to Ignore or Error.
    • In the database and table sharding scenario, you must stop service operations before renaming columns. Otherwise, data inconsistency may occur.
    • In the database and table sharding scenario, you are advised to synchronize only the DDL for adding columns. If you synchronize other DDL operations, the job may fail or data may be inconsistent because the destination table has been modified.
    • In the database and table sharding scenario, ensure that the types of columns to be added to each table are the same. Otherwise, the task may fail.
    • The column name can contain no more than 256 characters. Otherwise, the task fails.
  • Foreign keys that contain reference operations such as CASCADE, SET NULL, and SET DEFAULT cannot be synchronized. These operations will cause the update or deletion of rows in parent tables and affect records in child tables. Also, operations related to child tables are not recorded in Binlogs.
  • The synchronization position cannot be set to a point before DDL operations that involve table structure changes, such as table modification and column addition.
  • After incremental data is synchronized to Hudi MOR tables in MRS clusters earlier than 3.2.0-LTS1.5, CDM or Spark SQL cannot be used to write data. You need to perform compaction before writing data.
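For clusters affected by the preceding restriction, compaction can be triggered through Hudi's Spark SQL syntax before other engines write to the table. This is a minimal sketch that assumes you run it in the MRS Spark SQL client; the table name is a placeholder.

SCHEDULE COMPACTION ON hudi_db.hudi_table;
RUN COMPACTION ON hudi_db.hudi_table;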

Extraction from the standby database:

Before extracting data from the standby database, ensure that a value can be returned after the SHOW MASTER STATUS command is executed on the standby database node. Otherwise, data jobs may be abnormal and data may be lost.
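For example, run the following statement on the standby node. It should return a non-empty row containing the current binlog file name and position (the File and Position columns):

SHOW MASTER STATUS;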

Troubleshooting:

If any problem occurs during task creation, startup, full synchronization, incremental synchronization, or completion, rectify the fault by referring to .

Other restrictions

Tables in the destination database can contain more columns than the corresponding tables in the source database. However, the following failure case must be avoided:

Extra columns in the destination table are defined as NOT NULL and have no default values. When newly inserted records are synchronized from the source database, these extra columns are left empty, which violates the destination table's constraints and causes the task to fail.

Procedure

This section uses real-time synchronization from RDS for MySQL to MRS Hudi as an example to describe how to configure a real-time data migration job. Before that, ensure that you have read the instructions described in Check Before Use and completed all the preparations.

  1. Create a real-time migration job by following the instructions in Creating a Real-Time Migration Job and go to the job configuration page.
  2. Select the data connection type. Select MySQL for Source and Hudi for Destination.

    Figure 2 Selecting the data connection type

  3. Select a job type. The default migration type is Real-time. The migration scenarios include Entire DB and Database/Table partition.

    For details about synchronization scenarios, see Synchronization Scenarios.

  4. Configure network resources. Select the created MySQL and MRS Hudi data connections and the migration resource group for which the network connection has been configured.

    Figure 3 Selecting data connections and a migration resource group

    If no data connection is available, click Create to go to the Manage Data Connections page of the Management Center console and click Create Data Connection to create a connection. For details, see Configuring DataArts Studio Data Connection Parameters.

  5. Configure source parameters.

    Select the databases and tables to be synchronized based on Table 5.

    Table 5 Selecting the databases and tables to be synchronized

    Scenario

    Configuration

    Entire DB

    Select the MySQL databases and tables to be migrated.
    Figure 4 Selecting databases and tables

    Both databases and tables can be customized. You can select one database and one table, or multiple databases and tables.

    Database/Table partition

    Add a logical table.
    • Logical Table Name: Enter the name of the table to be written to Hudi.
    • Source Database Filter: You can enter a regular expression to filter all the database shards to be written to the destination Hudi aggregation table.
    • Source Table Filter: You can enter a regular expression to filter all the table shards in the source database shard to be written to the destination Hudi aggregation table.
    Figure 5 Adding a logical table

    You can click Preview in the Operation column to preview an added logical table. The more source tables a logical table covers, the longer the preview takes.

    Figure 6 Previewing the logical table
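    For example, assuming database shards named db_00 through db_03 that each contain table shards orders_00 through orders_15 (these names are illustrative), you could set Logical Table Name to orders, Source Database Filter to db_\d{2}, and Source Table Filter to orders_\d{2} so that all shards are aggregated into one destination Hudi table.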

  6. Configure destination parameters.

    • Set Database and Table Matching Policy.

      For details about the matching policy between source and destination databases and tables in each synchronization scenario, see Table 6.

      Table 6 Database and table matching policy

      Scenario

      Configuration

      Entire DB

      • Database Matching Policy
        • Same name as the source database: Data will be synchronized to the Hudi database with the same name as the source MySQL database.
        • Custom: Data will be synchronized to the Hudi database you specify.
      • Table Matching Policy
        • Same name as the source table: Data will be synchronized to the Hudi table with the same name as the source MySQL table.
        • Custom: Data will be synchronized to the Hudi table you specify.
          Figure 7 Database and table matching policy in the entire database migration scenario
          NOTE:

          When you customize a matching policy, you can use the built-in variables #{source_db_name} and #{source_table_name} to reference the source database name and table name. The table matching policy must contain #{source_table_name}. For example, the custom table matching policy ods_#{source_table_name} maps a source table named orders to a destination table named ods_orders.

      Database/Table partition

      • Destination Database Name: Data will be synchronized to the specified Hudi database.
      • Table Matching Policy: By default, the value is the same as the name of the logical table entered in the source configuration.
        Figure 8 Database and table matching policy in the sharding scenario
    • Set Hudi parameters.

      For details, see Table 7.

      Figure 9 Hudi destination parameters
      Table 7 Hudi destination parameters

      Data Storage Path
        Warehouse path used when tables are automatically created in Hudi. A subdirectory is created in the warehouse path for each table. You can enter an HDFS or OBS path in either of the following formats:
        • OBS path: obs://bucket/warehouse
        • HDFS path: /tmp/warehouse

      Global Configuration of Hudi Table Attributes
        Some advanced functions can be configured using parameters. For details, see Table 8 Hudi advanced parameters.

      Compaction Job
        An independent Spark SQL job. If this parameter is not specified, Flink performs compaction.

      Table 8 Hudi advanced parameters

      index.type (string; default: BLOOM)
        Index type of the Hudi table. BLOOM and BUCKET indexes are supported. If a large amount of data needs to be migrated, BUCKET indexes are recommended for better performance.

      hoodie.bucket.index.num.buckets (int; default: 256; unit: count)
        Number of buckets within a Hudi table partition.
        NOTE:
        When using Hudi BUCKET tables, you need to set the number of buckets for a table partition. The number of buckets affects table performance.
        • Number of buckets for a non-partitioned table = MAX(table data volume (GB)/2 GB x 2, rounded up, 4)
        • Number of buckets for a partitioned table = MAX(data volume of a single partition (GB)/2 GB x 2, rounded up, 1)
        Pay attention to the following:
        • Use the total data volume of the table, not the compressed size.
        • Setting an even number of buckets is recommended. The minimum number of buckets is 4 for a non-partitioned table and 1 for a partitioned table.
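        For example, reading the formula literally: for a 100 GB non-partitioned table, 100/2 x 2 = 100, so MAX(100, 4) = 100 buckets; for a 3 GB non-partitioned table, 3/2 x 2 = 3, so MAX(3, 4) = 4 buckets; for a partitioned table whose largest partition holds about 0.5 GB, 0.5/2 x 2 = 0.5, rounded up to 1, so MAX(1, 1) = 1 bucket per partition. These figures are illustrative; size the buckets against your actual data volume.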

      changelog.enabled (boolean; default: false)
        Whether to enable the Hudi ChangeLog function. If this function is enabled, the migration job can output DELETE and UPDATE BEFORE data.

      logical.delete.enabled (boolean; default: true)
        Whether to enable logical deletion. If the ChangeLog function is enabled, logical deletion must be disabled.

      hoodie.write.liststatus.optimized (boolean; default: true)
        Whether to enable liststatus optimization when log files are written. If the migration job involves large tables or a large amount of partition data, the list operation performed during startup is time-consuming and may cause a job startup timeout. You are advised to disable this function in that case.

      hoodie.index.liststatus.optimized (boolean; default: false)
        Whether to enable liststatus optimization during data locating. If the migration job involves large tables or a large amount of partition data, the list operation performed during startup is time-consuming and may cause a job startup timeout. You are advised to disable this function in that case.

      compaction.async.enabled (boolean; default: true)
        Whether to enable asynchronous compaction. The compaction operation affects the write performance of real-time jobs. If you use an external compaction operation, you can set this parameter to false to disable compaction for real-time processing migration jobs.

      compaction.schedule.enabled (boolean; default: true)
        Whether to generate compaction plans. Compaction plans must be generated by this service and can be executed by Spark.

      compaction.delta_commits (int; default: 5; unit: count)
        Frequency of generating compaction requests. Lowering the frequency reduces how often compaction runs and improves job performance. If only a small volume of incremental data is synchronized to Hudi, you can set a larger value for this parameter.
        NOTE:
        For example, if this parameter is set to 40, a compaction request is generated every 40 commits. Because DataArts Migration generates a commit every minute, the interval between compaction requests is 40 minutes.

      clean.async.enabled (boolean; default: true)
        Whether to clean data files of historical versions.

      clean.retain_commits (int; default: 30; unit: count)
        Number of recent commits to retain. Data files related to these commits are retained for a period equal to the number of retained commits multiplied by the commit interval. You are advised to set this parameter to twice the value of compaction.delta_commits.
        NOTE:
        For example, if this parameter is set to 80, and because DataArts Migration generates a commit every minute, data files related to commits generated more than 80 minutes earlier are cleaned, while data files related to the most recent 80 commits are retained.

      hoodie.archive.automatic (boolean; default: true)
        Whether to age Hudi commit files.

      archive.min_commits (int; default: 40; unit: count)
        Number of recent commits to keep when historical commits are archived to log files. You are advised to set this parameter to one greater than clean.retain_commits.
        NOTE:
        For example, if this parameter is set to 81, the files related to the most recent 81 commits are retained when an archive operation is triggered.

      archive.max_commits (int; default: 50; unit: count)
        Number of commits that triggers an archive operation. You are advised to set this parameter to 20 greater than archive.min_commits.
        NOTE:
        For example, if this parameter is set to 101, an archive operation is triggered when the files of 101 commits have been generated.

      • To achieve optimal performance for the migration job, you are advised to use an MOR table with Hudi BUCKET indexes and to configure the number of buckets based on the actual data volume.
      • To ensure the stability of the migration job, you are advised to offload Hudi compaction to Spark jobs executed by MRS while keeping compaction plan generation enabled in this migration job. For details, see
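      Following those recommendations, a possible global configuration of Hudi table attributes could combine the parameters from Table 8 as follows. The exact input format depends on the console, and the values are illustrative rather than mandatory:

      index.type=BUCKET
      hoodie.bucket.index.num.buckets=256
      compaction.async.enabled=false
      compaction.schedule.enabled=true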

  7. Refresh and check the mapping between the source and destination tables. In addition, you can modify table attributes, add additional fields, and use the automatic table creation capability to create tables in the destination Hudi database.

    Figure 10 Mapping between source and destination tables
    • Synchronization Primary Key

      The primary key must be set for Hudi tables. If the source table has no primary key, you must manually select the primary key during field mapping.

    • Edit Table Attribute

      Click Edit Table Attributes in the Operation column to configure Hudi table attributes, including the table type, partition type, and custom attributes.

      Figure 11 Configuring the Hudi table attributes
      • Table Type: MERGE_ON_READ or COPY_ON_WRITE
      • Partition Type: No partition, Time partition, or Custom partition
        • For Time, you need to specify a source field name and select a time conversion format.

          For example, you can specify the source field name src_col_1 and select a time conversion format, for example, day(yyyyMMdd), month(yyyyMM), or year(yyyy). During automatic table creation, a cdc_partition_key field is created in the Hudi table by default. The system formats the value of the source field (src_col_1) based on the configured time conversion format and writes the value to cdc_partition_key.

        • Custom partitions do not support timestamp fields. If you use timestamp fields, the job will fail.
      • Customize table attributes. Some advanced functions of a single table can be configured using parameters. For details about the parameters, see the table that lists Hudi advanced configurations.
    • Edit additional fields: Click Additional Field in the Operation column to add custom fields to the destination Hudi table. For a new table, you can add additional fields to the existing fields in the source table. You can customize the field name, select the field type, and enter the field value.
      • Field Name: name of the new field in the destination Hudi table
      • Field Type: Type of the new field in the destination Hudi table
      • Field Value: Value source of the new field in the destination Hudi table
        Table 9 Configuring the field value obtaining mode

        Type

        Example

        Constant

        Digits, letters, and special characters are supported. Emojis may cause a job submission failure.

        Built-in variable

        • Source host IP address: source.host
        • Source schema name: source.schema
        • Source table name: source.table
        • Destination schema name: target.schema
        • Destination table name: target.table

        Source table field

        Any field in the source table

        Do not change the name of the source table field when the job is running. Otherwise, the job may be abnormal.

        UDF

        • substring(#col, pos[, len]): obtains a substring of a specified length from the source column name. The substring range is [pos, pos+len).
        • date_format(#col, time_format[, src_tz, dst_tz]): formats the source column name based on a specified time format. The time zone can be converted using src_tz and dst_tz.
        • now([tz]): obtains the current time in a specified time zone.
        • if(cond_exp, str1, str2): returns str1 if the condition expression cond_exp is met and returns str2 otherwise.
        • concat(#col[, #str, ...]): concatenates multiple parameters, including source columns and strings.
        • from_unixtime(#col[, time_format]): formats a Unix timestamp based on a specified time format.
        • unix_timestamp(#col[, precision, time_format]): converts a time into a Unix timestamp of a specified time format and precision. time_format must be the same as that in the source data.
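        For illustration, the following shows how such expressions might be entered as field values. The column names (#id, #name, #create_time) and the time format strings are placeholders, not fields from your actual tables:

        substring(#name, 1, 4)
        concat(#id, '_', #name)
        date_format(#create_time, 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')
        from_unixtime(#create_time, 'yyyy-MM-dd HH:mm:ss')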
    • Automatic table creation: Click Auto Table Creation to automatically create tables in the destination database based on the configured mapping policy. After the tables are created, Existing table is displayed for them.
      Figure 12 Automatic table creation
      • DataArts Migration can automatically create tables only; it does not create databases or schemas. You need to manually create the destination databases and schemas before using this function.
      • For details about the field type mapping for automatic table creation, see Field Type Mapping.
      • An automatically created Hudi table contains three audit fields: cdc_last_update_date, logical_is_deleted, and _hoodie_event_time. The _hoodie_event_time field is used as the pre-aggregation key of the Hudi table.
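      As a rough illustration of the result, an automatically created Hudi table might resemble the following Spark SQL definition. This is a hypothetical sketch: the business columns, their types, and the primary key are placeholders, and only the three audit fields and the pre-aggregation key reflect the behavior described above.

      CREATE TABLE demo_db.orders (
        id INT,
        order_name STRING,
        cdc_last_update_date TIMESTAMP,
        logical_is_deleted BOOLEAN,
        `_hoodie_event_time` TIMESTAMP
      ) USING hudi
      TBLPROPERTIES (
        type = 'mor',
        primaryKey = 'id',
        preCombineField = '_hoodie_event_time'
      );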

  8. Configure DDL message processing rules.

    Real-time migration jobs can synchronize data manipulation language (DML) operations, such as adding, deleting, and modifying data, as well as some table structure changes using the data definition language (DDL). You can set the processing policy for a DDL operation to Normal processing, Ignore, or Error.

    • Normal processing: When a DDL operation on the source database or table is detected, the operation is automatically synchronized to the destination.
    • Ignore: When a DDL operation on the source database or table is detected, the operation is ignored and not synchronized to the destination.
    • Error: When a DDL operation on the source database or table is detected, the migration job throws an exception.
      Figure 13 DDL configuration

  9. Configure task parameters.

    Table 10 Task parameters

    Execution Memory (default: 8 GB)
      Memory allocated for job execution, which automatically changes with the number of CPU cores.

    CPU Cores (default: 2)
      Value range: 2 to 32. For each CPU core added, 4 GB of execution memory and one more concurrency are added automatically.

    Maximum Concurrent Requests (default: 1)
      Maximum number of jobs that can be concurrently executed. This parameter does not need to be configured; it automatically changes with the number of CPU cores.

    Custom attributes (default: none)
      You can add custom attributes to modify some job parameters and enable some advanced functions. For details, see Job Performance Optimization.

  10. Submit and run the job.

    After configuring the job, click Submit in the upper left corner to submit the job.

    Figure 14 Submitting the job

    After submitting the job, click Start on the job development page. In the displayed dialog box, set required parameters and click OK.

    Figure 15 Starting the job
    Table 11 Parameters for starting the job

    Parameter

    Description

    Offset Parameter

    • Incremental synchronization: Incremental data synchronization starts from a specified time point.
    • Full and incremental synchronization: All data is synchronized first, and then incremental data is synchronized in real time.

    Time

    This parameter must be set for incremental synchronization, and it specifies the start time of incremental synchronization.

    NOTE:

    If you set a time that is earlier than the earliest binlog time, the earliest log time is used.

  11. Monitor the job.

    On the job development page, click Monitor to go to the Job Monitoring page. You can view the status and log of the job, and configure alarm rules for the job. For details, see Real-Time Migration Job O&M.

    Figure 16 Monitoring the job

Performance Optimization

If the synchronization speed is too slow, rectify the fault by referring to Job Performance Optimization.