更新时间:2024-11-12 GMT+08:00
分享

逻辑复制

下面示例演示如何通过JDBC接口使用逻辑复制功能的过程。

针对逻辑复制的配置选项,除了参考《特性指南》的“逻辑复制 > 逻辑解码”章节中的配置选项外,还有专门给JDBC等流式解码工具增加的配置项,如下所示:

  1. 解码线程并行度

    通过配置选项parallel-decode-num,指定并行解码的Decoder线程数量。其取值范围为int型的1~20,取1表示按照原有的串行逻辑进行解码,取其余值即为开启并行解码。默认值为1。当该选项配置为1时,禁止配置以下选项:解码格式选项decode-style、批量发送选项sending-batch和并行解码队列长度选项parallel-queue-size。

  2. 解码格式

    通过配置选项decode-style,指定解码格式。其取值为char型的字符'j'、't'或'b',分别代表json格式、text格式及二进制格式。该选项仅允许并行解码时设置,且二进制格式解码仅在并行解码场景下支持。对于json格式和text格式解码,在批量发送的解码结果中,每条解码语句的前4字节组成的uint32代表该条语句总字节数(不包含该uint32类型占用的4字节,0代表本批次解码结束),8字节uint64代表相应lsn(begin对应first_lsn,commit对应end_lsn,其他场景对应该条语句的lsn)。

    二进制格式编码规则如下所示:

    1. 前4字节代表接下来到语句级别分隔符字母P(不含)或者该批次结束符F(不含)的解码结果的总字节数,该值如果为0代表本批次解码结束。
    2. 接下来8字节uint64代表相应lsn(begin对应first_lsn,commit对应end_lsn,其他场景对应该条语句的lsn)。
    3. 接下来1字节的字母有5种B/C/I/U/D,分别代表begin/commit/insert/update/delete。
    4. 当第3步中的1字节的字母为B时:
      1. 接下来的8字节uint64代表CSN。
      2. 再接下来的8字节uint64代表first_lsn。
      3. 【该部分为可选项】接下来的1字节字母如果为T,则代表后面4字节uint32表示该事务commit时间戳长度,再后面等同于该长度的字符为时间戳字符串。
      4. 【该部分为可选项】接下来的1字节字母如果为N,则代表后面4字节uint32表示该事务用户名的长度,再后面等同于该长度的字符为事务的用户名字。
      5. 因为之后仍可能有解码语句,接下来会有1字节字母P或F作为语句间的分隔符,P代表本批次仍有解码的语句,F代表本批次解码完成。
    5. 当第3步中的1字节的字母为C时:
      1. 【该部分为可选项】接下来1字节字母如果为X,则代表后面的8字节uint64表示xid。
      2. 【该部分为可选项】接下来1字节字母如果为T,则代表后面的4字节uint32表示时间戳长度,再后面等同于该长度的字符为时间戳字符串。
      3. 因为批量发送日志时,一个COMMIT日志解码之后可能仍有其他事务的解码结果,接下来的1字节字母如果为P则表示该批次仍需解码,如果为F则表示该批次解码结束。
    6. 当第3步中的1字节的字母为I/U/D时:
      1. 接下来的2字节uint16代表schema名的长度。
      2. 按照上述长度读取schema名。
      3. 接下来的2字节uint16代表table名的长度。
      4. 按照上述长度读取table名。
      5. 【该部分为可选项】接下来1字节字母如果为N代表为新元组,如果为O代表为旧元组,这里先发送新元组。
        1. 接下来的2字节uint16代表该元组需要解码的列数,记为attrnum。
        2. 以下流程重复attrnum次。
          1. 接下来2字节uint16代表列名的长度。
          2. 按照上述长度读取列名。
          3. 接下来4字节uint32代表当前列类型的OID。
          4. 接下来4字节uint32代表当前列的值(以字符串格式存储)的长度,如果为0xFFFFFFFF则表示NULL,如果为0则表示长度为0的字符串。
          5. 按照上述长度读取列值。
      6. 因为之后仍可能有解码语句,接下来的1字节字母如果为P则表示该批次仍需解码,如果为F则表示该批次解码结束。
  1. 限制仅备机解码

    通过配置选项standby-connection,指定是否限制仅备机解码。其取值为Boolean型(可用0或1表示),取true(或1)代表限制仅允许连接备机解码,连接主机解码时会报错退出。取false(或0)时不做限制。默认值为false(0)。

  2. 批量发送

    通过配置选项sending-batch,指定是否批量发送。其取值范围为int型的0或1,取0表示逐条发送解码结果,取1表示解码结果累积到达1MB则批量发送解码结果。默认值为0。该选项仅允许并行解码时设置。开启批量发送的场景中,当解码格式为'j'或't'时,在原来的每条解码语句之前会附加一个uint32类型,表示本条解码结果长度(长度不包含当前的uint32类型),以及一个uint64类型,表示当前解码结果对应的lsn。

  3. 并行解码队列长度

    通过配置选项parallel-queue-size,指定并行逻辑解码线程间进行交互的队列长度。取值范围[2,1024],且必须为2的幂数,默认值为128。队列长度和解码过程的内存使用量正相关。

  4. 逻辑解码内存阈值

    逻辑解码内存阈值通过配置选项max-txn-in-memory指定单个事务解码中间结果缓存的内存阈值,单位为MB。并行解码模式下,该参数已废弃,不生效。串行解码模式下,取值范围:[0,100],默认值为0,表示不管控内存使用。

    通过配置选项max-reorderbuffer-in-memory指定所有事务解码中间结果缓存的内存阈值;单位为GB。串行解码-范围:[0,100],默认值为0,表示不管控内存使用。并行解码-取值范围:[1,max_process_memory总量的50%],默认值为1与max_process_memory/1048576*10%的较大值,其中1048576为kB到GB的单位转换。当超过内存阈值时,解码过程将出现解码中间结果写临时文件的现象,影响逻辑解码的性能。

  5. 逻辑解码表元信息内存阈值

    逻辑解码表元信息通过配置选项desc-memory-limit指定逻辑解码任务维护的所有表元信息内存开销的上限;单位为MB;默认值为100,取值范围为[10, 1024]。解码过程中,维护的表元信息总内存超过该阈值时,会触发表元信息的FIFO内存淘汰机制,清理部分内存。

  6. 逻辑解码发送超时阈值

    通过配置选项sender-timeout指定内核与客户端的心跳超时阈值。当该时间段内没有收到客户端任何消息,逻辑解码将主动停止,并断开和客户端的连接。单位为毫秒(ms),取值范围[0, 2147483647],默认值取决于GUC参数logical_sender_timeout配置。设置为0,表示逻辑解码不会主动断开和客户端的连接;如果设置过小,例如1ms,则可能存在解码任务中断的风险。

  7. 逻辑解码用户黑名单选项
    使用逻辑解码用户黑名单,逻辑解码结果将过滤黑名单中用户的事务操作。当前相关选项如下:
    1. exclude-userids:指定黑名单用户的OID,多个OID通过逗号分隔,不校验用户OID是否存在。
    2. exclude-users:指定黑名单用户名字,多个名字通过逗号分隔,通过dynamic-resolution设置是否动态解析识别用户名字。若解码报错用户不存在而出现中断、在确定日志产生时刻不存在对应的黑名单用户,可以通过配置dynamic-resolution成true或者从用户黑名单中删除报错用户名字来启动解码继续获取逻辑日志。
    3. dynamic-resolution:是否动态解析黑名单用户名字,默认为true。设置为false时,当解码观测到黑名单exclude-users中用户不存在时将报错并退出逻辑解码。设置为true时,当解码观测到黑名单exclude-users中用户不存在时继续解码。
  8. 事务逻辑日志输出选项
    1. include-xids:事务的BEGIN逻辑日志是否输出事务ID,默认为true。
    2. include-timestamp:事务的BEGIN逻辑日志是否输出事务提交时间,默认为false。
    3. include-user:事务的BEGIN逻辑日志是否输出事务的用户名字,默认为false。事务的用户名字特指授权用户——执行事务对应会话的登录用户,它在事务的整个执行过程中不会发生变化。
  9. JDBC默认设置逻辑解码连接的socketTimeout=10s,备机解码在主机压力大的时候可能会导致连接超时关闭,可以通过配置withStatusInterval(10000,TimeUnit.MILLISECONDS),调整超时时间解决。
  10. 心跳日志输出选项

    enable-heartbeat:是否输出心跳日志,默认为false。

    若开启心跳日志选项,此处说明心跳日志如何解析:二进制格式首先是字符'h'表示消息是心跳日志,之后是心跳日志内容,分别是8字节uint64代表LSN,表示发送心跳逻辑日志时读取的WAL日志结束位置;8字节uint64代表LSN,表示发送心跳逻辑日志时刻已经落盘的WAL日志的位置;8字节int64代表时间戳(从1970年1月1日开始),表示最新解码到的事务日志或检查点日志的产生时间戳。关于消息结束符:如果是二进制格式则为字符'F',如果格式为text或者json且为批量发送则结束符为'0',否则没有结束符。消息内容采用大端字节序进行数据传输。具体格式见下图:

  11. 逻辑解码控制参数,用于控制DDL的反解析流程以及输出形式:
    1. enable-ddl-decoding:默认false,不开启DDL语句的逻辑解码。值为true时,开启DDL语句的逻辑解码。
    2. enable-ddl-json-format:默认false,传送TEXT格式的DDL反解析结果。值为true时,传送JSON格式的DDL反解析结果。

逻辑复制类PGReplicationStream为非线程安全类,并发调用可能导致数据异常。

代码运行的前提条件:
  1. 根据实际情况添加gaussdbjdbc.jar包(例如用户使用IDE执行代码,则需要在本地IDE添加gaussdbjdbc.jar包)。
  2. 添加JDBC用户机器IP到数据库白名单里,修改gs_hba.conf配置文件的命令如下:
    gs_guc reload -Z datanode -N all -I all -h 'host replication all 0.0.0.0/0 sha256'
  3. 将wal_level参数设置为logical,设置方法请联系管理员处理。
  4. 创建表t1和t2,并且对该表进行DDL或DML操作。
      1
      2
      3
      4
      5
      6
      7
      8
      9
     10
     11
     12
     13
     14
     15
     16
     17
     18
     19
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    // 认证用的用户名和密码直接写到代码中有很大的安全风险,建议在配置文件或者环境变量中存放(密码应密文存放,使用时解密),确保安全。
    // 本示例以用户名和密码保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量(环境变量名称请根据自身情况进行设置)EXAMPLE_USERNAME_ENV和EXAMPLE_PASSWORD_ENV。
    // $ip、$port、database需要用户自行修改。
    
    import com.huawei.gaussdb.jdbc.PGProperty;
    import com.huawei.gaussdb.jdbc.jdbc.PgConnection;
    import com.huawei.gaussdb.jdbc.replication.LogSequenceNumber;
    import com.huawei.gaussdb.jdbc.replication.PGReplicationStream;
    
    import java.nio.ByteBuffer;
    import java.sql.DriverManager;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    
    public class LogicalReplicationDemo {
        private static PgConnection conn = null;
        public static void main(String[] args) {
            String driver = "com.huawei.gaussdb.jdbc.Driver";
     //此处配置数据库IP以及端口,这里的端口为haPort,通常默认是所连接DN的port+1端口
            String sourceURL = "jdbc:gaussdb://$ip:$port/database";    
     //默认逻辑复制槽的名称是:replication_slot
     //测试模式:创建逻辑复制槽
            int TEST_MODE_CREATE_SLOT = 1;
     //测试模式:开启逻辑复制(前提条件是逻辑复制槽已经存在)
            int TEST_MODE_START_REPL = 2;
     //测试模式:删除逻辑复制槽
            int TEST_MODE_DROP_SLOT = 3;
            //开启不同的测试模式
            int testMode = TEST_MODE_START_REPL;
    
            try {
                Class.forName(driver);
            } catch (Exception e) {
                e.printStackTrace();
                return;
            }
    
            try {
                Properties properties = new Properties();
                PGProperty.USER.set(properties, System.getenv("EXAMPLE_USERNAME_ENV"));
                PGProperty.PASSWORD.set(properties, System.getenv("EXAMPLE_PASSWORD_ENV"));
         //对于逻辑复制,以下三个属性是必须配置项
                PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
                PGProperty.REPLICATION.set(properties, "database");
                PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
                conn = (PgConnection) DriverManager.getConnection(sourceURL, properties);
                System.out.println("connection success!");
    
                if(testMode == TEST_MODE_CREATE_SLOT){
                    conn.getReplicationAPI()
                            .createReplicationSlot()
                            .logical()
                            .withSlotName("replication_slot") //这里字符串如包含大写字母则会自动转化为小写字母
                            .withOutputPlugin("mppdb_decoding")
                            .make();
                }else if(testMode == TEST_MODE_START_REPL) {
                    //开启此模式前需要创建复制槽
                    LogSequenceNumber waitLSN = LogSequenceNumber.valueOf("6F/E3C53568");
                    PGReplicationStream stream = conn
                            .getReplicationAPI()
                            .replicationStream()
                            .logical()
                            .withSlotName("replication_slot")
                            .withSlotOption("include-xids", true)
                            .withSlotOption("skip-empty-xacts", true)
                            .withStartPosition(waitLSN)
                            .withSlotOption("parallel-decode-num", 10) //解码线程并行度
                            .withSlotOption("white-table-list", "public.t1,public.t2") //白名单列表
                            // .withSlotOption("standby-connection", true) //强制备机解码。如果连接主节点,该行可以不设置。
                            .withSlotOption("decode-style", "t") //解码格式
                            .withSlotOption("sending-batch", 0) //批量发送解码结果
                            .withSlotOption("max-reorderbuffer-in-memory", 2) //正在处理的解码事务落盘内存阈值为2GB
                            .withSlotOption("exclude-users", "userA") //不返回用户userA执行事务的逻辑日志
                            .withSlotOption("include-user", false) //事务BEGIN逻辑日志不携带用户名字
                            .withSlotOption("enable-heartbeat", true) // 开启心跳日志
                            .start();
                    while (true) {
                        ByteBuffer byteBuffer = stream.readPending();
    
                        if (byteBuffer == null) {
                            TimeUnit.MILLISECONDS.sleep(10L);
                            continue;
                        }
    
                        int offset = byteBuffer.arrayOffset();
                        byte[] source = byteBuffer.array();
                        int length = source.length - offset;
                        System.out.println(new String(source, offset, length));
    
                        //如果需要flush lsn,根据业务实际情况调用以下接口,每调用一次以下接口将触发数据库复制槽落盘逻辑,可能影响解码性能,建议每10s左右调用一次
                        //LogSequenceNumber lastRecv = stream.getLastReceiveLSN();
                        //stream.setFlushedLSN(lastRecv);
                        //stream.forceUpdateStatus();
    
                    }
                }else if(testMode == TEST_MODE_DROP_SLOT){
                    conn.getReplicationAPI()
                            .dropReplicationSlot("replication_slot");
                }
            } catch (Exception e) {
                e.printStackTrace();
                return;
            } finally {
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    } 
    // 注意:如上代码不能直接读取二进制格式的逻辑日志,需按二进制格式编码规则读取。
    
text格式(即't'格式)解码结果示例如下:
BEGIN CSN: 2014 first_lsn: 0/2816A28
table public t1 INSERT: a[integer]:1 b[integer]:2 c[text]:'hello'
COMMIT XID: 15504
BEGIN CSN: 2015 first_lsn: 0/2816C20
table public t1 UPDATE: old-key: a[integer]:1 b[integer]:2 c[text]:'hello' new-tuple: a[integer]:1 b[integer]:5 c[text]:'hello'
COMMIT XID: 15505
BEGIN CSN: 2016 first_lsn: 0/2816D60
table public t1 DELETE: a[integer]:1 b[integer]:5 c[text]:'hello'
COMMIT XID: 15506
json格式(即'j'格式)解码结果示例如下:
BEGIN CSN: 2014 first_lsn: 0/2816A28
{"table_name":"public.t1","op_type":"INSERT","columns_name":["a","b","c"],"columns_type":["integer","integer","text"],"columns_val":["1","1","'hello'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
COMMIT XID: 15504
BEGIN CSN: 2015 first_lsn: 0/2816C20
{"table_name":"public.t1","op_type":"UPDATE","columns_name":["a","b","c"],"columns_type":["integer","integer","text"],"columns_val":["1","5","'hello'"],"old_keys_name":["a","b","c"],"old_keys_type":["integer","integer","text"],"old_keys_val":["1","1","'hello'"]}
COMMIT XID: 15505
BEGIN CSN: 2016 first_lsn: 0/2816D60
{"table_name":"public.t1","op_type":"DELETE","columns_name":[],"columns_type":[],"columns_val":[],"old_keys_name":["a","b","c"],"old_keys_type":["integer","integer","text"],"old_keys_val":["1","5","'hello'"]}
COMMIT XID: 15506

相关文档