Updated on 2024-05-07 GMT+08:00

Example: Logical Replication Code

The example demonstrates how to use the logical replication function through the JDBC APIs. Before executing the example, load the driver. For details about how to obtain and load the driver, see JDBC Package, Driver Class, and Environment Class.

For details about the configuration options of logical replication, see section "Logical Replication > Logical Decoding" in Feature Guide. In addition, the following configuration items are added for streaming decoding tools such as JDBC:

  1. Decoding thread concurrency

    Set parallel-decode-num to specify the number of decoder threads for parallel decoding. The value is an integer ranging from 1 to 20. The value 1 indicates that decoding is performed based on the original serial logic. Other values indicate that parallel decoding is enabled. The default value is 1. When this parameter is set to 1, do not configure the following options: decode-style, sending-batch, and parallel-queue-size.

  2. Decoding format

    Configure decode-style to specify the decoding format. The value can be 'j', 't' or 'b' of the char type, indicating the JSON, text, or binary format, respectively. The default value is 'b', indicating binary decoding. This option is set only when parallel decoding is allowed and binary decoding is supported only in the parallel decoding scenario. For the JSON and text formats corresponding to the binary format, in the decoding result sent in batches, the uint32 consisting of the first four bytes of each decoding statement indicates the total number of bytes of the statement (the four bytes occupied by the uint32 are excluded, and 0 indicates that the decoding of this batch ends). The 8-byte uint64 indicates the corresponding LSN (begin corresponds to first_lsn, commit corresponds to end_lsn, and other values correspond to the LSN of the statement).

The binary encoding rules are as follows:

  1. The first four bytes represent the total number of bytes of the decoding result of statements following the statement-level delimiter letter P (excluded) or the batch end character F (excluded). If the value is 0, the decoding of this batch ends.
  2. The next eight bytes (uint64) indicate the corresponding LSN (begin corresponds to first_lsn, commit corresponds to end_lsn, and other values correspond to the LSN of the statement).
  3. The next one-byte letter can be B, C, I, U, or D, representing BEGIN, COMMIT, INSERT, UPDATE, or DELETE.
  4. If B is used in the step 3:
    1. The next eight bytes (uint64) indicate the CSN.
    2. The next eight bytes (uint64) indicate first_lsn.
    3. (Optional) If the next 1-byte letter is T, the following four bytes (uint32) indicate the timestamp length for committing the transaction. The following characters with the same length are the timestamp character string.
    4. (Optional) If the next one-byte letter is N, the following four bytes (uint32) indicate the length of the transaction username. The following characters with the same length are the transaction username.
    5. Because there may still be a decoding statement subsequently, a 1-byte letter P or F is used as a separator between statements. P indicates that there are still decoded statements in this batch, and F indicates that decoding in this batch is complete.
  5. If the letter described in 3 is C:
    1. (Optional) If the next 1-byte letter is X, the following eight bytes (uint64) indicate XID.
    2. (Optional) If the next 1-byte letter is T, the following four bytes (uint32) indicate the timestamp length. The following characters with the same length are the timestamp character string.
    3. When logs are sent in batches, decoding results of other transactions may still exist after a COMMIT log is decoded. If the next 1-byte letter is P, the batch still needs to be decoded. If the letter is F, the batch decoding ends.
  6. If the letter described in 3 is I, U, or D:
    1. The following two bytes (uint16) indicate the length of the schema name.
    2. The schema name is read based on the preceding length.
    3. The following two bytes (uint16) indicate the length of the table name.
    4. The table name is read based on the preceding length.
    5. (Optional) If the next 1-byte letter is N, it indicates a new tuple. If the letter is O, it indicates an old tuple. In this case, the new tuple is sent first.
      1. The following two bytes (uint16) indicate the number of columns to be decoded for the tuple, which is recorded as attrnum.
      2. The following procedure is repeated for attrnum times:
        1. The next two bytes (uint16) indicate the length of the column name.
        2. The column name is read based on the preceding length.
        3. The following four bytes (uint32) indicate the OID of the current column type.
        4. The next four bytes (uint32) indicate the length of the value (stored in the character string format) in the current column. If the value is 0xFFFFFFFF, it indicates null. If the value is 0, it indicates a character string whose length is 0.
        5. The column value is read based on the preceding length.
    6. Because there may still be a decoding statement after, if the next one-byte letter is P, it indicates that the batch still needs to be decoded, and if the next one-byte letter is F, it indicates that decoding of the batch ends.
  1. Decoding only on the standby node

    Configure the standby-connection option to specify whether to perform decoding only on the standby node. The value is of the Boolean type (0 or 1). The value true (or 1) indicates that only the standby node can be connected for decoding. When the primary node is connected for decoding, an error is reported and the system exits. The value false (or 0) indicates that there is no restriction. The default value is false (or 0).

  2. Batch sending

    Configure sending-batch to specify whether to send results in batches. The value is an integer ranging from 0 to 1. The value 0 indicates that decoding results are sent one by one. The value 1 indicates that decoding results are sent in batches when the accumulated size of decoding results reaches 1 MB. The default value is 0. This parameter can be set only during parallel decoding. In the scenario where batch sending is enabled, if the decoding format is 'j' or 't', before each original decoding statement, a uint32 type is added indicating the length of the decoding result (excluding the current uint32 type), and a uint64 type is added, indicating the LSN corresponding to the current decoding result.

  3. Length of the parallel decoding queue

    Configure parallel-queue-size to specify the length of the queue for interaction among parallel logical decoding threads. The value ranges from 2 to 1024 and must be a power of 2. The default value is 128. The queue length is positively correlated with the memory usage during decoding.

  4. Memory threshold for logical decoding

    The max-txn-in-memory configuration item specifies the memory threshold for caching the intermediate decoding result of a single transaction, in MB. The value ranges from 0 to 100 during serial decoding. The default value is 0, indicating that the memory usage is not controlled. The max-reorderbuffer-in-memory configuration item specifies the memory threshold for caching intermediate decoding results of all transactions, in GB. The value ranges from 0 to 100 during serial decoding. The default value is 0, indicating that the memory usage is not controlled. When the memory usage exceeds the threshold, intermediate decoding results are written into a temporary file during decoding, affecting the logical decoding performance.

  5. Logical decoding sending timeout threshold

    The sender-timeout configuration item specifies the heartbeat timeout threshold between the kernel and client. If no message is received from the client within the period, the logical decoding stops and disconnects from the client. The unit is ms, and the value range is [0,2147483647]. The default value depends on the value of the GUC parameter logical_sender_timeout.

  6. User blacklist options for logical decoding
    Use the user blacklist for logical decoding. The transaction operations of blacklisted users are filtered from the logical decoding result. The options are as follows:
    1. exclude-userids: specifies the OIDs of blacklisted users. Multiple OIDs are separated by commas (,). The system does not check whether the user OIDs exist.
    2. exclude-users: specifies blacklisted usernames. Multiple usernames are separated by commas (,). dynamic-resolution specifies whether to dynamically parse and identify usernames. If the decoding is interrupted because the user does not exist and the corresponding blacklisted user does not exist at the time when logs are generated, you can set dynamic-resolution to true or delete the username from the blacklist to start decoding and continue to obtain logical logs.
    3. dynamic-resolution: specifies whether to dynamically parse blacklisted usernames. The default value is true. If the parameter is set to false, an error is reported and the logical decoding exits when the decoding detects that the user does not exist in blacklist exclude-users. If the parameter is set to true, decoding continues when it detects that the user does not exist in blacklist exclude-users.
  7. Output options for transaction logical logs
    1. include-xids: specifies whether the BEGIN logical log of a transaction outputs the transaction ID. The default value is true.
    2. include-timestamp: specifies whether the BEGIN logical log of a transaction outputs the time when the transaction is committed. The default value is false.
    3. include-user: specifies whether the BEGIN logical log of a transaction outputs the username of the transaction. The default value is false. The username of a transaction refers to the authorized user, that is, the login user who executes the session corresponding to the transaction. The username does not change during the execution of the transaction.
  8. By default, socketTimeout of the logical decoding connection is set to 10s. When the primary node is overloaded during decoding on the standby node, the connection may be closed due to timeout. You can set withStatusInterval(10000,TimeUnit.MILLISECONDS) to adjust the timeout interval.
  9. Heartbeat log output option

    enable-heartbeat: specifies whether to generate heartbeat logs. The default value is false.

    If the heartbeat log output option is enabled, heartbeat logs will be generated. The heartbeat logs can be in the binary or TEXT/JSON format. For a binary heartbeat log message, it starts with a character 'h' and then the heartbeat log content: an 8-byte uint64 LSN indicating the end position of WAL reading when the heartbeat logical log is sent, an 8-byte uint64 LSN indicating the location of the WAL that has been flushed to disks when the heartbeat logical log is sent, and an 8-byte int64 timestamp (starting from January 1, 1970) indicating the timestamp when the latest decoded transaction log or checkpoint log is generated; then, it ends with character 'F'. TEXT/JSON heartbeat log messages that are sent in batches end with '0'. There is no such terminator for each TEXT/JSON heartbeat log message. The message content is transmitted in big-endian mode. The following figure shows the format.

The decoding performance (Xlog consumption) is greater than or equal to 100 Mbps in the following standard parallel decoding scenario: 16-core CPU, 128 GB memory, network bandwidth > 200 Mbps, 10 to 100 columns in a table, 0.1 KB to 1 KB data in a single row, INSERT as main DML operations, less than 4096 statements in a single transaction, parallel-decode-num set to 8, decoding format as 'b', and batch sending function enabled. To ensure that the decoding performance meets the requirements and minimize the impact on services, you are advised to set up only one parallel decoding connection on a standby node to ensure that the CPU, memory, and bandwidth resources are sufficient.

Note: The logical replication class PGReplicationStream is a non-thread-safe class. Concurrent calls may cause data exceptions.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
// There will be security risks if the username and password used for authentication are directly written into code. It is recommended that the username and password be stored in the configuration file or environment variables (the password must be stored in ciphertext and decrypted when being used) to ensure security.
// In this example, the username and password are stored in environment variables. Before running this example, set environment variables EXAMPLE_USERNAME_ENV and EXAMPLE_PASSWORD_ENV in the local environment (set the environment variable names based on the actual situation).
// Logical replication function example: file name, LogicalReplicationDemo.java
// Prerequisite: The IP address of the JDBC user machine has been added to the database whitelist. The following content has been added to pg_hba.conf:
// Assume that the IP address of the JDBC user machine is 10.10.10.10.
//host    all             all             10.10.10.10/32        sha256
//host    replication     all             10.10.10.10/32        sha256

import org.postgresql.PGProperty;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;

import java.nio.ByteBuffer;
import java.sql.DriverManager;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class LogicalReplicationDemo {
    private static PgConnection conn = null;
    public static void main(String[] args) {
        String driver = "org.postgresql.Driver";
 // Configure the IP address and haPort number of the database. By default, the port number is the port number of the connected DN plus 1.
        String sourceURL = "jdbc:postgresql://$ip:$port/postgres";    
 // The default name of the logical replication slot is replication_slot.
 // Test mode: Create a logical replication slot.
        int TEST_MODE_CREATE_SLOT = 1;
 // Test mode: Enable logical replication (the prerequisite is that the logical replication slot already exists).
        int TEST_MODE_START_REPL = 2;
 // Test mode: Delete a logical replication slot.
        int TEST_MODE_DROP_SLOT = 3;
        // Enable different test modes.
        int testMode = TEST_MODE_START_REPL;

        try {
            Class.forName(driver);
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

        try {
            Properties properties = new Properties();
            PGProperty.USER.set(properties, System.getenv("EXAMPLE_USERNAME_ENV"));
            PGProperty.PASSWORD.set(properties, System.getenv("EXAMPLE_PASSWORD_ENV"));
     // For logical replication, the following three attributes are mandatory:
            PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
            PGProperty.REPLICATION.set(properties, "database");
            PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
            conn = (PgConnection) DriverManager.getConnection(sourceURL, properties);
            System.out.println("connection success!");

            if(testMode == TEST_MODE_CREATE_SLOT){
                conn.getReplicationAPI()
                        .createReplicationSlot()
                        .logical()
                        .withSlotName("replication_slot") // If the character string contains uppercase letters, the uppercase letters are automatically converted to lowercase letters.
                        .withOutputPlugin("mppdb_decoding")
                        .make();
            }else if(testMode == TEST_MODE_START_REPL) {
                // Create a replication slot before enabling this mode.
                LogSequenceNumber waitLSN = LogSequenceNumber.valueOf("6F/E3C53568");
                PGReplicationStream stream = conn
                        .getReplicationAPI()
                        .replicationStream()
                        .logical()
                        .withSlotName("replication_slot")
                        .withSlotOption("include-xids", false)
                        .withSlotOption("skip-empty-xacts", true)
                        .withStartPosition(waitLSN)
                        .withSlotOption("parallel-decode-num", 10) // Decoding thread concurrency
                        .withSlotOption("white-table-list", "public.t1,public.t2") // Whitelist
                        .withSlotOption("standby-connection", true) // Forcible standby decoding
                        .withSlotOption("decode-style", "t") // Decoding format
                        .withSlotOption("sending-batch", 0) // Sending decoding results in batches
                        .withSlotOption("max-txn-in-memory", 100) // The memory threshold for flushing a single decoding transaction to disks is 100 MB.
                        .withSlotOption("max-reorderbuffer-in-memory", 50) // The total memory threshold for flushing decoding transactions that are being handled to disks is 50 GB.
                        .withSlotOption("exclude-users", 'userA') // The logical log of the transaction executed by user A is not returned.
                        .withSlotOption("include-user", true) // The BEGIN logical log of the transaction contains the username.
                        .withSlotOption("enable-heartbeat", true) // Enable the heartbeat log output option.
                        .start();
                while (true) {
                    ByteBuffer byteBuffer = stream.readPending();

                    if (byteBuffer == null) {
                        TimeUnit.MILLISECONDS.sleep(10L);
                        continue;
                    }

                    int offset = byteBuffer.arrayOffset();
                    byte[] source = byteBuffer.array();
                    int length = source.length - offset;
                    System.out.println(new String(source, offset, length));

                    // If the LSN needs to be flushed, call the following APIs based on the service requirements:
                    //LogSequenceNumber lastRecv = stream.getLastReceiveLSN();
                    //stream.setFlushedLSN(lastRecv);
                    //stream.forceUpdateStatus();

                }
            }else if(testMode == TEST_MODE_DROP_SLOT){
                conn.getReplicationAPI()
                        .dropReplicationSlot("replication_slot");
            }
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }
    }
}
Example of the decoding result in text format (that is, 't' format):
BEGIN CSN: 2014 first_lsn: 0/2816A28
table public t1 INSERT: a[integer]:1 b[integer]:2 c[text]:'hello'
COMMIT XID: 15504
BEGIN CSN: 2015 first_lsn: 0/2816C20
table public t1 UPDATE: old-key: a[integer]:1 new-tuple: a[integer]:1 b[integer]:5 c[text]:'hello'
COMMIT XID: 15505
BEGIN CSN: 2016 first_lsn: 0/2816D60
table public t1 DELETE: a[integer]:1
COMMIT XID: 15506
Example of the decoding result in JSON format (that is, 'j' format):
BEGIN CSN: 2014 first_lsn: 0/2816A28
{"table_name":"public.t1","op_type":"INSERT","columns_name":["a","b","c"],"columns_type":["integer","integer","text"],"columns_val":["1","1","'hello'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
COMMIT XID: 15504
BEGIN CSN: 2015 first_lsn: 0/2816C20
{"table_name":"public.t1","op_type":"UPDATE","columns_name":["a","b","c"],"columns_type":["integer","integer","text"],"columns_val":["1","5","'hello'"],"old_keys_name":["a"],"old_keys_type":["integer"],"old_keys_val":["1"]}
COMMIT XID: 15505
BEGIN CSN: 2016 first_lsn: 0/2816D60
{"table_name":"public.t1","op_type":"DELETE","columns_name":[],"columns_type":[],"columns_val":[],"old_keys_name":["a"],"old_keys_type":["integer"],"old_keys_val":["1"]}
COMMIT XID: 15506