Updated on 2024-08-20 GMT+08:00

Example: Logical Replication Code

The example demonstrates how to use the logical replication function through the JDBC APIs. Before executing the example, load the driver. For details about how to obtain and load the driver, see JDBC Packages, Driver Classes, and Environment Classes.

For details about the configuration options of logical replication, see section "Logical Replication > Logical Decoding" in Feature Guide. In addition, the following configuration items are added for streaming decoding tools such as JDBC:

  1. Decoding thread concurrency

    Set parallel-decode-num to specify the number of decoder threads for parallel decoding. The value is an integer ranging from 1 to 20. The value 1 indicates that decoding is performed based on the original serial logic. Other values indicate that parallel decoding is enabled. The default value is 1. When this parameter is set to 1, do not configure the following options: decode-style, sending-batch, and parallel-queue-size.

  2. Decoding format

    Configure decode-style to specify the decoding format. The value is a char, 'j', 't', or 'b', indicating the JSON, text, or binary format, respectively. The default value is 'b', indicating binary decoding. This option can be set only when parallel decoding is enabled, and binary decoding is supported only in the parallel decoding scenario. When results in the JSON or text format are sent in batches, the first four bytes of each decoding statement form a uint32 that indicates the total number of bytes of the statement (the four bytes occupied by the uint32 are excluded, and 0 indicates that the decoding of this batch ends). The following eight bytes form a uint64 that indicates the corresponding LSN (begin corresponds to first_lsn, commit corresponds to end_lsn, and other values correspond to the LSN of the statement).

    The binary encoding rules are as follows:

    1. The first four bytes represent the total number of bytes of the decoding result of statements following the statement-level delimiter letter P (excluded) or the batch end character F (excluded). If the value is 0, the decoding of this batch ends.
    2. The next eight bytes (uint64) indicate the corresponding LSN (begin corresponds to first_lsn, commit corresponds to end_lsn, and other values correspond to the LSN of the statement).
    3. The next one-byte letter can be B, C, I, U, or D, representing BEGIN, COMMIT, INSERT, UPDATE, or DELETE.
    4. If B is used in step 3:
      1. The next eight bytes (uint64) indicate the CSN.
      2. The next eight bytes (uint64) indicate first_lsn.
      3. (Optional) If the next one-byte letter is T, the following four bytes (uint32) indicate the timestamp length for committing the transaction. The following characters with the same length are the timestamp character string.
      4. (Optional) If the next one-byte letter is N, the following four bytes (uint32) indicate the length of the transaction username. The following characters with the same length are the transaction username.
    5. If C is used in step 3:
      1. (Optional) If the next one-byte letter is X, the following eight bytes (uint64) indicate the XID.
      2. (Optional) If the next one-byte letter is T, the following four bytes (uint32) indicate the timestamp length. The following characters with the same length are the timestamp character string.
      3. When logs are sent in batches, decoding results of other transactions may still exist after a COMMIT log is decoded. If the next 1-byte letter is P, the batch still needs to be decoded. If the letter is F, the batch decoding ends.
    6. If I, U, or D is used in step 3:
      1. The following two bytes (uint16) indicate the length of the schema name.
      2. The schema name is read based on the preceding length.
      3. The following two bytes (uint16) indicate the length of the table name.
      4. The table name is read based on the preceding length.
      5. (Optional) If the next one-byte letter is N, it indicates a new tuple. If the letter is O, it indicates an old tuple. In this case, the new tuple is sent first.
        1. The following two bytes (uint16) indicate the number of columns to be decoded for the tuple, which is recorded as attrnum.
        2. The following procedure is repeated for attrnum times.
          1. The next two bytes (uint16) indicate the length of the column name.
          2. The column name is read based on the preceding length.
          3. The following four bytes (uint32) indicate the OID of the current column type.
          4. The next four bytes (uint32) indicate the length of the value (stored in the character string format) in the current column. If the value is 0xFFFFFFFF, it indicates null. If the value is 0, it indicates a character string whose length is 0.
          5. The column value is read based on the preceding length.
      6. Because another decoding statement may follow, the next one-byte letter indicates whether the batch continues: P indicates that the batch still needs to be decoded, and F indicates that decoding of the batch ends.
  3. Decoding only on the standby node

    Configure the standby-connection option to specify whether to perform decoding only on the standby node. The value is of the Boolean type (0 or 1). The value true (or 1) indicates that only the standby node can be connected for decoding. When the primary node is connected for decoding, an error is reported and the system exits. The value false (or 0) indicates that there is no restriction. The default value is false (or 0).

  4. Batch sending

    Configure sending-batch to specify whether to send results in batches. The value is an integer, either 0 or 1. The value 0 indicates that decoding results are sent one by one. The value 1 indicates that decoding results are sent in batches when the accumulated size of decoding results reaches 1 MB. The default value is 0. This parameter can be set only during parallel decoding. When batch sending is enabled and the decoding format is 'j' or 't', each original decoding statement is preceded by a uint32 indicating the length of the decoding result (excluding the uint32 itself) and a uint64 indicating the LSN corresponding to the current decoding result.

  5. Length of the parallel decoding queue

    Configure parallel-queue-size to specify the length of the queue for interaction among parallel logical decoding threads. The value ranges from 2 to 1024 and must be a power of 2. The default value is 128. The queue length is positively correlated with the memory usage during decoding.

  6. Memory threshold for logical decoding

    The max-txn-in-memory configuration item specifies the memory threshold for caching the intermediate decoding result of a single transaction, in MB. The value range is [0,100] for serial decoding. The default value is 0, indicating that the memory usage is not controlled. For parallel decoding, the value range is [0,max_process_memory x 25%]. The default value is max_process_memory/4/1024, where 1024 indicates the conversion from KB to MB. The value 0 indicates that this memory control is disabled.

    The max-reorderbuffer-in-memory configuration item specifies the memory threshold for caching intermediate decoding results of all transactions, in GB. The value range is [0,100] for serial decoding. The default value is 0, indicating that the memory usage is not controlled. For parallel decoding, the value range is [0,max_process_memory x 50%]. The default value is max_process_memory/2/1048576, where 1048576 indicates the conversion from KB to GB. The value 0 indicates that this memory control is disabled.

    When the memory usage exceeds the threshold, intermediate decoding results are written into a temporary file during decoding, affecting the logical decoding performance.

  7. Logical decoding sending timeout threshold

    The sender-timeout configuration item specifies the heartbeat timeout threshold between the kernel and client. If no message is received from the client within the period, the logical decoding stops and disconnects from the client. The unit is ms, and the value range is [0,2147483647]. The default value depends on the value of the GUC parameter logical_sender_timeout.

  8. User blacklist options for logical decoding

    Use the user blacklist for logical decoding. The transaction operations of blacklisted users are filtered from the logical decoding result. The options are as follows:

    1. exclude-userids: specifies the OIDs of blacklisted users. Multiple OIDs are separated by commas (,). The system does not check whether the user OIDs exist. The OIDs of the same service user on different DNs may be different. Therefore, the OID of the service user on each DN needs to be transferred for logical decoding of directly connected DNs in distributed mode. Otherwise, the logical decoding results of some DNs may be filtered while those of some DNs are not filtered.
    2. exclude-users: specifies blacklisted usernames. Multiple usernames are separated by commas (,). Whether the usernames are dynamically parsed and identified is controlled by dynamic-resolution. If decoding is interrupted because a blacklisted user does not exist, and that user also did not exist when the logs were generated, you can set dynamic-resolution to true or delete the username from the blacklist to start decoding and continue to obtain logical logs.
    3. dynamic-resolution: specifies whether to dynamically parse blacklisted usernames. The default value is true. If the parameter is set to false, an error is reported and the logical decoding exits when the decoding detects that a user in the exclude-users blacklist does not exist. If the parameter is set to true, decoding continues in that case.
  9. Output options for transaction logic logs
    1. include-xids: specifies whether the BEGIN logical log of a transaction outputs the transaction ID. The default value is true.
    2. include-timestamp: specifies whether the BEGIN logical log of a transaction outputs the time when the transaction is committed. The default value is false.
    3. include-user: specifies whether the BEGIN logical log of a transaction outputs the username of the transaction. The default value is false. The username of a transaction refers to the authorized user, that is, the login user who executes the session corresponding to the transaction. The username does not change during the execution of the transaction.
  10. By default, socketTimeout of the logical decoding connection is set to 10s. When the primary node is overloaded during decoding on the standby node, the connection may be closed due to timeout. You can set withStatusInterval(10000,TimeUnit.MILLISECONDS) to adjust the timeout interval.
  11. Heartbeat log output option

    enable-heartbeat: specifies whether to generate heartbeat logs. The default value is false.

    If the heartbeat log output option is enabled, heartbeat logs are generated. A binary heartbeat log message starts with the character 'h', followed by the heartbeat log content (an 8-byte uint64, a second 8-byte uint64, and an 8-byte int64), and ends with the character 'F'. The three fields are as follows:

    1. First 8-byte uint64: in the decoding scenario where DNs are directly connected, this value is an LSN, indicating the end position of the WAL read when the heartbeat logical log is sent; in the decoding scenario where distributed strong consistency is required, this value is a CSN, indicating the decoding log transaction CSN that has been sent when the heartbeat logical log is sent.
    2. Second 8-byte uint64: in the decoding scenario where DNs are directly connected, this value is an LSN, indicating the location of the WAL that has been flushed to disks when the heartbeat logical log is sent; in the decoding scenario where distributed strong consistency is required, this value is a CSN, indicating the CSN to be obtained by the next transaction committed by the cluster.
    3. Last 8-byte int64: the generation timestamp (starting from January 1, 1970) of the latest decoded transaction log or checkpoint log.

    TEXT/JSON heartbeat log messages that are sent in batches end with '0'. An individual TEXT/JSON heartbeat log message has no such terminator. The message content is transmitted in big-endian mode. The following figure shows the format. (In consideration of forward compatibility, the LSN naming mode is retained. The actual meaning depends on the specific scenario.)
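The binary heartbeat layout described above can be read with a plain java.nio.ByteBuffer. The following is a minimal sketch, not part of the driver: the class and field names (HeartbeatParser, Heartbeat, first, second, timestamp) are hypothetical, and it assumes the buffer is positioned at the 'h' byte and holds exactly one heartbeat message. The unit of the timestamp is not stated here, so the sketch treats it as an opaque number.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class HeartbeatParser {
    /** Holds the three 8-byte fields of one binary heartbeat message. */
    static final class Heartbeat {
        final long first;      // LSN or CSN, depending on the scenario (unsigned)
        final long second;     // LSN or CSN, depending on the scenario (unsigned)
        final long timestamp;  // signed int64; unit is an open assumption

        Heartbeat(long first, long second, long timestamp) {
            this.first = first;
            this.second = second;
            this.timestamp = timestamp;
        }
    }

    /** Parses 'h' + uint64 + uint64 + int64 + 'F' without moving the caller's position. */
    static Heartbeat parse(ByteBuffer message) {
        ByteBuffer buf = message.duplicate().order(ByteOrder.BIG_ENDIAN);
        if (buf.remaining() < 26) {
            throw new IllegalArgumentException("heartbeat message is shorter than 26 bytes");
        }
        if (buf.get() != 'h') {
            throw new IllegalArgumentException("message does not start with 'h'");
        }
        long first = buf.getLong();
        long second = buf.getLong();
        long timestamp = buf.getLong();
        if (buf.get() != 'F') {
            throw new IllegalArgumentException("message does not end with 'F'");
        }
        return new Heartbeat(first, second, timestamp);
    }

    public static void main(String[] args) {
        // Hand-built sample message; the values are made up for illustration.
        ByteBuffer sample = ByteBuffer.allocate(26);
        sample.put((byte) 'h').putLong(0x2816A28L).putLong(0x2816C20L).putLong(1L).put((byte) 'F');
        sample.flip();
        Heartbeat hb = parse(sample);
        // Java has no unsigned long type, so print the uint64 fields with toUnsignedString.
        System.out.println(Long.toUnsignedString(hb.first) + " "
                + Long.toUnsignedString(hb.second) + " " + hb.timestamp);
    }
}
```

Because the two uint64 fields are stored in a signed long, comparisons between them should use Long.compareUnsigned rather than the < operator.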

The decoding performance (Xlog consumption) is greater than or equal to 100 Mbps in the following standard parallel decoding scenario: 16-core CPU, 128 GB memory, network bandwidth > 200 Mbps, 10 to 100 columns in a table, 0.1 KB to 1 KB data in a single row, INSERT as the main DML operation, fewer than 4096 statements in a single transaction, parallel-decode-num set to 8, decoding format set to 't', and batch sending enabled. To ensure that the decoding performance meets the requirements and to minimize the impact on services, you are advised to set up only one parallel decoding connection on a standby node so that the CPU, memory, and bandwidth resources are sufficient.
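Before opening a stream with settings such as those in the scenario above, a client can check the documented value ranges locally. The following is a minimal sketch of such a check. DecodeOptionCheck and its methods are hypothetical helpers, not driver APIs, and the sketch covers only parallel-decode-num, decode-style, sending-batch, and parallel-queue-size as described in the option list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DecodeOptionCheck {
    /** Returns a list of problems; an empty list means the checked options look consistent. */
    static List<String> validate(Map<String, Object> options) {
        List<String> problems = new ArrayList<>();
        int threads = intOption(options, "parallel-decode-num", 1);
        if (threads < 1 || threads > 20) {
            problems.add("parallel-decode-num must be between 1 and 20");
        }
        if (threads == 1) {
            // Serial decoding: the parallel-only options must not be configured.
            for (String name : new String[] {"decode-style", "sending-batch", "parallel-queue-size"}) {
                if (options.containsKey(name)) {
                    problems.add(name + " must not be set when parallel-decode-num is 1");
                }
            }
        }
        if (options.containsKey("decode-style")) {
            String style = String.valueOf(options.get("decode-style"));
            if (!style.equals("j") && !style.equals("t") && !style.equals("b")) {
                problems.add("decode-style must be 'j', 't', or 'b'");
            }
        }
        if (options.containsKey("sending-batch")) {
            int batch = intOption(options, "sending-batch", 0);
            if (batch != 0 && batch != 1) {
                problems.add("sending-batch must be 0 or 1");
            }
        }
        if (options.containsKey("parallel-queue-size")) {
            int size = intOption(options, "parallel-queue-size", 128);
            // A power of 2 has exactly one bit set.
            if (size < 2 || size > 1024 || Integer.bitCount(size) != 1) {
                problems.add("parallel-queue-size must be a power of 2 between 2 and 1024");
            }
        }
        return problems;
    }

    private static int intOption(Map<String, Object> options, String name, int defaultValue) {
        Object value = options.get(name);
        return value == null ? defaultValue : ((Number) value).intValue();
    }

    public static void main(String[] args) {
        Map<String, Object> options = new LinkedHashMap<>();
        options.put("parallel-decode-num", 8);
        options.put("decode-style", "t");
        options.put("sending-batch", 1);
        System.out.println(validate(options));
    }
}
```

The server remains the authority on which combinations are accepted; a local check like this only catches obvious mistakes before a connection is made.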

The logical replication class PGReplicationStream is a non-thread-safe class. Concurrent calls may cause data exceptions.

// gaussdbjdbc.jar is used as an example.
// There will be security risks if the username and password used for authentication are directly written into code. It is recommended that the username and password be stored in the configuration file or environment variables (the password must be stored in ciphertext and decrypted when being used) to ensure security.
// In this example, the username and password are stored in environment variables. Before running this example, set environment variables EXAMPLE_USERNAME_ENV and EXAMPLE_PASSWORD_ENV in the local environment (set the environment variable names based on the actual situation).
// Logical replication function example: file name, LogicalReplicationDemo.java
// Prerequisite: Add the IP address of the JDBC user machine to the database whitelist. Add the following content to gs_hba.conf:
// Assume that the IP address of the JDBC user machine is 10.10.10.10.
//host    all             all             10.10.10.10/32        sha256
//host    replication     all             10.10.10.10/32        sha256

import com.huawei.gaussdb.jdbc.PGProperty;
import com.huawei.gaussdb.jdbc.jdbc.PgConnection;
import com.huawei.gaussdb.jdbc.replication.LogSequenceNumber;
import com.huawei.gaussdb.jdbc.replication.PGReplicationStream;

import java.nio.ByteBuffer;
import java.sql.DriverManager;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class LogicalReplicationDemo {
    private static PgConnection conn = null;
    public static void main(String[] args) {
        String driver = "com.huawei.gaussdb.jdbc.Driver";
        // Configure the IP address and haPort number of the database. By default, the port number is the port number of the connected DN plus 1.
        String sourceURL = "jdbc:gaussdb://$ip:$port/database";
        
        // The default name of the logical replication slot is replication_slot.
        // Test mode: Create a logical replication slot.
        int TEST_MODE_CREATE_SLOT = 1;
        // Test mode: Enable logical replication (the prerequisite is that the logical replication slot already exists).
        int TEST_MODE_START_REPL = 2;
        // Test mode: Delete a logical replication slot.
        int TEST_MODE_DROP_SLOT = 3;
        // Enable different test modes.
        int testMode = TEST_MODE_START_REPL;

        try {
            Class.forName(driver);
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

        try {
            Properties properties = new Properties();
            PGProperty.USER.set(properties, System.getenv("EXAMPLE_USERNAME_ENV"));
            PGProperty.PASSWORD.set(properties, System.getenv("EXAMPLE_PASSWORD_ENV"));
            // For logical replication, the following three attributes are required:
            PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
            PGProperty.REPLICATION.set(properties, "database");
            PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
            conn = (PgConnection) DriverManager.getConnection(sourceURL, properties);
            System.out.println("connection success!");

            if(testMode == TEST_MODE_CREATE_SLOT){
                conn.getReplicationAPI()
                        .createReplicationSlot()
                        .logical()
                        .withSlotName("replication_slot") // If the character string contains uppercase letters, the uppercase letters are automatically converted to lowercase letters.
                        .withOutputPlugin("test_decoding")
                        .make();
            }else if(testMode == TEST_MODE_START_REPL) {
                // Create a replication slot before enabling this mode.
                LogSequenceNumber waitLSN = LogSequenceNumber.valueOf("6F/E3C53568");
                PGReplicationStream stream = conn
                        .getReplicationAPI()
                        .replicationStream()
                        .logical()
                        .withSlotName("replication_slot")
                        .withSlotOption("include-xids", false)
                        .withSlotOption("skip-empty-xacts", true)
                        .withStartPosition(waitLSN)
                        .withSlotOption("parallel-decode-num", 10) // Decoding thread concurrency
                        .withSlotOption("white-table-list", "public.t1,public.t2") // Whitelist
                        .withSlotOption("standby-connection", true) // Forcible standby decoding
                        .withSlotOption("decode-style", "t") // Decoding format
                        .withSlotOption("sending-batch", 0) // Sending decoding results in batches
                        .withSlotOption("max-txn-in-memory", 100) // The memory threshold for flushing a single decoding transaction to disks is 100 MB.
                        .withSlotOption("max-reorderbuffer-in-memory", 2) // The total memory threshold for flushing decoding transactions that are being handled to disks is 2 GB.
                        .withSlotOption("exclude-users", "userA") // The logical log of the transaction executed by user A is not returned.
                        .withSlotOption("include-user", true) // The BEGIN logical log of the transaction contains the username.
                        .withSlotOption("enable-heartbeat", true) // Enable the heartbeat log output option.
                        .start();
                while (true) {
                    ByteBuffer byteBuffer = stream.readPending();

                    if (byteBuffer == null) {
                        TimeUnit.MILLISECONDS.sleep(10L);
                        continue;
                    }

                    int offset = byteBuffer.arrayOffset();
                    byte[] source = byteBuffer.array();
                    int length = source.length - offset;
                    System.out.println(new String(source, offset, length));

                    // If the LSN needs to be flushed, call the following APIs based on the service requirements:
//                    LogSequenceNumber lastRecv = stream.getLastReceiveLSN();
//                    stream.setFlushedLSN(lastRecv);
//                    stream.forceUpdateStatus();

                }
            }else if(testMode == TEST_MODE_DROP_SLOT){
                conn.getReplicationAPI()
                        .dropReplicationSlot("replication_slot");
            }
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }
    }
}
// Note: The preceding code cannot directly read the logical logs in binary format. You need to read the logical logs according to the binary encoding rules.
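For the binary format, the encoding rules listed earlier can be walked field by field. The following is a minimal sketch under stated assumptions, not the driver's own decoder: BinaryDecodeSketch is a hypothetical class, a P or F delimiter is assumed to follow every statement (including BEGIN), strings are assumed to be UTF-8, and the uint32 statement length is read only to detect the end-of-batch value 0.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class BinaryDecodeSketch {
    /** Walks one batch and returns one summary line per statement. */
    static List<String> decodeBatch(ByteBuffer in) {
        ByteBuffer buf = in.duplicate();          // a ByteBuffer reads big-endian by default
        List<String> out = new ArrayList<>();
        while (buf.remaining() >= 4) {
            int len = buf.getInt();               // rule 1: statement length
            if (len == 0) {
                break;                            // 0 means the batch has ended
            }
            long lsn = buf.getLong();             // rule 2: LSN of the statement
            char kind = (char) buf.get();         // rule 3: B, C, I, U, or D
            StringBuilder sb = new StringBuilder(Long.toUnsignedString(lsn)).append(' ');
            switch (kind) {
                case 'B':
                    sb.append("BEGIN CSN: ").append(Long.toUnsignedString(buf.getLong()));
                    sb.append(" first_lsn: ").append(Long.toUnsignedString(buf.getLong()));
                    if (peek(buf) == 'T') { buf.get(); sb.append(" time: ").append(str(buf, buf.getInt())); }
                    if (peek(buf) == 'N') { buf.get(); sb.append(" user: ").append(str(buf, buf.getInt())); }
                    break;
                case 'C':
                    sb.append("COMMIT");
                    if (peek(buf) == 'X') { buf.get(); sb.append(" XID: ").append(Long.toUnsignedString(buf.getLong())); }
                    if (peek(buf) == 'T') { buf.get(); sb.append(" time: ").append(str(buf, buf.getInt())); }
                    break;
                case 'I':
                case 'U':
                case 'D':
                    String schema = str(buf, buf.getShort() & 0xFFFF);
                    String table = str(buf, buf.getShort() & 0xFFFF);
                    sb.append(kind).append(' ').append(schema).append('.').append(table);
                    while (peek(buf) == 'N' || peek(buf) == 'O') {
                        sb.append(buf.get() == 'N' ? " new:" : " old:");
                        int attrnum = buf.getShort() & 0xFFFF;
                        for (int i = 0; i < attrnum; i++) {
                            String column = str(buf, buf.getShort() & 0xFFFF);
                            long typeOid = buf.getInt() & 0xFFFFFFFFL;
                            int valueLen = buf.getInt();
                            // 0xFFFFFFFF marks null; 0 is an empty string.
                            String value = valueLen == 0xFFFFFFFF ? "null" : str(buf, valueLen);
                            sb.append(' ').append(column).append('[').append(typeOid).append("]:").append(value);
                        }
                    }
                    break;
                default:
                    throw new IllegalStateException("unexpected statement type: " + kind);
            }
            out.add(sb.toString());
            byte delimiter = buf.get();           // 'P': more statements follow, 'F': batch ends
            if (delimiter == 'F') {
                break;
            }
            if (delimiter != 'P') {
                throw new IllegalStateException("unexpected delimiter: " + (char) delimiter);
            }
        }
        return out;
    }

    /** Returns the next byte without consuming it, or '\0' at the end of the buffer. */
    private static char peek(ByteBuffer buf) {
        return buf.hasRemaining() ? (char) buf.get(buf.position()) : '\0';
    }

    private static String str(ByteBuffer buf, int length) {
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hand-built sample: a single COMMIT with an XID, closed by the batch end character F.
        ByteBuffer sample = ByteBuffer.allocate(32);
        sample.putInt(18).putLong(300L).put((byte) 'C').put((byte) 'X').putLong(15504L).put((byte) 'F');
        sample.flip();
        decodeBatch(sample).forEach(System.out::println);
    }
}
```

The summary lines are only for inspection; a real consumer would map each statement to its own record type and use the type OID to convert column values.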
The following is an example of the decoding result in text format (that is, 't' format):
BEGIN CSN: 2014 first_lsn: 0/2816A28
table public t1 INSERT: a[integer]:1 b[integer]:2 c[text]:'hello'
COMMIT XID: 15504
BEGIN CSN: 2015 first_lsn: 0/2816C20
table public t1 UPDATE: old-key: a[integer]:1 new-tuple: a[integer]:1 b[integer]:5 c[text]:'hello'
COMMIT XID: 15505
BEGIN CSN: 2016 first_lsn: 0/2816D60
table public t1 DELETE: a[integer]:1
COMMIT XID: 15506
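The first_lsn values in the text output above are written as two hexadecimal numbers separated by a slash, whereas the binary and batch formats carry the LSN as a uint64. The following sketch converts between the two forms. It assumes the usual PostgreSQL-style notation, in which the part before the slash is the high 32 bits and the part after it is the low 32 bits. LsnText is a hypothetical helper; in application code, the driver's LogSequenceNumber class used in the example is the natural choice.

```java
final class LsnText {
    /** Parses "high/low" hexadecimal text into a 64-bit value. */
    static long parse(String text) {
        String[] parts = text.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("not an LSN: " + text);
        }
        long high = Long.parseLong(parts[0], 16);
        long low = Long.parseLong(parts[1], 16);
        return (high << 32) | low;
    }

    /** Formats a 64-bit value as "high/low" in uppercase hexadecimal. */
    static String format(long lsn) {
        return String.format("%X/%X", lsn >>> 32, lsn & 0xFFFFFFFFL);
    }

    public static void main(String[] args) {
        long value = parse("6F/E3C53568");
        System.out.println(Long.toUnsignedString(value));
        System.out.println(format(value));
    }
}
```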
The following is an example of the decoding result in JSON format (that is, 'j' format):
BEGIN CSN: 2014 first_lsn: 0/2816A28
{"table_name":"public.t1","op_type":"INSERT","columns_name":["a","b","c"],"columns_type":["integer","integer","text"],"columns_val":["1","1","'hello'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
COMMIT XID: 15504
BEGIN CSN: 2015 first_lsn: 0/2816C20
{"table_name":"public.t1","op_type":"UPDATE","columns_name":["a","b","c"],"columns_type":["integer","integer","text"],"columns_val":["1","5","'hello'"],"old_keys_name":["a"],"old_keys_type":["integer"],"old_keys_val":["1"]}
COMMIT XID: 15505
BEGIN CSN: 2016 first_lsn: 0/2816D60
{"table_name":"public.t1","op_type":"DELETE","columns_name":[],"columns_type":[],"columns_val":[],"old_keys_name":["a"],"old_keys_type":["integer"],"old_keys_val":["1"]}
COMMIT XID: 15506
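When sending-batch is set to 1, text and JSON results such as those above arrive framed: each statement is preceded by a uint32 length and a uint64 LSN, and a length of 0 closes the batch. The following is a minimal sketch of splitting such a batch. BatchFrameReader is a hypothetical class. Whether the uint32 length counts the eight LSN bytes is not fully clear from the description, so the sketch takes that as an explicit parameter that should be verified against real output. UTF-8 text is also an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class BatchFrameReader {
    /**
     * Splits one batch into "LSN statement" lines.
     * lengthIncludesLsn is an assumption switch: true if the uint32 length
     * also counts the 8 LSN bytes, false if it counts only the statement text.
     */
    static List<String> split(ByteBuffer in, boolean lengthIncludesLsn) {
        ByteBuffer buf = in.duplicate();  // big-endian by default
        List<String> out = new ArrayList<>();
        while (buf.remaining() >= 4) {
            int length = buf.getInt();
            if (length == 0) {
                break;                    // 0 marks the end of the batch
            }
            long lsn = buf.getLong();
            int textLength = lengthIncludesLsn ? length - 8 : length;
            if (textLength < 0 || textLength > buf.remaining()) {
                throw new IllegalStateException("inconsistent frame length: " + length);
            }
            byte[] text = new byte[textLength];
            buf.get(text);
            out.add(Long.toUnsignedString(lsn) + " " + new String(text, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        // Hand-built sample batch with a single statement, followed by the 0 terminator.
        byte[] statement = "COMMIT XID: 15504".getBytes(StandardCharsets.UTF_8);
        ByteBuffer sample = ByteBuffer.allocate(64);
        sample.putInt(statement.length + 8).putLong(42L).put(statement).putInt(0);
        sample.flip();
        split(sample, true).forEach(System.out::println);
    }
}
```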