Creating a Composite Task
Overview
You can create a composite task if you need to continuously synchronize real-time data. This task allows FDI to implement real-time and incremental synchronization of multiple data tables from the source to destination, improving the data integration and synchronization efficiency.
The composite task supports flexible mappings of fields between data tables. For example, multiple fields in one data table at the source can be mapped to different data tables at the destination, or fields in multiple data tables at the source can be mapped to one data table at the destination.
Prerequisites
- You have connected to data sources at the source and destination. For details, see Connecting to Data Sources.
In the data source configuration at the source, the value of Database must be the same as the actual database name (case-sensitive). Otherwise, data synchronization will fail.
- The CDC function has been enabled at the source. The CDC implementation modes vary depending on data source types. For details, see the following:
- The retention period of CDC archive logs in the data source at the source must cover the time range of the logs that the integration task parses. Otherwise, the integration task cannot find the archive logs and incremental synchronization fails. Therefore, do not stop a data integration task for a long time, and retain archive logs for at least two days.
- Do not perform Data Definition Language (DDL) operations on the source database during the first data synchronization.
- If a large number of composite tasks are created, the database server and FDI plug-in process will consume resources. Therefore, you are advised not to create too many composite tasks for a database.
- You can configure multiple database tables under multiple schemas in a single CDC task to implement unified collection for full or incremental data.
- During the running of a composite task, you can add a table and perform full or incremental collection on the new table after the restart.
- For Oracle data sources at the source, the following cannot be synchronized:
- Fields of the large text type and binary type
- Data tables whose names contain lowercase letters
- Data tables that do not have primary keys
If a table contains only a small amount of data, collect full data once a day. Data in PostgreSQL tables can be cleared before you write to the table. If data is collected from the Oracle database but no primary key is available in the table, you can use the internal row ID of the Oracle database as the primary key. The row ID is 18 characters of digits and letters.
- Data tables or data fields whose names are reserved in the database
- Data deleted in truncate mode cannot be synchronized. Data deleted in entire table mode cannot be synchronized.
- For the MySQL data source at the source:
If the MySQL database uses the MGR cluster mode, the source data source must be directly connected to the active node instead of the route node.
If the MySQL database contains a large amount of data, the connection to the database may time out when data is synchronized for the first time. You can modify the interactive_timeout and wait_timeout parameters of the MySQL database to avoid this problem.
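As a rough sketch of the timeout adjustment described above, the following Python helper only builds the SQL statements that would raise both parameters. It does not connect to a database. The function name build_timeout_statements and the value 28800 (seconds) are illustrative assumptions; a suitable value depends on how long the first synchronization takes in your environment.

```python
def build_timeout_statements(timeout_seconds: int) -> list[str]:
    """Return MySQL statements that set both idle-timeout parameters.

    Hypothetical helper for illustration only. Running the statements
    requires a MySQL account that is allowed to change global variables.
    """
    if not isinstance(timeout_seconds, int) or timeout_seconds <= 0:
        raise ValueError("timeout_seconds must be a positive integer")
    return [
        f"SET GLOBAL {name} = {timeout_seconds};"
        for name in ("interactive_timeout", "wait_timeout")
    ]


if __name__ == "__main__":
    # 28800 is an example value, not a recommendation from this document.
    for statement in build_timeout_statements(28800):
        print(statement)
```

Values set with SET GLOBAL apply to connections opened afterwards, so the change should be made before the first synchronization starts.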
Procedure
- Log in to the ROMA Connect console. On the Instances page, click View Console next to a specific instance.
- In the navigation pane on the left, choose Fast Data Integration > Task Management. On the page displayed, click Create Composite Task.
- On the Create Composite Task page, configure basic task information.
Table 1 Basic task information
Parameter
Description
Name
After a task is created, the task name cannot be modified. It is recommended that you enter a name based on naming rules to facilitate search.
Integration Mode
Select the mode used for data integration.
- Scheduled: A data integration task is executed according to the schedule to integrate data from the source to the destination.
NOTE:
This mode applies only to composite tasks whose data source type is MySQL, Oracle, PostgreSQL, SQL Server, or HANA.
- Real-Time: The data integration task continuously detects updates to the data at the source and integrates updates to the destination in real time.
The data integration mode varies depending on the data source. For details, see Table 1.
Description
It is recommended that you add task descriptions based on the actual task usage to differentiate tasks. The task description can be modified after being created.
Tag
Add or select an existing tag to classify tasks for quick search. New tags are saved when you save the task and can be searched directly when you create another task.
Operation Types
Mandatory when Integration Mode is set to Real-Time.
Select the operation types for database logs, including Insert, Delete, and Update. For example, if you select Insert and Update, only the logs related to data insert and update in the database are obtained.
Use Quartz Cron Expression
Mandatory when Integration Mode is set to Scheduled.
Schedule a task using a Quartz cron expression.
Period
Mandatory when Integration Mode is set to Scheduled.
Set the task execution interval, from minutes to months.
For example, if Unit is set to Day and Period is set to 1, the task is executed once every day.
Expression
Mandatory when Use Quartz Cron Expression is set to Yes.
Configure a Quartz cron expression for task scheduling. The seconds field in the expression is fixed at 0 because ROMA Connect supports scheduling only down to the minute. For details, see Appendix: Quartz Cron Expression Configuration.
For example, to execute a task every 15 minutes during the hours 01:00 through 04:00 every day (the hour range 1-4 is inclusive, so the last run starts at 04:45), use the following expression:
0 0/15 1-4 * * ?
Effective Time
Mandatory when Integration Mode is set to Scheduled.
Start time of a task.
Sync Existing Data
Available only after you click Edit of a task.
This function takes effect when you add a table mapping to a composite task that has started.
- When enabled, the task synchronizes all the existing data of the newly added tables. After that, data is synchronized in increments.
- When disabled, the task synchronizes only data generated from the start of the task.
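To make the Quartz cron example given for the Expression parameter in Table 1 (0 0/15 1-4 * * ?) more concrete, the following Python sketch lists the times of day at which that expression fires. It is not the scheduler used by ROMA Connect. It is a simplified illustration that interprets only the minutes and hours fields, and the function names are hypothetical.

```python
def expand_field(field: str, low: int, high: int) -> list[int]:
    """Expand one numeric cron field.

    Supports only '*', 'a', 'a-b', and an optional '/step' suffix,
    which is a small subset of the Quartz syntax.
    """
    base, _, step_text = field.partition("/")
    step = int(step_text) if step_text else 1
    if base == "*":
        start, end = low, high
    elif "-" in base:
        start_text, end_text = base.split("-")
        start, end = int(start_text), int(end_text)
    else:
        start = int(base)
        # 'a/step' runs from a to the field maximum; a bare 'a' is a single value.
        end = high if step_text else start
    return list(range(start, end + 1, step))


def daily_fire_times(expression: str) -> list[str]:
    """Return HH:MM fire times, assuming the task runs every day.

    The seconds field is ignored (it is fixed at 0), and the day, month,
    and weekday fields are not interpreted in this sketch.
    """
    _seconds, minutes, hours, *_rest = expression.split()
    return [
        f"{hour:02d}:{minute:02d}"
        for hour in expand_field(hours, 0, 23)
        for minute in expand_field(minutes, 0, 59)
    ]


if __name__ == "__main__":
    times = daily_fire_times("0 0/15 1-4 * * ?")
    print(len(times), times[0], times[-1])
```

Because the hour range 1-4 includes hour 4, the expression fires 16 times a day, from 01:00 to 04:45.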
- Configure a mapping between data sources at the source and destination.
Table 2 Source and destination configuration information
Parameter
Description
Source
Instance Name
Select the ROMA Connect instance that is being used.
Integration Application
Select the integration application to which the data source at the source belongs.
Data Source Type
Select a data source type at the source.
Scheduled: MySQL, Oracle, SQL Server, PostgreSQL, and HANA
Real-time: MySQL, Oracle, and SQL Server
Destination
Instance Name
Select the ROMA Connect instance that is being used. After the source instance is configured, the destination instance is automatically associated and does not need to be configured.
Integration Application
Select the integration application to which the data source at the destination belongs.
Data Source Type
Select a data source type at the destination.
Scheduled: MySQL, Oracle, PostgreSQL, SQL Server, and HANA
Real-time: MySQL, Oracle, PostgreSQL, Kafka, and SQL Server
- Configure data table mappings between the source and destination in manual or automatic mode.
- The length of a data field at the destination must be greater than or equal to that of the data field at the source. Otherwise, the synchronized data will be lost.
- A maximum of 1000 data tables can be synchronized in a task.
- If the data source type at the destination is Kafka, the table displayed on the destination is a virtual table. You only need to edit the field mappings in the table.
- For the Oracle destination data source, if the source primary key field is empty, the record is discarded by default and no scheduling log error code is generated.
- Automatic mapping
- Click Automatic Mapping. In the dialog box that is displayed, configure the mapping policy and range and then click Start Mapping. The mapping between data tables will be automatically generated.
- Click Edit to modify a mapping between data tables as required.
- Click View. In the dialog box displayed, modify the mappings between fields in the data tables as required or add new mappings.
The length of a data field at the destination must be greater than or equal to that of the data field at the source. Otherwise, the synchronized data will be lost.
- Manually adding a table mapping
- In the Table Mappings area, click Add to manually add a table mapping.
- Set Source Table Name and Destination Table Name for the table mapping.
- Click Map in the Operation column. In the window displayed, view or edit the field mappings, delete unnecessary fields, or click Add to add a field mapping. Click the icon in the upper right corner to set the configuration items for the field mapping:
- Source Field: Select a field name in the source table, for example, ID.
- Destination Field: Select the corresponding field name in the destination table, for example, Name.
- Prefix: Enter the prefix of the synchronization field.
- Suffix: Enter the suffix of the synchronization field.
The following is an example of configuring the prefix and suffix: if the field content is test, the prefix is tab1, and the suffix is 1, the field content after synchronization is tab1test1.
Figure 1 Configuring field mappings
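The prefix and suffix example above can be sketched in a few lines of Python. This assumes that FDI simply concatenates the prefix, the field content, and the suffix, as the example suggests. The function name apply_affixes is hypothetical and is not part of ROMA Connect.

```python
def apply_affixes(value: str, prefix: str = "", suffix: str = "") -> str:
    """Return the field content with the prefix and suffix attached.

    Assumption: plain string concatenation with no separator.
    """
    return f"{prefix}{value}{suffix}"


if __name__ == "__main__":
    # Values taken from the example in this section.
    print(apply_affixes("test", prefix="tab1", suffix="1"))
```

Leaving both Prefix and Suffix empty keeps the field content unchanged under this assumption.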
Function mapping is available only when PostgreSQL is set as the destination. Click Add Function Mapping to map functions.
- Mapping Function: Select a mapping relationship.
- Destination Field: Select a destination field to map, for example, Name.
- Click Save.
- Configure abnormal data storage.
This configuration is available only when the data source type at the destination is MySQL, Oracle, PostgreSQL, or SQL Server. Before the configuration, connect to the OBS data source. For details, see Connecting to an OBS Data Source.
During each task execution, if some data at the source meets integration conditions but cannot be integrated to the destination due to network jitter or other exceptions, ROMA Connect stores the data to the OBS bucket as text files.
Table 3 Abnormal data storage information
Parameter
Description
Source Data Type
This parameter can only be set to OBS.
Integration Application
Select the required integration application.
Name
Select the OBS data source that you configured.
Path
Enter the object name of the OBS data source where abnormal data is to be stored. The value of Path cannot end with a slash (/).
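The Path rule in Table 3 can be checked before you enter the value in the console. The following Python sketch enforces only the one rule stated here (the value cannot end with a slash). The rejection of empty values is an added assumption, and the function name and sample path are hypothetical.

```python
def validate_obs_path(path: str) -> str:
    """Return the path unchanged if it satisfies the documented rule."""
    if not path:
        # Assumption: an empty path is not useful, so it is rejected here.
        raise ValueError("Path must not be empty")
    if path.endswith("/"):
        raise ValueError("Path cannot end with a slash (/)")
    return path


if __name__ == "__main__":
    # "fdi/abnormal-data" is an example object name, not a required value.
    print(validate_obs_path("fdi/abnormal-data"))
```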
- Set transaction thresholds.
Transaction thresholds are available only when the integration mode is set to Real-time and the data source type at the source is Oracle.
For Oracle sources, transactions whose size or duration exceeds the values set for the following two parameters are forcibly committed.
Table 4 Setting transaction thresholds
Parameter
Description
Transaction Size
Default: 100,000
Transaction Duration (min)
Default: 250
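The threshold rule in Table 4 can be expressed as a simple check. The following Python sketch uses the documented defaults and treats a transaction as forcibly committed when either value is exceeded. The class and function names are hypothetical, and the unit of Transaction Size is not stated in this document, so the size is treated as a plain number.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TransactionThresholds:
    """Defaults taken from Table 4."""
    size: int = 100_000
    duration_minutes: int = 250


def is_forcibly_committed(
    size: int,
    duration_minutes: float,
    thresholds: TransactionThresholds = TransactionThresholds(),
) -> bool:
    """Return True if the size or the duration exceeds its threshold."""
    return size > thresholds.size or duration_minutes > thresholds.duration_minutes


if __name__ == "__main__":
    print(is_forcibly_committed(size=50_000, duration_minutes=30))
    print(is_forcibly_committed(size=150_000, duration_minutes=30))
```

A value equal to a threshold is not treated as exceeding it in this sketch, following the wording "exceeds" in the description.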
- Click Create.
In the following scenarios, click Reset in the Operation column of a composite task to reset its synchronization. After the reset, the task synchronizes existing data again and then synchronizes increments in real time.
- Composite tasks need to support synchronization of new data tables and data fields at the source.
- The CDC archive logs at the source have been cleared. As a result, the composite task fails to synchronize data.
- The MySQL database does not use the GTID mode, and an active/standby switchover has occurred. As a result, the composite task fails to synchronize data.
You can reset the task only when Task Status is Stopped.
A full reset deletes all synchronization progress information but does not remove any destination data. When the task is executed again, full synchronization restarts. If the database contains a large amount of data, it may take a long time before real-time synchronization starts. Exercise caution when using this function.
For GaussDB and PostgreSQL data sources, after a composite task is deleted, run the following SQL statement in the database to delete the replication slot. Replace {task_id} with the actual task ID.
select pg_drop_replication_slot('roma_fdi_{task_id}');
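If you clean up replication slots for several deleted tasks, a small helper can fill in the task ID for you. The following Python sketch only builds the statement text and does not connect to a database. The function name is hypothetical, and the format of the task ID is an assumption: the sketch accepts only lowercase letters, digits, and underscores so that arbitrary text cannot be injected into the SQL string.

```python
import re


def drop_slot_statement(task_id: str) -> str:
    """Build the statement that drops the replication slot of a deleted task."""
    # Assumption: task IDs contain only lowercase letters, digits, and underscores.
    if not re.fullmatch(r"[a-z0-9_]+", task_id):
        raise ValueError(f"unexpected characters in task ID: {task_id!r}")
    return f"select pg_drop_replication_slot('roma_fdi_{task_id}');"


if __name__ == "__main__":
    # "12345" is a placeholder, not a real task ID.
    print(drop_slot_statement("12345"))
```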