Updated on 2024-04-11 GMT+08:00

From Oracle to Kafka

Supported Source and Destination Databases

Table 1 Supported databases

Source DB

  • On-premises databases (Oracle 10g, 11g, 12c, 18c, 19c, and 21c)
  • Self-built databases on ECS (Oracle 10g, 11g, 12c, 18c, 19c, and 21c)

Destination DB

  • Kafka

Prerequisites

  • You have logged in to the DRS console.
  • For details about the DB types and versions supported by real-time synchronization, see Real-Time Synchronization.

Suggestions

  • Whether a synchronization succeeds depends on the environment and on manual operations. To ensure a smooth synchronization, perform a trial synchronization before starting the task so that you can detect and resolve problems in advance.
  • It is recommended that you start a task during off-peak hours to minimize the impact of synchronization on your services.

Precautions

Before creating a synchronization task, read the following notes:

  • You are advised to create an independent database account for DRS task connection to prevent task failures caused by database account password modification.
  • After changing the account password of the source or destination database, update the connection information in the DRS task as soon as possible. Otherwise, automatic retries after a task failure may lock the database account.
Table 2 Environment Constraints

Type

Constraint

Database permissions

  • Source database:
    • Oracle 12c or later in tenant mode:

      To synchronize a container database (CDB) of Oracle 12c or later, you must have the following permissions: CREATE SESSION, SELECT ANY DICTIONARY, SELECT for a single table (GRANT SELECT ON <userName.tbName> to drsUser), EXECUTE_CATALOG_ROLE, SELECT ANY TRANSACTION, and LOGMINING.

    • To synchronize a pluggable database (PDB) of Oracle 12c or later, you must have the following permissions on the PDB: CREATE SESSION, SELECT ANY DICTIONARY, SELECT for a single table (GRANT SELECT ON <userName.tbName> to drsUser), EXECUTE_CATALOG_ROLE, SELECT ANY TRANSACTION, and LOGMINING. You must also have the CREATE SESSION, SELECT ANY DICTIONARY, EXECUTE_CATALOG_ROLE, SELECT ANY TRANSACTION, LOGMINING, and SET CONTAINER (GRANT SET CONTAINER TO <userName> CONTAINER=ALL) permissions for the CDB.
    • Oracle 12c or later in non-tenant mode:

      You must have the following permissions: CREATE SESSION, SELECT ANY DICTIONARY, SELECT for a single table (GRANT SELECT ON <userName.tbName> to drsUser), EXECUTE_CATALOG_ROLE, SELECT ANY TRANSACTION, and LOGMINING.

    • To synchronize a database of Oracle 11g or earlier, you must have the following permissions: CREATE SESSION, SELECT ANY DICTIONARY, SELECT for a single table (GRANT SELECT ON <userName.tbName> to drsUser), EXECUTE_CATALOG_ROLE, and SELECT ANY TRANSACTION.
    • During incremental synchronization, enable PK, UI, or ALL supplemental logging for the source Oracle database at the database level or table level. If supplemental logging is enabled at the table level, enable it again after a table is rebuilt or renamed. Ensure that these settings remain enabled throughout the synchronization. (Example statements are provided after this list.)
    • Oracle 12c or later does not support incremental synchronization using accounts whose ORACLE_MAINTAINED is Y (except system/sys), because accounts with this attribute do not have the permission to parse logs.
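      The following statements are a minimal sketch of the grants and logging settings described above. The account name drsUser and the table schema1.table1 are placeholders; adjust the statements to your environment and Oracle version.

        -- Basic permissions for the DRS account (non-tenant mode, or run in the CDB root in tenant mode)
        GRANT CREATE SESSION TO drsUser;
        GRANT SELECT ANY DICTIONARY TO drsUser;
        GRANT EXECUTE_CATALOG_ROLE TO drsUser;
        GRANT SELECT ANY TRANSACTION TO drsUser;
        GRANT LOGMINING TO drsUser;                      -- Oracle 12c or later only
        GRANT SELECT ON schema1.table1 TO drsUser;       -- repeat for each table to be synchronized

        -- Additional permission when synchronizing a PDB (run in the CDB root)
        GRANT SET CONTAINER TO drsUser CONTAINER=ALL;

        -- Enable PK/UI supplemental logging at the database level
        ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE) COLUMNS;
        -- Or at the table level (re-run after the table is rebuilt or renamed)
        ALTER TABLE schema1.table1 ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE) COLUMNS;

        -- Oracle 12c or later: list accounts that cannot be used for incremental synchronization
        SELECT username FROM dba_users WHERE oracle_maintained = 'Y';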

Synchronization object

  • Only table data can be synchronized in real time.
  • The following data types are supported: VARCHAR, VARCHAR2, NVARCHAR2, NUMBER, FLOAT, LONG, DATE, BINARY_FLOAT, BINARY_DOUBLE, CHAR, NCHAR, ROWID, TIMESTAMP, TIMESTAMP WITH TIME ZONE, and TIMESTAMP WITH LOCAL TIME ZONE.
  • The following column types cannot pass the precheck: GEOMETRY and user-defined types.
  • The following column types cannot be synchronized but can pass the precheck: INTERVAL_YEAR_TO_MONTH, INTERVAL_DAY_TO_SECOND, UROWID, BFILE, and XML.
  • The following column types are deleted by default before synchronization: RAW, BLOB, CLOB, NCLOB, LONG, and LONG RAW.
  • For incremental synchronization, LOB columns must use the BasicFiles storage attribute; the SecureFiles attribute is not supported. LOB values must be smaller than 10 MB. (A query for checking the storage attribute is provided after this list.)
  • During the incremental synchronization, if the source database is a physical standby Oracle database, data of the LOB type cannot be parsed (the data dictionary cannot be generated). If the table to be synchronized contains data of the LOB type, the incremental synchronization will fail.
  • In the incremental phase, Oracle extended characters are not supported, because the standard character set cannot parse Oracle-customized extended characters.
  • Temporary tables in the source database cannot be synchronized.
  • Tables whose column default values contain function expressions cannot be synchronized.
  • Tables with virtual columns in the source database cannot be synchronized.
  • If a LOB value is written in the Oracle database using an empty-LOB function (EMPTY_CLOB() or EMPTY_BLOB()), the value queried through JDBC is an empty string. Whether the value is stored as an empty string or NULL in the destination depends on how the destination database handles empty strings.
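    The following query is a sketch for checking whether the LOB columns in a schema use BasicFiles or SecureFiles storage; SCHEMA1 is a placeholder schema name.

      SELECT owner, table_name, column_name, securefile
      FROM   dba_lobs
      WHERE  owner = 'SCHEMA1';   -- SECUREFILE = 'NO' means BasicFiles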

Source database

  • Database and table names cannot contain non-ASCII characters or the following special characters: .><\`|,?'!"
  • An empty source database cannot be synchronized.
  • If the source database is a RAC database, you cannot add or delete nodes during synchronization.
  • If the source database is a RAC database that uses a SCAN IP address, the synchronization instance must be able to connect to the virtual IP addresses of all RAC nodes. Otherwise, the connection check fails.
  • Only the following source database character sets are supported: ZHS16GBK, AL32UTF8, UTF8, US7ASCII, WE8MSWIN1252, WE8ISO8859P1, WE8ISO8859P2, WE8ISO8859P4, WE8ISO8859P5, WE8ISO8859P7, WE8ISO8859P9, WE8ISO8859P13, and WE8ISO8859P15.
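    The current character set of the source database can be checked with a query such as the following (a sketch; run it as a user with access to the NLS views).

      SELECT value
      FROM   nls_database_parameters
      WHERE  parameter = 'NLS_CHARACTERSET';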

Destination database

  • The destination database is a Kafka database.

Precautions

  • If the source Oracle database contains special characters, the destination database encoding must be the same as the source Oracle database encoding. Otherwise, garbled characters are displayed in the destination database.
  • After data in the Oracle database is synchronized to Kafka, the character set becomes UTF8.
  • The size of an Oracle archive log file must be greater than the size of the largest single data record, to prevent incremental parsing exceptions caused by a single record spanning more than two log files. (A query for checking log file sizes is provided after this list.)
  • For an Oracle RAC cluster, use the SCAN IP address and service name to create the task. The SCAN IP address provides better fault tolerance, load capability, and synchronization performance.
  • If the source is an Oracle RAC database and the SCAN IP address is used to configure the DRS task, ensure that the SCAN IP address and the DRS node IP address can communicate with all virtual IP addresses of the source database. Otherwise, the connectivity check fails. If the SCAN IP address is not used, the virtual IP address of a node can be used instead; in this case, DRS parses logs only on the RAC node specified by that virtual IP address.
  • If the source is a RAC database, all RAC nodes must be online when incremental synchronization is started for the first time. Otherwise, an error occurs during incremental synchronization.
  • If the source is a RAC database, do not increase or decrease the number of nodes during incremental synchronization, to avoid incremental synchronization exceptions and ensure strong data consistency.
  • If the PDB database is used for synchronization, all PDBs must be enabled during incremental synchronization due to the restrictions of the Oracle LogMiner component.
  • In Oracle 12.2 and later versions, due to restrictions of the Oracle LogMiner component, table and column names must contain no more than 30 characters for incremental synchronization.
  • Supplemental logging of all columns or of primary key and unique index columns is supported.
  • If a column is not present in the log, it is not included in the delivered message, which means that the column was not updated.
  • During synchronization, do not delete or change the usernames, passwords, or permissions of the source and destination databases, and do not modify the destination database port.
  • During the synchronization, do not perform the resetlogs operation on the source Oracle database. Otherwise, data cannot be synchronized and tasks cannot be restored.
  • During synchronization, rollback operations on LOB columns are not supported; such an operation causes the synchronization task to fail.
  • During the synchronization, the username (schema name) of the source Oracle database cannot be changed, including the scenarios where the schema name is changed by modifying the USER$ dictionary table in versions earlier than 11.2.0.2 and by using ALTER USER username RENAME TO new_username in versions later than 11.2.0.2.
  • During incremental synchronization, you are not advised to select a hybrid partition table because DML logs are not generated when data in the external partition of the hybrid partition table changes. DRS cannot obtain the changes during incremental synchronization, which may cause data inconsistency.
  • During incremental synchronization, DRS cannot connect directly to a PDB. You need to provide the service name/SID of the CDB.
  • During an incremental synchronization of table-level objects, renaming tables is not recommended.
  • If you select Tables for Synchronization Object, all tables must be synchronized to the same topic at the destination end.
  • DDL operations can be performed on tables.
  • When editing the task to add a new table, ensure that transactions of the new table have been committed. Otherwise, transactions that are not committed may fail to be synchronized to the destination database. You are advised to add tables during off-peak hours.
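    The following queries are a sketch for checking redo and archive log file sizes against your largest data records, as mentioned in the archive log precaution above; they assume access to the v$log and v$archived_log views.

      -- Configured size of the online redo log groups (archive log files cannot exceed this)
      SELECT group#, bytes FROM v$log;

      -- Sizes of recently generated archive log files
      SELECT name, blocks * block_size AS size_bytes
      FROM   v$archived_log
      WHERE  first_time > SYSDATE - 1
      ORDER BY first_time DESC;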

Procedure

  1. On the Data Synchronization Management page, click Create Synchronization Task.
  2. On the Create Synchronization Instance page, specify the task name, description, and the synchronization instance details, and click Create Now.

    • Task information description
      Table 3 Task and recipient description

      Parameter

      Description

      Task Name

      The task name must start with a letter and consist of 4 to 50 characters. It can contain only letters, digits, hyphens (-), and underscores (_).

      Description

      The description consists of a maximum of 256 characters and cannot contain special characters !=<>'&"\

    • Synchronization instance details
      Table 4 Synchronization instance settings

      Parameter

      Description

      Data Flow

      Select Self-built to self-built.

      Source DB Engine

      Select Oracle.

      Destination DB Engine

      Select Kafka.

      Network Type

      Available options: VPC, Public network, and VPN or Direct Connect. Public network is used as an example.

      VPC

      Select an available VPC.

      Synchronization Instance Subnet

      Select the subnet where the synchronization instance is located. You can also click View Subnet to go to the network console to view the subnet where the instance resides.

      By default, the DRS instance and the destination DB instance are in the same subnet. You need to select the subnet where the DRS instance resides and ensure that there are available IP addresses. To ensure that the synchronization instance is successfully created, only subnets with DHCP enabled are displayed.

      Security Group

      Select a security group. You can use security group rules to allow or deny access to the instance.

      Synchronization Mode

      • Incremental

        Through log parsing, incremental data generated on the source database is synchronized to the destination database.

        During synchronization, the source database continues to provide services for external systems with zero downtime.

    • Task Type
      Table 5 Task type information

      Parameter

      Description

      AZ

      Select the AZ where you want to create the DRS task. Selecting the AZ that houses the source or destination database can provide better performance.

      If DRS Task Type is set to Dual-AZ, you can specify Primary AZ and Standby AZ.

    If a task fails to be created, DRS retains the task for three days by default. After three days, the task automatically ends.

  3. On the Configure Source and Destination Databases page, wait until the synchronization instance is created. Then, specify source and destination database information and click Test Connection for both the source and destination databases to check whether they have been connected to the synchronization instance. After the connection tests are successful, select the check box before the agreement and click Next.

    Table 6 Source database settings

    Parameter

    Description

    IP Address or Domain Name

    The IP address or domain name of the source database.

    NOTE:

    For a RAC cluster, use a SCAN IP address to improve access performance.

    Port

    The port of the source database. Range: 1 – 65535

    Database Service Name

    Enter a database service name (Service Name/SID). The client can connect to the Oracle database through the database service name. For details about how to query the database service name, see the prompt on the GUI. (Example queries are also provided after this table.)

    PDB Name

    Container database (CDB) and pluggable database (PDB) are new features in Oracle 12c and later versions. This parameter is optional, but it must be specified if you want to synchronize only PDB tables.

    Enter the service name, SID, username, and password of the CDB that contains the PDB tables to be migrated.

    Database Username

    The username for accessing the source database.

    Database Password

    The password for the database username.

    SSL Connection

    SSL encrypts the connection between DRS and the source database. If SSL is enabled, upload the SSL CA root certificate.

    NOTE:
    • The maximum size of a single certificate file that can be uploaded is 500 KB.
    • If SSL is disabled, your data may be at risk.

    The IP address, domain name, username, and password of the source database are encrypted and stored in DRS, and will be cleared after the task is deleted.
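    If you prefer to query the source Oracle database directly instead of following the GUI prompt, the following statements are a sketch for finding the service name and SID; the service name used by your clients may also be defined in tnsnames.ora.

      -- Service names registered by the database
      SELECT value FROM v$parameter WHERE name = 'service_names';

      -- Instance SID
      SELECT instance_name FROM v$instance;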

    Table 7 Destination database settings

    Parameter

    Description

    IP Address or Domain Name

    The IP address or domain name of the destination database.

    Security Protocol

    Available options: PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. For details, see Kafka Authentication.

  4. On the Set Synchronization Task page, select a topic and objects to be synchronized, and then click Next.

    Table 8 Synchronization mode and object

    Parameter

    Description

    Synchronize DDLs

    Controls whether DDLs are synchronized to Kafka. If Synchronize DDLs is enabled and Partitions are identified by the hash values of the primary key is selected, DDLs are hashed based on the table name, because DDL statements do not carry a primary key value. In other cases, DDLs follow the selected partition policy.

    All Data

    Controls whether all column values of a row are synchronized. DRS parses the source database logs to synchronize incremental data, so the completeness of a row depends on whether the values of all columns are recorded in the logs.

    If all column values are required for the synchronization objects, ALL-level supplemental logging must be enabled in the source database so that every column value of a changed row is recorded. This option is checked against the supplemental logging level of the source database in the pre-check phase; the minimum requirement for DRS incremental synchronization is table-level PK/UI supplemental logging. For details, see How Do I Check Supplemental Logging of the Source Oracle Database? (An example check query is also provided after this table.)

    Topic Synchronization Policy

    Topic synchronization policy. The options are as follows:

    • Select A specified topic if the data volume of the source database is small.
    • Select Automatically generated based on the schema name if each schema contains a lot of data.
    • Select Automatically generated using the schema_name-table_name format if each table contains a lot of data.

    Topic

    Select the topic to be synchronized to the destination database. This parameter is available when Topic Synchronization Policy is set to A specified topic.

    Topic Name Format

    Topic name format. This parameter is available when Topic Synchronization Policy is set to Auto-generated topics.

    Due to Kafka restrictions, a topic name can contain only ASCII characters, periods (.), underscores (_), and hyphens (-). If a topic name exceeds the limit, the topic fails to be created and the task is abnormal.

    If a topic name contains a database object name, ensure that the characters in the object name meet the Kafka topic naming requirements.

    The topic name format supports the $schema$ and $tablename$ variables; other characters are treated as constants. $schema$ is replaced with the schema name, and $tablename$ is replaced with the table name.

    For example, if this parameter is set to $schema$-$tablename$, the schema name is schema1, and the table name is tab1 when Oracle is the source, the topic name is schema1-tab1.

    Number of Partitions

    This parameter is available when Topic Synchronization Policy is set to Auto-generated topics.

    The number of partitions of a topic. Each topic can have multiple partitions. More partitions can provide higher throughput but consume more resources. Set the number of partitions based on the actual situation of brokers.

    Replication Factor

    This parameter is available when Topic Synchronization Policy is set to Auto-generated topics.

    The number of replicas (copies) of a topic. Each topic can have multiple replicas, which are placed on different brokers in a cluster. The number of replicas cannot exceed the number of brokers; otherwise, the topic fails to be created.

    Synchronize Topic To

    The policy for synchronizing topics to the Kafka partitions.

    • If topics are synchronized to different partitions by the hash values of schema_name.table_name, the performance on a single table query can be improved.
    • If topics are synchronized to different partitions by the hash values of the primary key, one table corresponds to one topic. This prevents data from being written to the same partition, and consumers can obtain data from different partitions concurrently.

      For a table without a primary key, if you select Partitions are identified by the hash values of the primary key, topics are synchronized to different partitions based on the hash values of schema_name.table_name.

    • Partitions are differentiated by the hash values of schema_name: This mode applies to scenarios where one database corresponds to one topic, preventing data in multiple schemas from being written to the same partition, so that consumers can obtain data from different partitions concurrently.
    • If topics are synchronized to partition 0, strong consistency can be ensured, but write performance is impacted. Note that data is sent to Kafka using multiple threads by default. If strong transaction consistency is required, you are advised to select this option and contact O&M personnel to change Kafka writes to single-threaded, or set the topic synchronization policy to Automatically generated based on the table name.

    Data Format in Kafka

    Select the format of the data sent from the Oracle database to Kafka.

    • Avro refers to a binary encoding format.
    • Json refers to a text-based data interchange format.

    For details, see Kafka Message Format.

    Synchronization Object

    The left pane displays the source database objects, and the right pane displays the selected objects. You can select Tables or Import object file for Synchronization Object as required.

    • If you select Import object file for Synchronization Object, different tables can be synchronized to different topics at the destination end. For details about the import procedure, see Importing Synchronization Objects.
    • When you select Import object file, you can use the mapping function in Mapping Object Names only when the topic synchronization policy is set to A specified topic. Otherwise, topics are generated based on the topic name format.
    NOTE:
    • To quickly select the desired database objects, you can use the search function.
    • If there are changes made to the source databases or objects, click the refresh icon in the upper right corner to update the objects to be synchronized.
    • If an object name contains spaces, leading and trailing spaces are not displayed, and multiple consecutive spaces within the name are displayed as a single space.
    • The name of the selected synchronization object cannot contain spaces.
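    The supplemental logging level referenced in the All Data description above can be checked with queries such as the following (a sketch; SCHEMA1 is a placeholder schema name).

      -- Database-level supplemental logging flags
      SELECT supplemental_log_data_min,
             supplemental_log_data_pk,
             supplemental_log_data_ui,
             supplemental_log_data_all
      FROM   v$database;

      -- Table-level supplemental log groups
      SELECT owner, table_name, log_group_type
      FROM   dba_log_groups
      WHERE  owner = 'SCHEMA1';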

  5. On the Check Task page, check the synchronization task.

    • If any check fails, review the cause and rectify the fault. After the fault is rectified, click Check Again.
    • If all check items are successful, click Next.

      You can proceed to the next step only when all checks are successful. If there are any items that require confirmation, view and confirm the details first before proceeding to the next step.

  6. On the displayed page, specify Start Time, confirm that the configured information is correct, and click Submit to submit the task.

    Table 9 Task startup settings

    Parameter

    Description

    Start Time

    Set Start Time to Start upon task creation or Start at a specified time based on site requirements.

    NOTE:

    After a synchronization task is started, the performance of the source and destination databases may be affected. You are advised to start a synchronization task during off-peak hours.

  7. After the task is submitted, you can view and manage it on the Data Synchronization Management page.

    • You can view the task status. For more information about task status, see Task Statuses.
    • You can click the refresh icon in the upper-right corner to view the latest task status.
    • By default, DRS retains any task in the Configuration state for three days. After three days, DRS automatically deletes background resources, but the task status remains unchanged. When you configure the task again, DRS applies for resources for the task again. In this case, the IP address of the DRS instance changes.