更新时间:2024-11-12 GMT+08:00
分享

逻辑解码支持指定位点解码

逻辑解码的串/并行解码支持对在线WAL日志的指定位点解码;指定位点为LSN,逻辑解码会从指定LSN往后找到一个一致性点(CONSISTENCY LSN),然后从一致性点开始解码并输出数据。

规格约束

  • 只支持对在线WAL日志的指定位点解码。用户需要根据不同业务每天产生的WAL日志数量,来调整GaussDB保留在线WAL日志文件数量的配置参数,以达到满足指定位点WAL日志需求的目的。
  • 逻辑解码真正的开始点是一致性点,一致性点具体位置和当时的并发执行事务实际情况相关,例如长事务等。一致性点及之后开启的事务所产生的数据修改才会被解码出来,用户需要确保要解码的事务开始于一致性点之后,为了防止漏解,建议在选择指定的逻辑解码位点时,尽量将其前移一段距离。
  • 安装带指定位点功能的版本时,初始化时会自动执行字典表的基线化,安装完成后管理员无需执行基线化函数即可使用指定位点功能;升级场景,从不支持指定位点版本升级到指定位点版本,需要管理员执行基线化系统函数后才能使用指定位点功能,且指定位点必须大于基线化完成时的lsn。
  • 如果用户设置的数据字典保留时长小于指定区间的WAL日志产生时长,会造成由于字典表数据缺失而导致解码异常的情况。逻辑解码数据字典的保留时长通过GUC参数(logical_replication_dictionary_retention_time)来设置。

    数据字典相关系统表元组满足如下回收条件才可被回收:

    1. 当前时间减去系统表元组数据的创建时间得到的值大于logical_replication_dictionary_retention_time。
    2. 系统表数据的csnmax不为0,且csnmax小于逻辑复制槽推进后的最小csn(即如下所示的slot_dictionary_type为dictionary table的元组中最小的dictionary_csn_min)。
      gaussdb=# select * from pg_get_replication_slots();
      slot_name |     plugin     | slot_type | datoid | active | xmin | catalog_xmin | restart_lsn | dummy_standby | confirmed_flush | confirmed_csn | dictionary_csn_min | slot_dictionary_type
      -----------+----------------+-----------+--------+--------+------+--------------+-------------+---------------+-----------------+---------------+--------------------+----------------------
      slot_lsn  | mppdb_decoding | logical   |  41313 | f      |      |              | 0/E34CDC0   | f             | 0/E34CE40       |               |               3011 | dictionary table
      (1 row)

接口设计

  • 新增控制参数
    1. 新增逻辑解码控制参数,用于指定解码开始点。可通过JDBC接口或者逻辑复制函数(例如:pg_logical_slot_peek_changes、pg_logical_slot_get_changes、pg_logical_slot_peek_binary_changes、pg_logical_slot_get_binary_changes)开启。
      • restart-lsn:未使用该可选参数时,表示逻辑解码开始点使用逻辑复制槽原来的一致性点;有值时,表示逻辑解码开始点是restart-lsn后的一个一致性点。
    2. 新增GUC参数
      • logical_replication_dictionary_retention_time:默认值为365天,用于控制数据字典相关系统表回收时,数据保存时间。
      • enable_logical_replication_dictionary:默认为ON,ON状态下,逻辑复制支持创建多版本字典表类型的逻辑复制槽;OFF状态下,不支持创建多版本字典表类型的逻辑复制槽。
  • 新增系统函数
    • gs_logical_dictionary_baseline():执行逻辑解码数据字典的存量数据基线化,执行成功返回耗时,执行失败返回失败原因。

      规格说明:函数的执行时长与实例上的业务表数量正相关。实例存在1万张业务表的场景下,基线化函数执行耗时25秒左右;10万张业务表的场景下,基线化函数执行耗时120秒左右。函数执行期间不会阻塞其他SQL语句的操作。

      可以通过SELECT status FROM gs_logical_dictionary;确认实例是否已经完成基线化,返回值及含义:

      • 0:基线化完成状态未加载。
      • 1:基线化未完成。
      • 2:基线化进行中。
      • 3:基线化已完成。

      函数执行成功,查询结果预期是3,表示基线化已完成。并开启对数据字典相关系统表的同步写入,例如:pg_class更改时会同步更改gs_logical_class。

    • gs_logical_dictionary_disabled():关闭逻辑解码数据字典功能,停止对数据字典相关系统表的同步写入,执行成功返回OK,执行失败返回失败原因。

      关闭数据字典功能之后,无法使用已有的数据字典模式复制槽继续解码,如需使用数据字典功能,须执行gs_logical_dictionary_baseline()函数,对逻辑数据字典重新进行基线化。

使用步骤

  1. 逻辑解码特性需提前设置GUC参数wal_level为logical,该参数需要重启生效。

    gs_guc set -Z datanode -D $node_dir -c "wal_level = logical"

    其中,$node_dir为数据库节点路径,用户可根据实际情况替换。

  2. 以具有REPLICATION权限的用户登录GaussDB数据库主节点,使用如下命令连接数据库。

    gsql -U user1 -W password -d db1 -p 16000 -r

    其中,user1为用户名,password为密码,db1为需要连接的数据库名称,16000为数据库端口号,用户可根据实际情况替换。

  3. 创建名称为slot1的逻辑复制槽。

    1
    2
    3
    4
    5
    gaussdb=# SELECT * FROM pg_create_logical_replication_slot('slot1', 'mppdb_decoding');
     slotname | xlog_position 
    ----------+---------------
     slot1    | 0/3764C788
    (1 row)
    

  4. 查询指定时间点的create_lsn。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    gaussdb=# SELECT * FROM gs_txn_lsn_time ORDER BY create_time LIMIT 10;
     create_csn | create_lsn | snp_xmin |          create_time          | snp_snapshot | barrier 
    ------------+------------+----------+-------------------------------+--------------+---------
          15639 |  607906688 |        0 | 2024-05-11 11:13:23.909348+08 |              | 
          15640 |  607908632 |        0 | 2024-05-11 11:14:23.9082+08   |              | 
          15641 |  607912608 |        0 | 2024-05-11 11:15:23.907633+08 |              | 
          15642 |  607914280 |        0 | 2024-05-11 11:16:23.907281+08 |              | 
          15643 |  607917416 |        0 | 2024-05-11 11:17:23.906145+08 |              | 
          15646 |  608049488 |        0 | 2024-05-11 11:18:23.905562+08 |              | 
          15647 |  608062840 |        0 | 2024-05-11 11:19:23.904358+08 |              | 
          15648 |  608077904 |        0 | 2024-05-11 11:20:23.903672+08 |              | 
          15651 |  608106328 |        0 | 2024-05-11 11:21:23.902544+08 |              | 
          15653 |  608173464 |        0 | 2024-05-11 11:22:23.902104+08 |              | 
    (10 rows)
    
    • create_lsn在该系统表中是以数字形式存储的,该LSN可以通过SQL语法转换为常见的LSN格式(仅供参考),即XXXXXXXX/XXXXXXXX,转换语法如下所示:

      SELECT CONCAT(to_hex(({lsn_num}::bigint >> 32) & x'ffffffff'::bigint), '/', to_hex(({lsn_num}::bigint << 32 >> 32) & x'ffffffff'::bigint));

      使用上述SQL语句时需将lsn_num转为自己需要的数字,以607906688为例如下所示:

      gaussdb=# SELECT CONCAT(to_hex((607906688::bigint >> 32) & x'ffffffff'::bigint), '/', to_hex((607906688::bigint << 32 >> 32) & x'ffffffff'::bigint));
      concat
      ------------
      0/243beb80
      (1 row)
    • 由于M兼容语法与高斯数据库语法有区别,M兼容库需要使用以下转换语句:

      SELECT CONCAT(hex(({lsn_num} >> 32) & 4294967295), '/', hex((({lsn_num}<< 32) >> 32) & 4294967295));

      使用上述SQL语句时需将lsn_num转为自己需要的数字,以607906688为例如下所示:

      gaussdb_m=# SELECT CONCAT(hex((607906688 >> 32) & 4294967295), '/', hex(((607906688 << 32) >> 32) & 4294967295));
         concat
      ------------
       0/243BEB80
      (1 row)

  5. 指定解码位点restart-lsn,读取复制槽slot1解码结果。可通过JDBC接口或者逻辑复制函数进行指定位点解码,例如选取步骤4中查询到create_lsn字段值为607906688,根据步骤4提供的转换命令,转换成16进制格式0/243beb80,JDBC接口或逻辑复制函数中设置参数restart-lsn为0/243beb80。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    gaussdb=# SELECT * FROM pg_logical_slot_get_changes('slot_1', NULL, NULL, 'restart-lsn', '0/243beb80');
      location  |  xid   |                                                                                    data
    ------------+--------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
     0/243CBA90 |  98702 | BEGIN 98702
     0/243E1538 |  98702 | COMMIT 98702 (at 2024-05-11 11:18:24.824773+08) CSN 15645
     0/243E1950 |  98703 | BEGIN 98703
     0/243E1C70 |  98703 | COMMIT 98703 (at 2024-05-11 11:19:21.08274+08) CSN 15646
     0/243E4D78 |  98704 | BEGIN 98704
     0/243E5098 |  98704 | COMMIT 98704 (at 2024-05-11 11:20:21.083968+08) CSN 15647
     0/243E8850 |  98705 | BEGIN 98705
     0/243E8B70 |  98705 | COMMIT 98705 (at 2024-05-11 11:21:21.084388+08) CSN 15648
     0/243E8B70 |  98706 | BEGIN 98706
     0/243E8BB0 |  98706 | {"table_name":"public.tt","op_type":"INSERT","columns_name":["c1"],"columns_type":["integer"],"columns_val":["1"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
     0/243E8CD0 |  98706 | COMMIT 98706 (at 2024-05-11 11:21:23.894197+08) CSN 15649
     0/243E8D80 |  98707 | BEGIN 98707
     0/243E8DC0 |  98707 | {"table_name":"public.tt","op_type":"INSERT","columns_name":["c1"],"columns_type":["integer"],"columns_val":["2"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
     0/243E8EE0 |  98707 | COMMIT 98707 (at 2024-05-11 11:21:27.243517+08) CSN 15650
     0/243EF758 |  98708 | BEGIN 98708
     0/243EFA78 |  98708 | COMMIT 98708 (at 2024-05-11 11:22:21.08472+08) CSN 15651
     0/243FFD98 |  98710 | BEGIN 98710
     0/244000D0 |  98710 | COMMIT 98710 (at 2024-05-11 11:23:21.085542+08) CSN 15653
      
    --More--
    

  6. 删除逻辑复制槽slot1。

    1
    2
    3
    4
    5
    gaussdb=# SELECT * FROM pg_drop_replication_slot('slot1');
     pg_drop_replication_slot
    --------------------------
    
    (1 row)
    

相关文档