Configuring a Job for Migrating Data from a Centralized/Distributed GaussDB to MRS Hudi
This section describes how to configure a job for migrating data from a centralized/distributed GaussDB to MRS Hudi.
Supported Source and Destination Database Versions
| Source Database | Destination Database |
| --- | --- |
| Centralized/Distributed GaussDB (kernel engine versions 505.1.0, 505.1.0.SPC0100, and 505.2.0) | MRS cluster (3.2.0-LTS.x and 3.5.x), Hudi 0.11.0 |
Database Account Permissions
Before you use DataArts Migration for data synchronization, ensure that the source and destination database accounts meet the requirements in the following table. The required account permissions vary depending on the synchronization task type.
| Type | Required Permissions |
| --- | --- |
| Source database connection account |  |
| Destination database connection account | The account must have the following permissions for each table in the destination database: INSERT, SELECT, UPDATE, DELETE, CONNECT, and CREATE. |

- You are advised to create dedicated database accounts for DataArts Migration task connections to prevent task failures caused by password changes.
- If you change the account password for the source or destination database, update the connection information in Management Center as soon as possible. Otherwise, automatic retries after a task failure may lock the database account.
Supported Synchronization Objects
The following table lists the objects that can be synchronized using different links in DataArts Migration.
| Type | Note |
| --- | --- |
| Synchronization objects |  |
Important Notes
In addition to the constraints on supported data sources and versions, connection account permissions, and synchronization objects, you also need to pay attention to the notes in the following table.
| Type | Restriction |
| --- | --- |
| Database |  |
| Usage | General: During task startup and the full synchronization phase, do not perform DDL operations on the source database; otherwise, the task may fail. Troubleshooting: If any problem occurs during task creation, startup, full synchronization, incremental synchronization, or completion, rectify the fault by referring to FAQs. |
| Other |  |
Procedure
This section uses real-time synchronization from GaussDB to Hudi as an example to describe how to configure a real-time data migration job. Before that, ensure that you have read the instructions described in Check Before Use and completed all the preparations.
- Create a real-time migration job by following the instructions in Creating a Real-Time Migration Job and go to the job configuration page.
- Select the data connection type. Select GaussDB for Source and Hudi for Destination.
Figure 1 Selecting the data connection type
- Select a job type. The default migration type is Real-time. The migration scenario is Entire DB.
Figure 2 Setting the migration job type
- Configure network resources. Select the created GaussDB and Hudi data connections and a migration resource group for which network connectivity has been configured.
Figure 3 Selecting data connections and a migration resource group
If no data connection is available, click Create to go to the Manage Data Connections page of the Management Center console and click Create Data Connection to create a connection. For details, see Configuring DataArts Studio Data Connection Parameters.
If no migration resource group is available, click Create to create one. For details, see Buying a DataArts Migration Resource Group Incremental Package.
- Check the network connectivity. After the data connections and migration resource group are configured, perform the following operations to check the connectivity between the data sources and the migration resource group.
- Click Source Configuration. The system will test connectivity for the entire migration job.
- Click Test for the source, the destination, and the migration resource group to test their connectivity individually.
If the network connectivity is abnormal, see How Do I Troubleshoot a Network Disconnection Between the Data Source and Resource Group?
- Configure source parameters.
Select the databases and tables to be synchronized based on the following table.
Table 5 Selecting the databases and tables to be synchronized

| Synchronization Scenario | Configuration Method |
| --- | --- |
| Entire DB | Select the GaussDB databases and tables to be migrated. Both databases and tables can be customized: you can select one database and one table, or multiple databases and tables. |

Figure 4 Selecting databases and tables
- Configure destination parameters.
- Set Database and Table Matching Policy.
For details about the matching policy between source and destination databases and tables in each synchronization scenario, see the following table.
Table 6 Database and table matching policy

Synchronization scenario: Entire DB

- Database Matching Policy
  - Same name as the source database: Data will be synchronized to the Hudi database with the same name as the source GaussDB database.
  - Custom: Data will be synchronized to the Hudi database you specify.
- Table Matching Policy
  - Same name as the source table: Data will be synchronized to the Hudi table with the same name as the source GaussDB table.
  - Custom: Data will be synchronized to the Hudi table you specify.

Figure 5 Database and table matching policy in the entire database migration scenario

NOTE: When you customize a matching policy, you can use the built-in variables #{source_db_name} and #{source_table_name} to identify the source database name and table name. The table matching policy must contain #{source_table_name}.
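For example, custom policies that satisfy this rule could look as follows. This is only an illustrative sketch: the cdc_ and ods_ prefixes are arbitrary naming conventions, and the Python variables merely hold the strings you would enter in the console.

```python
# Hypothetical custom matching policies; DataArts Migration expands the
# built-in variables at run time. The table policy keeps #{source_table_name},
# as required by the note above.
database_matching_policy = "cdc_#{source_db_name}"
table_matching_policy = "ods_#{source_db_name}_#{source_table_name}"

# For a source database "sales" and table "orders", these would resolve to the
# Hudi database "cdc_sales" and table "ods_sales_orders".
```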
- Set Hudi parameters.
For details, see Table 7.
Figure 6 Hudi destination parameters

Table 7 Hudi destination parameters

| Parameter | Description |
| --- | --- |
| Data Storage Path | Warehouse path used when tables are automatically created in Hudi. A subdirectory is created in the warehouse path for each table. You can enter an HDFS or OBS path, for example, obs://bucket/warehouse (OBS) or /tmp/warehouse (HDFS). |
| Global Configuration of Hudi Table Attributes | Some advanced functions can be configured using parameters. For details, see Table 8 Hudi advanced parameters. |
Table 8 Hudi advanced parameters

- index.type (string, default: BLOOM): Index type of the Hudi table. BLOOM and BUCKET indexes are supported. If a large amount of data needs to be migrated, BUCKET indexes are recommended for better performance.
- hoodie.bucket.index.num.buckets (int, default: 256): Number of buckets within a Hudi table partition. NOTE: When using Hudi BUCKET tables, you need to set the number of buckets for a table partition, which affects table performance.
  - Number of buckets for a non-partitioned table = MAX(data volume of the table (GB)/2 GB x 2, rounded up, 4)
  - Number of buckets for a partitioned table = MAX(data volume of a partition (GB)/2 GB x 2, rounded up, 1)
  Pay attention to the following: use the total data volume of the table, not its compressed size, and prefer an even number of buckets. The minimum is 4 for a non-partitioned table and 1 for a partitioned table.
- changelog.enabled (boolean, default: false): Whether to enable the Hudi ChangeLog function. If this function is enabled, the migration job can output DELETE and UPDATE BEFORE data.
- logical.delete.enabled (boolean, default: true): Whether to enable logical deletion. If the ChangeLog function is enabled, logical deletion must be disabled.
- hoodie.write.liststatus.optimized (boolean, default: true): Whether to enable liststatus optimization when log files are written. If the migration job involves large tables or a large amount of partitioned data, the list operation is time-consuming during startup and may cause a job startup timeout. In that case, you are advised to disable this function.
- hoodie.index.liststatus.optimized (boolean, default: false): Whether to enable liststatus optimization when data is located. If the migration job involves large tables or a large amount of partitioned data, the list operation is time-consuming during startup and may cause a job startup timeout. In that case, you are advised to disable this function.
- compaction.async.enabled (boolean, default: true): Whether to enable asynchronous compaction. Compaction affects the write performance of real-time jobs. If you run compaction externally, you can set this parameter to false to disable compaction in real-time migration jobs.
- compaction.schedule.enabled (boolean, default: true): Whether to generate compaction plans. Compaction plans must be generated by this service and can be executed by Spark.
- compaction.delta_commits (int, default: 5): Number of commits after which a compaction request is generated. Generating compaction requests less frequently reduces the compaction frequency and improves job performance. If only a small volume of incremental data is synchronized to Hudi, you can set a larger value. NOTE: For example, if this parameter is set to 40, a compaction request is generated every 40 commits. Because DataArts Migration generates a commit every minute, the interval between compaction requests is 40 minutes.
- clean.async.enabled (boolean, default: true): Whether to clean data files of historical versions.
- clean.retain_commits (int, default: 30): Number of recent commits to retain. Data files related to these commits are retained for a period equal to the number of commits multiplied by the commit interval. You are advised to set this parameter to twice the value of compaction.delta_commits. NOTE: For example, if this parameter is set to 80 and DataArts Migration generates a commit every minute, data files related to commits older than 80 minutes are cleaned, and data files related to the latest 80 commits are retained.
- hoodie.archive.automatic (boolean, default: true): Whether to age Hudi commit files.
- archive.min_commits (int, default: 40): Number of recent commits to keep when historical commits are archived to log files. You are advised to set this parameter to one greater than clean.retain_commits. NOTE: For example, if this parameter is set to 81, files related to the latest 81 commits are retained when an archive operation is triggered.
- archive.max_commits (int, default: 50): Number of commits that triggers an archive operation. You are advised to set this parameter to 20 greater than archive.min_commits. NOTE: For example, if this parameter is set to 101, an archive operation is triggered once files for 101 commits have been generated.
- To achieve optimal performance for the migration job, you are advised to use a MOR table with Hudi BUCKET indexes and to configure the number of buckets based on the actual data volume (a sizing sketch follows these notes).
- To ensure the stability of the migration job, you are advised to offload Hudi compaction to Spark jobs executed by MRS while keeping compaction plan generation enabled in this migration job. For details, see How Do I Configure a Spark Periodic Task for Hudi Compaction?
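As a convenience for applying the guidance above, the following Python sketch derives a bucket count and the related commit-retention settings from the formulas and recommendations in Table 8. The sample data volume, the helper names, and the interpretation of the rounding are illustrative assumptions, not values prescribed by DataArts Migration.

```python
import math

def recommended_buckets(data_volume_gb: float, partitioned: bool = False) -> int:
    """One reading of the documented bucket formula, consistent with the
    even-number recommendation: max(ceil(volume / 2 GB) * 2, minimum), where
    volume is the uncompressed data volume of the table (or of one partition)
    and the minimum is 4 for non-partitioned tables and 1 for partitioned ones."""
    minimum = 1 if partitioned else 4
    return max(math.ceil(data_volume_gb / 2) * 2, minimum)

def related_commit_settings(delta_commits: int) -> dict:
    """Derive the advised companions of compaction.delta_commits:
    clean.retain_commits = 2 x delta_commits, archive.min_commits = retain + 1,
    archive.max_commits = archive.min_commits + 20."""
    retain = 2 * delta_commits
    return {
        "compaction.delta_commits": delta_commits,
        "clean.retain_commits": retain,
        "archive.min_commits": retain + 1,
        "archive.max_commits": retain + 21,
    }

if __name__ == "__main__":
    # Hypothetical 100 GB non-partitioned table; commits occur roughly once per
    # minute, so delta_commits=40 means a compaction request about every 40 minutes.
    print(recommended_buckets(100))      # -> 100
    print(related_commit_settings(40))   # -> {'compaction.delta_commits': 40, ...}
```

The resulting values can then be entered for hoodie.bucket.index.num.buckets and the compaction, clean, and archive parameters in the global or per-table Hudi attribute configuration.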
- Refresh and check the mapping between the source and destination tables. In addition, you can modify table attributes, add additional fields, and use the automatic table creation capability to create tables in the destination Hudi database.
Figure 7 Mapping between source and destination tables
- Synchronization Primary Key
The primary key must be set for Hudi tables. If the source table has no primary key, you must manually select the primary key during field mapping.
- Edit Table Attribute
Click Edit Table Attributes in the Operation column to configure Hudi table attributes, including the table type, partition type, and custom attributes.
Figure 8 Configuring the Hudi table attributes
- Table Type: Hudi table type. Select MERGE_ON_READ or COPY_ON_WRITE.
- Partition Type: partition type of the Hudi table. Select No partition, Time, or Custom.
- For Time, specify a source field name and select a time conversion format (a worked example follows this list).
For example, you can specify the source field name src_col_1 and select a time conversion format such as day(yyyyMMdd), month(yyyyMM), or year(yyyy). During automatic table creation, a cdc_partition_key field is created in the Hudi table by default. The system formats the value of the source field (src_col_1) based on the configured time conversion format and writes the result to cdc_partition_key.
- Custom partitions do not support timestamp fields. If you use timestamp fields, the job will fail.
- Customize table attributes. Some advanced functions of a single table can be configured using parameters. For details about the parameters, see Table 8 Hudi advanced parameters.
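As a worked illustration of the Time partition type (the source value is hypothetical), the Python snippet below reproduces the formatting that would be applied to src_col_1 when generating cdc_partition_key:

```python
from datetime import datetime

# Hypothetical value of the source field src_col_1.
src_value = datetime(2024, 5, 1, 10, 23, 45)

# Equivalent outputs for the documented time conversion formats:
print(src_value.strftime("%Y%m%d"))  # day(yyyyMMdd)  -> 20240501
print(src_value.strftime("%Y%m"))    # month(yyyyMM)  -> 202405
print(src_value.strftime("%Y"))      # year(yyyy)     -> 2024
```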
- Edit additional fields: Click Additional Field in the Operation column to add custom fields to the destination Hudi table. For a new table, you can add additional fields to the existing fields in the source table. You can customize the field name, select the field type, and enter the field value.
- Field Name: name of the new field in the destination Hudi table
- Field Type: type of the new field in the destination Hudi table
- Field Value: value source of the new field in the destination Hudi table
Table 9 Additional field value obtaining mode

- Constant: Digits, letters, and special characters are supported. Color emoticons may cause a job submission failure.
- Built-in variable:
  - Source host IP address: source.host
  - Source schema name: source.schema
  - Source table name: source.table
  - Destination schema name: target.schema
  - Destination table name: target.table
- Source table field: any field in the source table. Do not change the name of the source table field while the job is running; otherwise, the job may become abnormal.
- UDF:
  - substring(#col, pos[, len]): obtains a substring of the specified length from the source column #col. The substring range is [pos, pos+len).
  - date_format(#col, time_format[, src_tz, dst_tz]): formats the source column #col based on the specified time format. The time zone can be converted using src_tz and dst_tz.
  - now([tz]): obtains the current time in the specified time zone.
  - if(cond_exp, str1, str2): returns str1 if the condition expression cond_exp is met; otherwise, returns str2.
  - concat(#col[, #str, ...]): concatenates multiple parameters, including source columns and strings.
  - from_unixtime(#col[, time_format]): formats a Unix timestamp based on the specified time format.
  - unix_timestamp(#col[, precision, time_format]): converts a time into a Unix timestamp of the specified time format and precision. time_format must be the same as that in the source data.
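For example, additional fields might be configured as follows. This is a minimal sketch: order_id is an assumed source column, the field names and the constant "prod" are arbitrary, and the expressions simply follow the signatures listed above (the exact literal syntax accepted by the console may differ).

```python
# Hypothetical additional fields for a destination Hudi table.
# Keys are new field names; values are what would be entered as the field value.
additional_fields = {
    "env_flag":   "prod",                        # constant
    "src_table":  "source.table",                # built-in variable
    "order_head": "substring(#order_id, 0, 4)",  # UDF: substring of order_id starting at position 0, length 4
    "load_time":  "now()",                       # UDF: current time (time zone argument omitted)
}
```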
- Automatic table creation: Click Auto Table Creation to automatically create tables in the destination database based on the configured mapping policy. After the tables are created, Existing table is displayed for them.
Figure 9 Automatic table creation
- DataArts Migration can create tables automatically, but not databases or schemas. You need to manually create the destination databases and schemas before using this function.
- For details about the field type mapping for automatic table creation, see Field Type Mapping.
- An automatically created Hudi table contains three audit fields: cdc_last_update_date, logical_is_deleted, and _hoodie_event_time. The _hoodie_event_time field is used as the pre-aggregation key of the Hudi table.
- Configure task parameters.
Table 10 Task parameters

- Execution Memory (default: 8 GB): Memory allocated for job execution, which automatically changes with the number of CPU cores.
- CPU Cores (default: 2): Value range: 2 to 32. For each CPU core added, 4 GB of execution memory and one concurrency are automatically added.
- Maximum Concurrent Requests (default: 1): Maximum number of jobs that can be concurrently executed. This parameter does not need to be configured and automatically changes with the number of CPU cores.
- Auto Retry (default: No): Whether to enable automatic retry upon a job failure.
- Maximum Retries (default: 1): Displayed when Auto Retry is set to Yes.
- Retry Interval (Seconds) (default: 120s): Displayed when Auto Retry is set to Yes.
- Write Dirty Data (default: No): Whether to record dirty data. By default, dirty data is not recorded. A large amount of dirty data affects the synchronization speed of the task.
  - No: Dirty data is not recorded and is not allowed. If dirty data is generated during synchronization, the task fails and exits. This is the default value.
  - Yes: Dirty data is allowed, that is, dirty data does not affect task execution. When dirty data is allowed and a threshold is set: if the amount of dirty data generated is within the threshold, the synchronization task ignores it (the dirty data is not written to the destination) and runs normally; if the amount exceeds the threshold, the task fails and exits.
  NOTE: Criteria for determining dirty data: dirty data is data that is meaningless to services, has an invalid format, or is generated when the synchronization task encounters an error. If an exception occurs when a piece of data is written to the destination, that piece of data is dirty data; therefore, any data that fails to be written is classified as dirty data. For example, if VARCHAR data at the source is written to an INT column at the destination, the conversion fails and the data cannot be written to the destination. When configuring a synchronization task, you can choose whether to write dirty data during synchronization and set the maximum number of error records allowed in a single partition; when that threshold is exceeded, the task fails and exits.
- Dirty Data Policy (default: Do not archive): Displayed when Write Dirty Data is set to Yes. The following policies are supported:
  - Do not archive: Dirty data is only recorded in job logs, but not stored.
  - Archive to OBS: Dirty data is stored in OBS and printed in job logs.
- Write Dirty Data Link (default: none): Displayed when Dirty Data Policy is set to Archive to OBS. Only links to OBS support dirty data writes.
- Dirty Data Directory (default: none): OBS directory to which dirty data will be written.
- Dirty Data Threshold (default: 100): Displayed when Write Dirty Data is set to Yes. Set the dirty data threshold as required. NOTE: The threshold takes effect for each concurrency; for example, if the threshold is 100 and the concurrency is 3, the maximum number of dirty data records allowed by the job is 300. The value -1 indicates that the number of dirty data records is not limited.
- Custom attributes (default: none): You can add custom attributes to modify some job parameters and enable some advanced functions. For details, see Job Performance Optimization.
- Submit and run the job.
After configuring the job, click Submit in the upper left corner to submit the job.
Figure 10 Submitting the job

After submitting the job, click Start on the job development page. In the displayed dialog box, set the required parameters and click OK.

Figure 11 Starting the job

Table 11 Parameters for starting the job

- Synchronous Mode:
  - Incremental Synchronization: Incremental data synchronization starts from a specified time point.
  - Full and incremental synchronization: All data is synchronized first, and then incremental data is synchronized in real time.
- Monitor the job.
On the job development page, click Monitor to go to the Job Monitoring page. You can view the status and log of the job, and configure alarm rules for the job. For details, see Real-Time Migration Job O&M.
Figure 12 Monitoring the job
Performance Optimization
If the synchronization speed is too slow, rectify the fault by referring to Job Performance Optimization.