逻辑解码支持指定位点解码
逻辑解码的串/并行解码支持对在线WAL日志的指定位点解码;指定位点为LSN,逻辑解码会从指定LSN往后找到一个一致性点(CONSISTENCY LSN),然后从一致性点开始解码并输出数据。
规格约束
- 只支持对在线WAL日志的指定位点解码。用户需要根据不同业务每天产生的WAL日志数量,来调整GaussDB保留在线WAL日志文件数量的配置参数,以达到满足指定位点WAL日志需求的目的。
- 逻辑解码真正的开始点是一致性点,一致性点具体位置和当时的并发执行事务实际情况相关,例如长事务等。一致性点及之后开启的事务所产生的数据修改才会被解码出来,用户需要确保要解码的事务开始于一致性点之后,为了防止漏解,建议在选择指定的逻辑解码位点时,尽量将其前移一段距离。
- 安装带指定位点功能的版本时,初始化时会自动执行字典表的基线化,安装完成后管理员无需执行基线化函数即可使用指定位点功能;升级场景,从不支持指定位点版本升级到指定位点版本,需要管理员执行基线化系统函数后才能使用指定位点功能,且指定位点必须大于基线化完成时的lsn。
- 如果用户设置的数据字典保留时长小于指定区间的WAL日志产生时长,会造成由于字典表数据缺失而导致解码异常的情况。逻辑解码数据字典的保留时长通过GUC参数(logical_replication_dictionary_retention_time)来设置。
数据字典相关系统表元组满足如下回收条件才可被回收:
- 当前时间减去系统表元组数据的创建时间得到的值大于logical_replication_dictionary_retention_time。
- 系统表数据的csnmax不为0,且csnmax小于逻辑复制槽推进后的最小csn(即如下所示的slot_dictionary_type为dictionary table的元组中最小的dictionary_csn_min)。
gaussdb=# select * from pg_get_replication_slots(); slot_name | plugin | slot_type | datoid | active | xmin | catalog_xmin | restart_lsn | dummy_standby | confirmed_flush | confirmed_csn | dictionary_csn_min | slot_dictionary_type -----------+----------------+-----------+--------+--------+------+--------------+-------------+---------------+-----------------+---------------+--------------------+---------------------- slot_lsn | mppdb_decoding | logical | 41313 | f | | | 0/E34CDC0 | f | 0/E34CE40 | | 3011 | dictionary table (1 row)
接口设计
- 新增控制参数
- 新增逻辑解码控制参数,用于指定解码开始点。可通过JDBC接口或者逻辑复制函数(例如:pg_logical_slot_peek_changes、pg_logical_slot_get_changes、pg_logical_slot_peek_binary_changes、pg_logical_slot_get_binary_changes)开启。
- restart-lsn:未使用该可选参数时,表示逻辑解码开始点使用逻辑复制槽原来的一致性点;有值时,表示逻辑解码开始点是restart-lsn后的一个一致性点。
- 新增GUC参数
- logical_replication_dictionary_retention_time:默认值为365天,用于控制数据字典相关系统表回收时,数据保存时间。
- enable_logical_replication_dictionary:默认为ON,ON状态下,逻辑复制支持创建多版本字典表类型的逻辑复制槽;OFF状态下,不支持创建多版本字典表类型的逻辑复制槽。
- 新增逻辑解码控制参数,用于指定解码开始点。可通过JDBC接口或者逻辑复制函数(例如:pg_logical_slot_peek_changes、pg_logical_slot_get_changes、pg_logical_slot_peek_binary_changes、pg_logical_slot_get_binary_changes)开启。
- 新增系统函数
- gs_logical_dictionary_baseline():执行逻辑解码数据字典的存量数据基线化,执行成功返回耗时,执行失败返回失败原因。
规格说明:函数的执行时长与实例上的业务表数量正相关。实例存在1万张业务表的场景下,基线化函数执行耗时25秒左右;10万张业务表的场景下,基线化函数执行耗时120秒左右。函数执行期间不会阻塞其他SQL语句的操作。
可以通过SELECT status FROM gs_logical_dictionary;确认实例是否已经完成基线化,返回值及含义:
- 0:基线化完成状态未加载。
- 1:基线化未完成。
- 2:基线化进行中。
- 3:基线化已完成。
函数执行成功,查询结果预期是3,表示基线化已完成。并开启对数据字典相关系统表的同步写入,例如:pg_class更改时会同步更改gs_logical_class。
- gs_logical_dictionary_disabled():关闭逻辑解码数据字典功能,停止对数据字典相关系统表的同步写入,执行成功返回OK,执行失败返回失败原因。
关闭数据字典功能之后,无法使用已有的数据字典模式复制槽继续解码,如需使用数据字典功能,须执行gs_logical_dictionary_baseline()函数,对逻辑数据字典重新进行基线化。
- gs_logical_dictionary_baseline():执行逻辑解码数据字典的存量数据基线化,执行成功返回耗时,执行失败返回失败原因。
使用步骤
- 逻辑解码特性需提前设置GUC参数wal_level为logical,该参数需要重启生效。
gs_guc set -Z datanode -D $node_dir -c "wal_level = logical"
其中,$node_dir为数据库节点路径,用户可根据实际情况替换。
- 以具有REPLICATION权限的用户登录GaussDB数据库主节点,使用如下命令连接数据库。
gsql -U user1 -W password -d db1 -p 16000 -r
其中,user1为用户名,password为密码,db1为需要连接的数据库名称,16000为数据库端口号,用户可根据实际情况替换。
- 创建名称为slot1的逻辑复制槽。
1 2 3 4 5
gaussdb=# SELECT * FROM pg_create_logical_replication_slot('slot1', 'mppdb_decoding'); slotname | xlog_position ----------+--------------- slot1 | 0/3764C788 (1 row)
- 查询指定时间点的create_lsn。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
gaussdb=# SELECT * FROM gs_txn_lsn_time ORDER BY create_time LIMIT 10; create_csn | create_lsn | snp_xmin | create_time | snp_snapshot | barrier ------------+------------+----------+-------------------------------+--------------+--------- 15639 | 607906688 | 0 | 2024-05-11 11:13:23.909348+08 | | 15640 | 607908632 | 0 | 2024-05-11 11:14:23.9082+08 | | 15641 | 607912608 | 0 | 2024-05-11 11:15:23.907633+08 | | 15642 | 607914280 | 0 | 2024-05-11 11:16:23.907281+08 | | 15643 | 607917416 | 0 | 2024-05-11 11:17:23.906145+08 | | 15646 | 608049488 | 0 | 2024-05-11 11:18:23.905562+08 | | 15647 | 608062840 | 0 | 2024-05-11 11:19:23.904358+08 | | 15648 | 608077904 | 0 | 2024-05-11 11:20:23.903672+08 | | 15651 | 608106328 | 0 | 2024-05-11 11:21:23.902544+08 | | 15653 | 608173464 | 0 | 2024-05-11 11:22:23.902104+08 | | (10 rows)
- create_lsn在该系统表中是以数字形式存储的,该LSN可以通过SQL语法转换为常见的LSN格式(仅供参考),即XXXXXXXX/XXXXXXXX,转换语法如下所示:
SELECT CONCAT(to_hex(({lsn_num}::bigint >> 32) & x'ffffffff'::bigint), '/', to_hex(({lsn_num}::bigint << 32 >> 32) & x'ffffffff'::bigint));
使用上述SQL语句时需将lsn_num转为自己需要的数字,以607906688为例如下所示:
gaussdb=# SELECT CONCAT(to_hex((607906688::bigint >> 32) & x'ffffffff'::bigint), '/', to_hex((607906688::bigint << 32 >> 32) & x'ffffffff'::bigint)); concat ------------ 0/243beb80 (1 row)
- 由于M兼容语法与高斯数据库语法有区别,M兼容库需要使用以下转换语句:
SELECT CONCAT(hex(({lsn_num} >> 32) & 4294967295), '/', hex((({lsn_num}<< 32) >> 32) & 4294967295));
使用上述SQL语句时需将lsn_num转为自己需要的数字,以607906688为例如下所示:
gaussdb_m=# SELECT CONCAT(hex((607906688 >> 32) & 4294967295), '/', hex(((607906688 << 32) >> 32) & 4294967295)); concat ------------ 0/243BEB80 (1 row)
- create_lsn在该系统表中是以数字形式存储的,该LSN可以通过SQL语法转换为常见的LSN格式(仅供参考),即XXXXXXXX/XXXXXXXX,转换语法如下所示:
- 指定解码位点restart-lsn,读取复制槽slot1解码结果。可通过JDBC接口或者逻辑复制函数进行指定位点解码,例如选取步骤4中查询到create_lsn字段值为607906688,根据步骤4提供的转换命令,转换成16进制格式0/243beb80,JDBC接口或逻辑复制函数中设置参数restart-lsn为0/243beb80。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
gaussdb=# SELECT * FROM pg_logical_slot_get_changes('slot_1', NULL, NULL, 'restart-lsn', '0/243beb80'); location | xid | data ------------+--------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 0/243CBA90 | 98702 | BEGIN 98702 0/243E1538 | 98702 | COMMIT 98702 (at 2024-05-11 11:18:24.824773+08) CSN 15645 0/243E1950 | 98703 | BEGIN 98703 0/243E1C70 | 98703 | COMMIT 98703 (at 2024-05-11 11:19:21.08274+08) CSN 15646 0/243E4D78 | 98704 | BEGIN 98704 0/243E5098 | 98704 | COMMIT 98704 (at 2024-05-11 11:20:21.083968+08) CSN 15647 0/243E8850 | 98705 | BEGIN 98705 0/243E8B70 | 98705 | COMMIT 98705 (at 2024-05-11 11:21:21.084388+08) CSN 15648 0/243E8B70 | 98706 | BEGIN 98706 0/243E8BB0 | 98706 | {"table_name":"public.tt","op_type":"INSERT","columns_name":["c1"],"columns_type":["integer"],"columns_val":["1"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]} 0/243E8CD0 | 98706 | COMMIT 98706 (at 2024-05-11 11:21:23.894197+08) CSN 15649 0/243E8D80 | 98707 | BEGIN 98707 0/243E8DC0 | 98707 | {"table_name":"public.tt","op_type":"INSERT","columns_name":["c1"],"columns_type":["integer"],"columns_val":["2"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]} 0/243E8EE0 | 98707 | COMMIT 98707 (at 2024-05-11 11:21:27.243517+08) CSN 15650 0/243EF758 | 98708 | BEGIN 98708 0/243EFA78 | 98708 | COMMIT 98708 (at 2024-05-11 11:22:21.08472+08) CSN 15651 0/243FFD98 | 98710 | BEGIN 98710 0/244000D0 | 98710 | COMMIT 98710 (at 2024-05-11 11:23:21.085542+08) CSN 15653 --More--
- 删除逻辑复制槽slot1。
1 2 3 4 5
gaussdb=# SELECT * FROM pg_drop_replication_slot('slot1'); pg_drop_replication_slot -------------------------- (1 row)