Updated on 2024-06-25 GMT+08:00

Creating a Composite Task

Overview

You can create a composite task if you need to continuously synchronize real-time data. This task allows FDI to implement real-time and incremental synchronization of multiple data tables from the source to destination, improving the data integration and synchronization efficiency.

The composite task supports flexible mappings of fields between data tables. For example, multiple fields in one data table at the source can be mapped to different data tables at the destination, or fields in multiple data tables at the source can be mapped to one data table at the destination.

Prerequisites

  • You have connected to data sources at the source and destination. For details, see Connecting to Data Sources.

    In the data source configuration at the source, the value of Database must be the same as the actual database name (case-sensitive). Otherwise, data synchronization will fail.

  • The CDC function has been enabled at the source. The CDC implementation modes vary depending on data source types. For details, see the following:
  • The retention period of CDC archiving logs in a data source at the source must be greater than the log time parsed by the integration task. Otherwise, the integration task cannot find archive logs, resulting in incremental synchronization failures. Therefore, it is not recommended that a data integration task be stopped for a long time. It is recommended that archive logs be retained for at least two days.
  • Do not perform Data Definition Language (DDL) operations on the source database during the first data synchronization.
  • If a large number of composite tasks are created, the database server and FDI plug-in process will consume resources. Therefore, you are advised not to create too many composite tasks for a database.
  • You can configure multiple database tables under multiple schemas in a single CDC task to implement unified collection for full or incremental data.
  • During the running of a composite task, you can add a table and perform full or incremental collection on the new table after the restart.
  • Synchronization is not supported for the following types of Oracle data sources at the source:
    • Fields of the large text type and binary type
    • A data table whose name contains lowercase letters cannot be synchronized.
    • Data tables that do not have primary keys cannot be synchronized.

      If a table contains only a small amount of data, collect full data once a day. Data in PostgreSQL tables can be cleared before you write to the table. If data is collected from the Oracle database but no primary key is available in the table, you can use the internal row ID of the Oracle database as the primary key. The row ID is 18 characters of digits and letters.

    • Data tables or data fields whose names are reserved in the database
    • Data deleted in truncate mode cannot be synchronized. Data deleted in entire table mode cannot be synchronized.
  • For the MySQL data source at the source:

    If the MySQL database uses the MGR cluster mode, the source data source must be directly connected to the active node instead of the route node.

    If the MySQL database contains a large amount of data, the connection to the database may time out when data is synchronized for the first time. You can modify the interactive_timeout and wait_timeout parameters of the MySQL database to avoid this problem.

Procedure

  1. Log in to the ROMA Connect console. On the Instances page, click View Console next to a specific instance.
  2. In the navigation pane on the left, choose Fast Data Integration > Task Management. On the page displayed, click Create Composite Task.
  3. On the Create Composite Task page, configure basic task information.
    Table 1 Basic task information

    Parameter

    Description

    Name

    After a task is created, the task name cannot be modified. It is recommended that you enter a name based on naming rules to facilitate search.

    Integration Mode

    Select Scheduled as the mode used for data integration.

    • Scheduled: A data integration task is executed according to the schedule to integrate data from the source to the destination.
      NOTE:

      This mode applies only to composite tasks whose data source type is MySQL, Oracle, PostgreSQL, SQL Server, or HANA.

    • Real-Time: The data integration task continuously detects updates to the data at the source and integrates updates to the destination in real time.

    The data integration mode varies depending on the data source. For details, see Table 1.

    Description

    It is recommended that you add task descriptions based on the actual task usage to differentiate tasks. The task description can be modified after being created.

    Tag

    Add or select an existing tag to classify tasks for quick search. New tags are saved when you save the task and can be searched directly when you create another task.

    Operation Types

    Mandatory for Integration Mode set to Real-Time.

    Select the operation types for database logs, including Insert, Delete, and Update. For example, if you select Insert and Update, only the logs related to data insert and update in the database are obtained.

    Use Quartz Cron Expression

    Mandatory for Integration Mode set to Scheduled.

    Schedule a task using a Quartz cron expression.

    Period

    Mandatory for Integration Mode set to Scheduled.

    Set the task execution interval, from minutes to months.

    For example, if Unit is set to Day and Period is set to 1, the task is executed once every day.

    Expression

    Mandatory for Use Quartz Cron Expression set to Yes.

    Configure a Quartz cron expression for task scheduling. Second in the expression is fixed to 0 because ROMA Connect supports only down-to-minute schedules. For details, see Appendix: Quartz Cron Expression Configuration.

    A task needs to be executed every 15 minutes from 01:00 a.m. to 04:00 a.m. every day. In this example, the expression should be as follows:

    0 0/15 1-4 * * ?

    Effective Time

    Mandatory for Integration Mode set to Scheduled.

    Start time of a task.

    Sync Existing Data

    Available only after you click Edit of a task.

    This function takes effect when you add a table mapping to a composite task that has started.

    • When enabled, the task synchronizes all the existing data of the newly added tables. After that, data is synchronized in increments.
    • When disabled, the task synchronizes only data generated from the start of the task.
  4. Configure a mapping between data sources at the source and destination.
    Table 2 Source and destination configuration information

    Parameter

    Description

    Source

    Instance Name

    Select the ROMA Connect instance that is being used.

    Integration Application

    Select the integration application to which the data source at the source belongs.

    Data Source Type

    Select a data source type at the source.

    Scheduled: MySQL, Oracle, SQL Server, PostgreSQL, HANA

    Real-time: MySQL, Oracle, and SQL Server

    Destination

    Instance Name

    Select the ROMA Connect instance that is being used. After the source instance is configured, the destination instance is automatically associated and does not need to be configured.

    Integration Application

    Select the integration application to which the data source at the destination belongs.

    Data Source Type

    Select a data source type at the destination.

    Scheduled: MySQL, Oracle, PostgreSQL, SQL Server, and HANA

    Real-time: MySQL, Oracle, PostgreSQL, Kafka, and SQL Server,

  5. Configure data table mappings between the source and destination in manual or automatic mode.
    • The length of a data field at the destination must be greater than or equal to that of the data field at the source. Otherwise, the synchronized data will be lost.
    • A maximum of 1000 data tables can be synchronized in a task.
    • If the data source type at the destination is Kafka, the table displayed on the destination is a virtual table. You only need to edit the field mappings in the table.
    • For the Oracle destination data source, if the source primary key field is empty, the record is discarded by default and no scheduling log error code is generated.
    • Automatic mapping
      1. Click Automatic Mapping. In the dialog box that is displayed, configure the mapping policy and range and then click Start Mapping. The mapping between data tables will be automatically generated.
      2. Click Edit to modify a mapping between data tables as required.
      3. Click View. In the dialog box displayed, modify the mappings between fields in the data tables as required or add new mappings.

        The length of a data field at the destination must be greater than or equal to that of the data field at the source. Otherwise, the synchronized data will be lost.

    • Manually adding a table mapping
      1. In the Table Mappings area, click Add to manually add a table mapping.
      2. Set Source Table Name and Destination Table Name for the table mapping.
      1. Click Map in the Operation column. In the window displayed, view or edit the mapping fields or delete unnecessary fields, or click Add to add a field mapping. Click in the upper right corner to set the configuration items for the field mapping:

        • Source Field: Select a field name in the source table, for example, ID.
        • Destination Field: Select the corresponding field name in the destination table, for example, Name.
        • Prefix: Enter the prefix of the synchronization field.
        • Suffix: Enter the suffix of the synchronization field.

        The following is an example of configuring the prefix and suffix. For example, if the field content is test, the prefix is tab1, and the suffix is 1, the field after synchronization is tab1test1.

        Figure 1 Configuring field mappings

        Function mapping is available only when PostgreSQL is set as the destination. Click Add Function Mapping to map functions.

        • Mapping Function: Select a mapping relationship.
        • Destination Field: Select a destination field to map, for example, Name.
      2. Click Save.
  6. Configure abnormal data storage.

    This configuration is available only when the data source type at the destination is MySQL, Oracle, PostgreSQL, or SQL Server. Before the configuration, connect to the OBS data source. For details, see Connecting to an OBS Data Source.

    During each task execution, if some data at the source meets integration conditions but cannot be integrated to the destination due to network jitter or other exceptions, ROMA Connect stores the data to the OBS bucket as text files.
    Table 3 Abnormal data storage information

    Parameter

    Description

    Source Data Type

    This parameter can only be set to OBS.

    Integration Application

    Select the required integration application.

    Name

    Select the OBS data source that you configured.

    Path

    Enter the object name of the OBS data source where abnormal data is to be stored. The value of Path cannot end with a slash (/).

  7. Set transaction thresholds.

    Transaction thresholds are available only when the integration mode is set to Real-time and the source data type set to Oracle.

    For Oracle sources, transactions whose size or duration exceeds the values set for the following two parameters are forcibly committed.
    Table 4 Setting transaction thresholds

    Parameter

    Description

    Transaction Size

    Default: 100,000

    Transaction Duration (min)

    Default: 250

  8. Click Create.

    In the following scenarios, click Reset in the Operation column of a composite task to reset its synchronization. Then the task starts to synchronize existing data again and later synchronize increments in real time.

    • Composite tasks need to support synchronization of new data tables and data fields at the source.
    • The CDC archive logs at the source are cleared. As a result, the composite task fails to be synchronized.
    • The MySQL database does not use the GTID mode, and an active/standby switchover occurs. As a result, the composite task fails to be synchronized.

    You can reset the task only when Task Status is Stopped.

    Run the sql: select pg_drop_replication_slot('roma_fdi_{task_id}') command in the database to delete the replication slot for GaussDB and PostgreSQL data sources after a composite task is deleted. Replace {task_id} with the actual ID.