Updated on 2024-12-01 GMT+08:00

From PostgreSQL to Kafka

Supported Source and Destination Databases

Table 1 Supported databases

Source DB

Destination DB

  • On-premises database (PostgreSQL 9.4, 9.5, 9.6, 10, 11, 12, 13 and 14)
  • ECS database (PostgreSQL 9.4, 9.5, 9.6, 10, 11, 12, 13 and 14)
  • Kafka

    Version 0.11 or later

Supported Synchronization Objects

Table 2 lists the objects that can be synchronized in different scenarios. DRS will automatically check the objects you selected before the synchronization.

Table 2 Supported synchronization objects

Type

Precautions

Objects

  • Instance-level synchronization is not supported. Only one database can be synchronized at a time. Multiple DRS tasks are required to synchronize multiple databases.
  • Supported field types:

    Digit, currency, character, binary, date/time, boolean, enumeration, geometry, network address, bit, text search, UUID, XML, JSON, array, compound, and range.

    NOTE:

    The restrictions on synchronization object names are as follows:

    The database name cannot contain +"%'\<>, the schema name and table name cannot contain ".'<>, and the column name cannot contain double quotation marks (") and single quotation marks (').

  • Scope of incremental synchronization
    • Some DML statements, including INSERT, UPDATE, and DELETE, can be synchronized.
    • Not supported: DDL statements, DML statements of unlogged tables and temporary tables
    NOTE:

    Incremental heartbeat information sending: For PostgreSQL 9.6 and later versions, if no data is written to the source database for a long time (more than 10 hours), the log extraction process invokes pg_logical_emit_message to insert heartbeat information into WAL logs of the source database, which ensures that the logical replication slot number is updated normally. (Only WAL logs are added, and services are not affected.)

Database Account Permission Requirements

To start a synchronization task, the source and destination database users must meet the requirements in the following table. Different types of synchronization tasks require different permissions. For details, see Table 3. DRS automatically checks the database account permissions in the pre-check phase and provides handling suggestions.

  • You are advised to create an independent database account for DRS task connection to prevent task failures caused by database account password modification.
  • After changing the account passwords for the source and destination databases, modify the connection information of the DRS task by referring to Modifying Connection Information to prevent automatic retry after a task failure. Automatic retry will lock the database accounts.
Table 3 Database account permission

Type

Incremental Synchronization

Source database user

The CONNECT permission for databases, USAGE permission for schemas, SELECT permission for tables, and the permission to create replication connections

Suggestions

  • The success of database synchronization depends on environment and manual operations. To ensure a smooth synchronization, perform a synchronization trial before you start the synchronization to help you detect and resolve problems in advance.
  • It is recommended that you start a task during off-peak hours to minimize the impact of synchronization on your services.
  • For more information about the impact of DRS on databases, see How Does DRS Affect the Source and Destination Databases?

Precautions

DRS incremental synchronization consists of three phases: task start, incremental synchronization, and task completion. To ensure smooth synchronization, read the following notes before creating a synchronization task.

Table 4 Precautions

Type

Constraints

Starting a task

  • Source database requirements:

    The wal_level value of the source database must be logical.

    The test_decoding plug-in has been installed on the source database.

    The replica identity attribute of tables that do not have primary keys in the source database must be full.

    The max_replication_slots value of the source database must be greater than the number of used replication slots.

    The max_wal_senders value of the source database must be greater than or equal to the max_replication_slots value.

    If the toast attribute of the primary key column in the source database is main, external, or extended, the replica identity attribute must be full.

  • Destination database requirements:
    • The destination database is a Kafka database.
    • You are advised to set auto.create.topics.enable of Kafka to false.
  • Other notes:
    • If the DCC does not support instances with 4 vCPUs and 8 GB memory or higher instance specifications, the synchronization task cannot be created.
    • During database-level synchronization, incremental synchronization tasks cannot be edited.
    • During table-level synchronization, incremental synchronization tasks can be edited, but the database cannot be changed.
    • Before starting a synchronization task, ensure that no long transaction is started in the source database. Otherwise, the creation of the logical replication slot will be blocked, leading the task to fail.
    • After a task is started, a primary/standby switchover can be performed only on the source database of the following versions: RDS for PostgreSQL 12.6 or later and RDS for PostgreSQL 13 or later.
    • If a logical replication slot fails to be created or does not exist due to a long transaction, you can reset the task and then restart it.

Incremental synchronization

  • Do not change the port of the source and destination databases, or change or delete the passwords and permissions of the source and destination database users. Otherwise, the task may fail.
  • Do not delete the primary key of the source database table. Otherwise, incremental data may be lost or the task may fail.
  • Do not modify the replica identity attribute of tables in the source database. Otherwise, incremental data may be lost or the task may fail.
  • Do not delete the topic for receiving DRS data in Kafka. Otherwise, the task may fail.
  • During database-level synchronization, if a table without a primary key is added to the source database, you must set replica identity of the table to full before writing data. Otherwise, incremental data may be lost or the task may fail.
  • During database-level synchronization, if a primary key table is added to the source database and the toast attribute of the primary key column is main, external, or extended, the replica identity attribute of the table must be set to full before writing data. Otherwise, data may be inconsistent or the task may fail.
  • If an incremental synchronization task is suspended or resumed due to an exception, there may be duplicate data in the destination Kafka. Use the id field in the Kafka data for data deduplication.

Stopping a task

  • Stop a task normally:
    • When a full+incremental synchronization task is complete, the streaming replication slot created by the task in the source database is automatically deleted.
  • Forcibly stop a task:
    • To forcibly stop a full+incremental real-time synchronization task, you need to manually delete the replication slots that may remain in the source database. For details, see Forcibly Stopping Synchronization of PostgreSQL.
    • The naming rule of a logic replication slot is drs_unique_ID. To obtain the unique ID, replace the hyphen (-) in the task node ID with an underscore (_). You can find the node ID in the task node id is *** log on the Synchronization Logs page.

Prerequisites

  • You have logged in to the DRS console.
  • Your account balance is greater than or equal to $0 USD.
  • For details about the DB types and versions supported by real-time synchronization, see Supported Databases.
  • If a subaccount is used to create a DRS task, ensure that an agency has been added. To create an agency, see Agency Management.

Procedure

  1. On the Data Synchronization Management page, click Create Synchronization Task.
  2. On the Create Synchronization Instance page, select a region and project, specify the task name, description, and the synchronization instance details, and click Create Now.

    • Task information description
      Figure 1 Synchronization task information
      Table 5 Task information

      Parameter

      Description

      Region

      The region where the replication instance is deployed. You can change the region.

      Project

      The project corresponds to the current region and can be changed.

      Task Name

      The task name must start with a letter and consist of 4 to 50 characters. It can contain only letters, digits, hyphens (-), and underscores (_).

      Description

      The description consists of a maximum of 256 characters and cannot contain special characters !=<>'&"\

    • Synchronization instance details
      Figure 2 Synchronization instance details
      Table 6 Synchronization instance settings

      Parameter

      Description

      Data Flow

      Choose Self-built to self-built.

      Source DB Engine

      Select PostgreSQL.

      Destination DB Engine

      Select Kafka.

      Network Type

      The public network is used as an example. Available options: Public network and VPN or Direct Connect

      VPC

      Select an available VPC.

      Synchronization Instance Subnet

      Select the subnet where the synchronization instance is located. You can also click View Subnets to go to the network console to view the subnet where the instance resides.

      By default, the DRS instance and the destination DB instance are in the same subnet. You need to select the subnet where the DRS instance resides, and there are available IP addresses for the subnet. To ensure that the synchronization instance is successfully created, only subnets with DHCP enabled are displayed.

      Security Group

      Select a security group. You can use security group rules to allow or deny access to the instance.

      Synchronization Mode

      • Incremental

        Through log parsing, incremental data generated on the source database is synchronized to the destination database.

        During synchronization, the source database continues to provide services for external systems with zero downtime.

      Specify EIP

      This parameter is available when you select Public network for Network Type. Select an EIP to be bound to the DRS instance. DRS will automatically bind the specified EIP to the DRS instance and unbind the EIP after the task is complete. The number of specified EIPs must be the consistent with that of DB instances.

      For details about the data transfer fee generated using a public network, see EIP Price Calculator.

    • Task Type
      Figure 3 Task type
      Table 7 Task type information

      Parameter

      Description

      Specifications

      DRS instance specifications. Different specifications have different performance upper limits. For details, see Real-Time Synchronization.

      NOTE:

      DRS allows you to upgrade specifications only for single-AZ synchronization tasks. Task specifications cannot be downgraded. For details, see Changing Specifications.

      AZ

      Select the AZ where you want to create the DRS task. Selecting the one housing the source or destination database can provide better performance.

    • Enterprise Project and Tags
      Figure 4 Enterprise Project and Tags

      Table 8 Enterprise Project and Tags

      Parameter

      Description

      Enterprise Project

      An enterprise project you would like to use to centrally manage your cloud resources and members. Select an enterprise project from the drop-down list. The default project is default.

      For more information about enterprise project, see Enterprise Management User Guide.

      To customize an enterprise project, click Enterprise in the upper right corner of the console. The Enterprise Project Management Service page is displayed. For details, see Creating an Enterprise Project in Enterprise Management User Guide.

      Tags

      • Tags a task. This configuration is optional. Adding tags helps you better identify and manage your tasks. Each task can have up to 20 tags.
      • If your organization has configured tag policies for DRS, add tags to tasks based on the policies. If a tag does not comply with the policies, task creation may fail. Contact your organization administrator to learn more about tag policies.
      • After a task is created, you can view its tag details on the Tags tab. For details, see Tag Management.

    If a task fails to be created, DRS retains the task for three days by default. After three days, the task automatically stops.

  3. On the Configure Source and Destination Databases page, wait until the synchronization instance is created. Then, specify source and destination database information and click Test Connection for both the source and destination databases to check whether they have been connected to the synchronization instance. After the connection tests are successful, click Next.

    Establish the connectivity between the DRS instance and the source and destination databases.

    • Network connectivity: Ensure that the source and destination databases accept connections from the DRS instance. To access databases over a public network, configure the database to accept connections from the EIP of the DRS instance. To access databases over a VPC, VPN, or Direct Connect network, configure the database to accept connections from the private IP address of the DRS instance. For details, see Network Preparations.
    • Account connectivity: Ensure that the source and destination databases allows connections from the DRS instance using the username and password.
    Figure 5 Source database information
    Table 9 Source database settings

    Parameter

    Description

    IP Address or Domain Name

    The IP address or domain name of the source database.

    Port

    The port of the source database. Range: 1 – 65535

    Database Name

    Indicates whether to specify a database. If this option is enabled, enter the database name.

    Database Username

    The username for accessing the source database.

    Database Password

    The password for the database username.

    SSL Connection

    SSL encrypts the connections between the source and destination databases.

    The username and password of the source database are encrypted and stored in DRS and will be cleared after the task is deleted.

    Figure 6 Destination database information
    Table 10 Destination database settings

    Parameter

    Description

    IP Address or Domain Name

    IP address or domain name of the destination database in the IP address/Domain name:Port format. The port of the destination database. Range: 1 - 65535

    You can enter up to 10 groups of IP addresses or domain names of the destination database. Separate multiple values with commas (,). For example: 192.168.0.1:8080,192.168.0.2:8080.

    Method

    Available options: PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. For details, see Kafka Authentication.

  4. On the Set Synchronization Task page, select the synchronization policy, objects, and data format, and click Next.

    Figure 7 Synchronization mode

    Table 11 Synchronization object

    Parameter

    Description

    Synchronize DML

    Select the DML operations to be synchronized. By default, all DML operations are selected.

    If you do not select Delete, DELETE statements in the incremental data of the source database will not be synchronized, which may cause a data inconsistency. As a result, there may be a data conflict or the task may fail.

    Source Database Replication Slot Name

    You can choose whether to specify the replication slot of the source database. After replication slot is enabled, enter the replication slot name. The name contains 63 characters and cannot start with a digit. Only lowercase letters, digits, and underscores (_) are allowed.

    Topic Synchronization Policy

    Topic synchronization policy. You can select A specific topic or Auto-generated topics.

    Topic

    Select the topic to be synchronized to the destination database. This parameter is available when the topic is set to A specified topic.

    Topic Name Format

    This parameter is available when Topic Synchronization Policy is set to Auto-generated topics.

    Due to Kafka restrictions, a topic name can contain only ASCII characters, periods (.), underscores (_), and hyphens (-). If a topic name exceeds the limit, the topic fails to be created and the task is abnormal.

    If a topic name contains a database object name, ensure that the characters in the object name meet the Kafka topic naming requirements.

    Number of Partitions

    This parameter is available when Topic Synchronization Policy is set to Auto-generated topics.

    The number of partitions of a topic. Each topic can have multiple partitions. More partitions can provide higher throughput but consume more resources. Set the number of partitions based on the actual situation of brokers.

    Replication Factor

    This parameter is available when Topic Synchronization Policy is set to Auto-generated topics.

    Number of copies of a topic. Each topic can have multiple copies, and the copies are placed on different brokers in a cluster. The number of copies cannot exceed the number of brokers. Otherwise, the topic fails to be created.

    Synchronize Topic To

    The policy for synchronizing topics to the Kafka partitions.

    • If topics are synchronized to different partitions by hash value of the database, schema and table names, the performance on a single table query can be improved.
    • If topics are synchronized to different partitions by hash value of the primary key, one table corresponds to one topic. This prevents data from being written to the same partition, and consumers can obtain data from different partitions concurrently.

      For a table without a primary key, if you select Partitions are identified by the hash values of the primary key, topics are synchronized to different partitions based on the hash value of the database_name.schema.table_name.

    • Partitions are differentiated by the hash values of database_name.schema_name: This mode applies to scenarios where one database corresponds to one topic, preventing multiple schemas from being written to the same partition, so that consumers can obtain data from different partitions concurrently.
    • If topics are synchronized to partition 0, strong consistency can be obtained but write performance is impacted.

    Data Format in Kafka

    Select the data format to be delivered to Kafka.

    • Avro: A binary encoded format that is efficient. You need to deserialize the data later.
    • JSON: JSON message format, which is easy to interpret but takes up more space.

    For details, see Kafka Message Format.

    Synchronization Object

    The left pane displays the source database objects, and the right pane displays the selected objects. DRS supports table- and database-level synchronization. You can select data for synchronization based on your service requirements.

    NOTE:
    • To quickly select the desired database objects, you can use the search function.
    • If there are changes made to the source databases or objects, click in the upper right corner to update the objects to be synchronized.
    • If an object name contains spaces, the spaces before and after the object name are not displayed. If there are two or more consecutive spaces in the middle of the object name, only one space is displayed.
    • The name of the selected synchronization object cannot contain spaces.

  5. On the Check Task page, check the synchronization task.

    • If any check fails, review the cause and rectify the fault. After the fault is rectified, click Check Again.

      For details about how to handle check failures, see Solutions to Failed Check Items in Data Replication Service User Guide.

    • If all check items are successful, click Next.

      You can proceed to the next step only when all checks are successful. If there are any items that require confirmation, view and confirm the details first before proceeding to the next step.

  6. On the displayed page, specify Start Time, Send Notifications, SMN Topic, Delay Threshold (s), and Stop Abnormal Tasks After, confirm that the configured information is correct, select the check box before the agreement, and click Submit to submit the task.

    Figure 8 Task startup settings

    Table 12 Task startup settings

    Parameter

    Description

    Start Time

    Set Start Time to Start upon task creation or Start at a specified time based on site requirements.

    NOTE:

    After a synchronization task is started, the performance of the source and destination databases may be affected. You are advised to start a synchronization task during off-peak hours.

    Send Notifications

    This parameter is optional. After enabled, select a SMN topic. If the status, latency metric, or data of the migration task is abnormal, DRS will send you a notification.

    SMN Topic

    This parameter is available only after you enable Send Notifications and create a topic on the SMN console and add a subscriber.

    For details, see Simple Message Notification User Guide.

    Delay Threshold (s)

    During an incremental synchronization, a synchronization delay indicates a time difference (in seconds) of synchronization between the source and destination database.

    If the synchronization delay exceeds the threshold you specify, DRS will send alarms to the specified recipients. The value ranges from 0 to 3,600. To avoid repeated alarms caused by the fluctuation of delay, an alarm is sent only after the delay has exceeded the threshold for six minutes.

    NOTE:
    • If the delay threshold is set to 0, no notifications will be sent to the recipient.
    • In the early stages of an incremental synchronization, the synchronization delay is long because a large quantity of data is awaiting synchronization. In this case, no notifications will be sent.
    • Before setting the delay threshold, enable Send Notifications.

    Data Exception Notification

    This parameter is optional. After enabled, DRS will send a notification if the task data is abnormal.

    Stop Abnormal Tasks After

    Number of days after which an abnormal task is automatically stopped. The value must range from 14 to 100. The default value is 14.

    NOTE:
    • You can set this parameter only for pay-per-use tasks.
    • Tasks in the abnormal state are still charged. If tasks remain in the abnormal state for a long time, they cannot be resumed. Abnormal tasks run longer than the period you set (unit: day) will automatically stop to avoid unnecessary fees.

  7. After the task is submitted, you can view and manage it on the Data Synchronization Management page.

    • You can view the task status. For more information about task status, see Task Statuses.
    • You can click in the upper right corner to view the latest task status.
    • By default, DRS retains a task in the Configuration state for three days. After three days, DRS automatically deletes background resources, but the task status remains unchanged. When you configure the task again, DRS applies for resources for the task again. In this case, the IP address of the DRS instance changes.
    • For a public network task, DRS needs to delete background resources after you stop the task. The EIP bound to the task cannot be restored to the Unbound state until background resources are deleted.