逻辑复制函数
使用逻辑复制函数时,需要设置GUC参数wal_level为logical。具体配置请参考《特性指南》的“逻辑解码 > 使用SQL函数接口进行逻辑解码”章节。
- pg_create_logical_replication_slot('slot_name', 'plugin_name', 'output_order')
参数说明:
- slot_name
取值范围:字符串,仅支持小写字母、数字以及“_”,“?”,“-”,“.”字符,且不支持“.”或“..”单独作为复制槽名称。
- plugin_name
取值范围:字符串,当前支持mppdb_decoding。
- output_order
取值范围:0或1,默认值为0。
- 0:设为0时,复制槽解码结果按照事务的COMMIT LSN排序。此时复制槽的confirmed_csn为0,此复制槽称为LSN序复制槽。
- 1:设为1时,复制槽解码结果按照事务的CSN排序。此时复制槽的confirmed_csn为非0值,此复制槽称为CSN序复制槽。
返回值类型:name, text。
示例:gaussdb=# SELECT * FROM pg_create_logical_replication_slot('slot_lsn','mppdb_decoding',0); slotname | xlog_position ----------+--------------- slot_lsn | 0/6D08B58 (1 row) gaussdb=# SELECT * FROM pg_create_logical_replication_slot('slot_csn','mppdb_decoding',1); slotname | xlog_position ----------+--------------- slot_csn | 0/59AD800 (1 row)
备注:第一个返回值表示slot_name,第二个返回值在LSN序复制槽和CSN序复制槽下有不同含义。对于LSN序复制槽,该值为复制槽的confirmed_flush,表示COMMIT LSN小于等于该值的事务之后不会被解码输出;对于CSN序复制槽,该值为复制槽的confirmed_csn,表示CSN小于等于该值的事务之后不会被解码输出。调用该函数的用户需要具有SYSADMIN权限或具有REPLICATION权限或继承了内置角色gs_role_replication的权限。此函数目前只支持在主机调用。
- slot_name
- pg_create_physical_replication_slot('slot_name', 'isDummyStandby')
参数说明:
返回值类型:name, text
- 调用该函数的用户需要具有SYSADMIN权限或具有REPLICATION权限或继承了内置角色gs_role_replication的权限。
- 由于该函数创建的物理复制槽没有restart_lsn,会被认为是无效槽,在做checkpoint时会被自动删除。
- pg_drop_replication_slot('slot_name')
参数说明:
返回值类型:void
备注:调用该函数的用户需要具有SYSADMIN权限或具有REPLICATION权限或继承了内置角色gs_role_replication的权限。此函数目前只支持在主机调用。
- pg_logical_slot_peek_changes('slot_name', 'upto_lsn', upto_nchanges, 'options_name', 'options_value')
描述:解码并不推进流复制槽(下次解码可以再次获取本次解出的数据)。
参数说明:- slot_name
取值范围:字符串,仅支持小写字母、数字以及“_”,“?”,“-”,“.”字符,且不支持“.”或“..”单独作为复制槽名称。
- upto_lsn
在CSN序逻辑复制槽上代表日志的CSN,表示解码直到小于等于此CSN的事务日志解码完毕(可能会解码一个CSN大于指定CSN的事务);在LSN序复制槽上代表日志的LSN,表示解码直到COMMIT LSN大于等于该LSN的第一个事务解码完毕。
取值范围:字符串(代表十六进制格式表示的uint64,在正中间用'/'分割,左右两边各为一个uint32,如某个uint32为0则显示0),如'1/2AAFC60'、'0/A060'或'3A/0'。为NULL时表示不对解码截止的日志位置做限制。
- upto_nchanges
解码条数(包含begin和commit)。假设一共有三条事务,分别包含3、5、7条记录,如果upto_nchanges为4,那么会解码出前两个事务共8条记录。解码完第二条事务时发现解码条数记录大于等于upto_nchanges,会停止解码。
取值范围:非负整数。
upto_lsn和upto_nchanges中任一参数达到限制,解码都会结束。
- options:此项为可选参数,由一系列options_name和options_value一一对应组成。
- include-xids
取值范围:Boolean,默认值为true。
- false:设为false时,解码出的data列不包含xid信息。
- true:设为true时,解码出的data列包含xid信息。
- skip-empty-xacts
取值范围:Boolean,默认值为false。
- false:设为false时,解码时不忽略空事务信息。
- true:设为true时,解码时会忽略空事务信息。
- include-timestamp
取值范围:Boolean,默认值为true。
- false:设为false时,解码信息不包含commit时间戳。
- true:设为true时,解码信息包含commit时间戳。
- only-local
取值范围:Boolean,默认值为true。
- false:设为false时,解码非本地日志和本地日志。
- true:设为true时,仅解码本地日志。
- force-binary
取值范围:Boolean型,默认值为false。
- false:设为false时,以文本格式输出解码结果。
- true:暂不支持设置。
- white-table-list
取值范围:包含白名单中表名的字符串,不同的表以','为分隔符进行隔离;使用'*'来模糊匹配所有情况;schema名和表名间以'.'分割,不允许存在任意空白符。例:
SELECT * FROM pg_logical_slot_peek_changes('slot1', NULL, 4096, 'white-table-list', 'public.t1,public.t2');
- max-txn-in-memory
内存管控参数,单位为MB,单个事务占用内存大于该值即进行落盘。
取值范围:0~100的整型,默认值为0,即不开启此种管控。
- max-reorderbuffer-in-memory
内存管控参数,单位为GB,拼接-发送线程中正在拼接的事务总内存(包含缓存)大于该值则对当前解码事务进行落盘。
取值范围:0~100的整型,默认值为0,即不开启此种管控。
- include-user
取值范围:bool型,默认值为false。
- false:设为false时,事务的BEGIN逻辑日志不输出事务的用户名字。
- true:设为true时,事务的BEGIN逻辑日志输出事务的用户名字。
- exclude-userids
取值范围:指定黑名单用户的OID,多个OID通过','分隔,不校验用户OID是否存在。
- exclude-users
取值范围:指定黑名单用户的名字,多个名字通过','分隔;通过dynamic-resolution设置是否动态解析识别用户名字。若解码报错用户不存在而中断,在确定日志产生时刻不存在对应的黑名单用户,可以通过配置dynamic-resolution成true或者从用户黑名单中删除报错用户名字来启动解码继续获取逻辑日志。
- dynamic-resolution
取值范围:bool型,默认值为true。
- false:设为false时,当解码观测到黑名单exclude-users中用户不存在时将会报错并退出逻辑解码。
- true:设为true时,当解码观测到黑名单exclude-users中用户不存在时继续解码。
- enable-ddl-decoding
取值范围:bool型,默认值为false。
- false:设为false时,不开启DDL语句的逻辑解码。
- true:设为true时,开启DDL语句的逻辑解码。
- enable-ddl-json-format
取值范围:bool型,默认值为false。
- false:设为false时,传送TEXT格式的DDL反解析结果。
- true:设为true时,传送JSON格式的DDL反解析结果。
- include-xids
其他配置选项可参考《特性指南》中“逻辑复制 > 逻辑解码 > 逻辑解码选项”章节。
返回值类型:text、xid、text
示例:gaussdb=# SELECT * FROM pg_logical_slot_peek_changes('slot_lsn',NULL,4096,'skip-empty-xacts','on'); location | xid | data -----------+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 0/6D0B500 | 46914 | BEGIN 46914 0/6D0B530 | 46914 | {"table_name":"public.t1","op_type":"INSERT","columns_name":["a","b"],"columns_type":["integer","integer"],"columns_val":["3","1"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]} 0/6D0B8B8 | 46914 | COMMIT 46914 (at 2023-02-22 17:29:31.090018+08) CSN 94034528 0/6D0BB58 | 46915 | BEGIN 46915 0/6D0BB88 | 46915 | {"table_name":"public.t1","op_type":"INSERT","columns_name":["a","b"],"columns_type":["integer","integer"],"columns_val":["3","2"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]} 0/6D0BF08 | 46915 | COMMIT 46915 (at 2023-02-22 17:31:30.672093+08) CSN 94034568 0/6D0BF08 | 46916 | BEGIN 46916 0/6D0BF38 | 46916 | {"table_name":"public.t1","op_type":"INSERT","columns_name":["a","b"],"columns_type":["integer","integer"],"columns_val":["3","3"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]} 0/6D0C218 | 46916 | COMMIT 46916 (at 2023-02-22 17:31:34.438319+08) CSN 94034570 (9 rows) gaussdb=# SELECT * FROM pg_logical_slot_peek_changes('slot_csn',NULL,4096,'skip-empty-xacts','on'); location | xid | data -----------+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 0/0 | 46914 | BEGIN CSN: 94034528 0/0 | 46914 | {"table_name":"public.t1","op_type":"INSERT","columns_name":["a","b"],"columns_type":["integer","integer"],"columns_val":["3","1"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]} 0/59ADA60 | 46914 | COMMIT 46914 (at 2023-02-22 17:29:31.090018+08) CSN 94034528 0/59ADA60 | 46915 | BEGIN CSN: 94034568 0/59ADA60 | 46915 | {"table_name":"public.t1","op_type":"INSERT","columns_name":["a","b"],"columns_type":["integer","integer"],"columns_val":["3","2"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]} 0/59ADA88 | 46915 | COMMIT 46915 (at 2023-02-22 17:31:30.672093+08) CSN 94034568 0/59ADA88 | 46916 | BEGIN CSN: 94034570 0/59ADA88 | 46916 | {"table_name":"public.t1","op_type":"INSERT","columns_name":["a","b"],"columns_type":["integer","integer"],"columns_val":["3","3"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]} 0/59ADA8A | 46916 | COMMIT 46916 (at 2023-02-22 17:31:34.438319+08) CSN 94034570 (9 rows)
备注:函数返回解码结果,每一条解码结果包含三列,对应上述返回值类型,分别表示LSN(LSN序复制槽)或CSN(CSN序复制槽)位置、xid和解码内容。其中location列代表CSN时,仅当解码到COMMIT日志时才会更新。
调用该函数的用户需要具有SYSADMIN权限或具有REPLICATION权限或继承了内置角色gs_role_replication的权限。
- slot_name
- pg_logical_slot_get_changes('slot_name', 'upto_lsn', upto_nchanges, 'options_name', 'options_value')
参数说明:与pg_logical_slot_peek_changes一致,详细内容请参见pg_logical_slot_peek_changes。
备注:调用该函数的用户需要具有SYSADMIN权限或具有REPLICATION权限或继承了内置角色gs_role_replication的权限。此函数支持在主机或备机调用,在备机调用会同步推进主机上对应的逻辑复制槽。
在备机上执行该函数,推进主机对应复制槽时需占用主机的一个walsender。由于逻辑解码功能会为每个逻辑复制槽预留walsender,因此正常场景执行该函数会正常推进主机的逻辑复制槽,如果短时间连续执行该函数会导致通知主机推进失败,且没有报错。
- pg_logical_slot_peek_binary_changes('slot_name', 'upto_lsn', upto_nchanges, 'options_name', 'options_value')
描述:以二进制格式解码且不推进流复制槽(下次解码可以再次获取本次解出的数据)。
参数说明:- slot_name
取值范围:字符串,仅支持小写字母、数字以及“_”,“?”,“-”,“.”字符,且不支持“.”或“..”单独作为复制槽名称。
- upto_lsn
在CSN序逻辑复制槽上代表日志的CSN,表示解码直到小于等于此CSN的事务日志解码完毕(可能会解码一个CSN大于指定CSN的事务);在LSN序复制槽上代表日志的LSN,表示解码直到COMMIT LSN大于等于该LSN的第一个事务解码完毕。
取值范围:字符串(代表十六进制格式表示的uint64,在正中间用'/'分割,左右两边各为一个uint32,如某个uint32为0则显示0),如'1/2AAFC60'、'0/A060'或'3A/0'。为NULL时表示不对解码截止的日志位置做限制。
- upto_nchanges
解码条数(包含begin和commit)。假设一共有三条事务,分别包含3、5、7条记录,如果upto_nchanges为4,那么会解码出前两个事务共8条记录。解码完第二条事务时发现解码条数记录大于等于upto_nchanges,会停止解码。
取值范围:非负整数。
upto_lsn和upto_nchanges中任一参数达到限制,解码都会结束。
- options:此项为可选参数,由一系列options_name和options_value一一对应组成。
- include-xids
取值范围:Boolean,默认值为true。
- false:设为false时,解码出的data列不包含xid信息。
- true:设为true时,解码出的data列包含xid信息。
- skip-empty-xacts
取值范围:Boolean,默认值为false。
- false:设为false时,解码时不忽略空事务信息。
- true:设为true时,解码时会忽略空事务信息。
- include-timestamp
取值范围:Boolean,默认值为true。
- false:设为false时,解码信息不包含commit时间戳。
- true:设为true时,解码信息包含commit时间戳。
- only-local
取值范围:Boolean,默认值为true。
- false:设为false时,解码非本地日志和本地日志。
- true:设为true时,仅解码本地日志。
- force-binary
取值范围:Boolean,均以二进制格式输出结果。
- white-table-list
取值范围:包含白名单中表名的字符串,不同的表以','为分隔符进行隔离;使用'*'来模糊匹配所有情况;schema名和表名间以'.'分割,不允许存在任意空白符。例:select * from pg_logical_slot_peek_binary_changes('slot1', NULL, 4096, 'white-table-list', 'public.t1,public.t2');
- max-txn-in-memory
内存管控参数,单位为MB,单个事务占用内存大于该值即进行落盘。
取值范围:0~100的整型,默认值为0,即不开启此种管控。
- max-reorderbuffer-in-memory
内存管控参数,单位为GB,拼接-发送线程中正在拼接的事务总内存(包含缓存)大于该值则对当前解码事务进行落盘。
取值范围:0~100的整型,默认值为0,即不开启此种管控。
- include-user
取值范围:Boolean,默认值为false。
- false:设为false时,事务的BEGIN逻辑日志不输出事务的用户名字。
- true:设为true时,事务的BEGIN逻辑日志输出事务的用户名字。
- exclude-userids
取值范围:指定黑名单用户的OID,多个OID通过','分隔,不校验用户OID是否存在。
- exclude-users
取值范围:指定黑名单用户的名字,多个名字通过','分隔;通过dynamic-resolution设置是否动态解析识别用户名字。若解码报错用户不存在而中断,在确定日志产生时刻不存在对应的黑名单用户,可以通过配置dynamic-resolution成true或者从用户黑名单中删除报错用户名字来启动解码继续获取逻辑日志。
- dynamic-resolution
取值范围:Boolean,默认值为true。
- false:设为false时,当解码观测到黑名单exclude-users中用户不存在时将会报错并退出逻辑解码。
- true:设为true时,当解码观测到黑名单exclude-users中用户不存在时继续解码。
- include-xids
某些配置选项在函数中仅能配置而不实际生效,可参考《特性指南》中“逻辑复制 > 逻辑解码 > 逻辑解码选项”章节。
返回值类型:text、xid、bytea
备注:函数返回解码结果,每一条解码结果包含三列,对应上述返回值类型,分别表示LSN位置、xid和二进制格式的解码内容。调用该函数的用户需要具有SYSADMIN权限或具有REPLICATION权限或继承了内置角色gs_role_replication的权限。
- slot_name
- pg_logical_slot_get_binary_changes('slot_name', 'upto_lsn', upto_nchanges, 'options_name', 'options_value')
参数说明:与pg_logical_slot_peek_binary_changes一致,详细内容请参见•pg_logical_slot_peek_bi...。
备注:调用该函数的用户需要具有SYSADMIN权限或具有REPLICATION权限或继承了内置角色gs_role_replication的权限。此函数目前只支持在主机上调用。
- pg_replication_slot_advance ('slot_name', 'upto_lsn')
描述:直接推进流复制槽到指定upto_lsn,不输出解码结果。
参数说明:
- slot_name
取值范围:字符串,仅支持小写字母、数字以及“_”,“?”,“-”,“.”字符,且不支持“.”或“..”单独作为复制槽名称。
- upto_lsn
在CSN序逻辑复制槽上代表推进到的日志CSN位置,下次解码时只会输出比该CSN大的事务结果。如果输入的CSN比当前流复制槽记录的confirmed_csn还要小,则直接返回;如果输入的CSN比当前可获取的最新CSN要大或并未输入,则推进到当前可获取的最新CSN。
在LSN序复制槽上代表推进到的日志LSN位置,下次解码时只会输出提交位置比该LSN大的事务结果。如果输入的LSN比当前流复制槽记录的推进位置还要小,则报错;如果输入的LSN比当前最新物理日志LSN还要大或并未输入,则推进到当前最新物理日志LSN。
取值范围:字符串(代表十六进制格式表示的uint64,在正中间用'/'分割,左右两边各为一个uint32,如某个uint32为0则显示0),如'1/2AAFC60'、'0/A060'或'3A/0'。为NULL时表示不对解码截止的日志位置做限制。
返回值类型:name, text
备注:返回值分别对应slot_name和实际推进到的LSN或CSN。调用该函数的用户需要具有SYSADMIN权限或具有REPLICATION权限或继承了内置角色gs_role_replication的权限。此函数支持在主机调用,或在备机对逻辑复制槽调用。在备机调用会同步推进主机上对应的逻辑复制槽。
该函数支持在备机上对逻辑复制槽执行,同步推进主机上对应的逻辑复制槽。在备机上执行该函数,推进主机对应复制槽时需占用主机的一个walsender。由于逻辑解码功能会为每个逻辑复制槽预留walsender,因此正常场景执行该函数会正常推进主机的逻辑复制槽,如果短时间连续执行该函数会导致通知主机推进失败,且没有报错。
- slot_name
- pg_logical_get_area_changes('LSN_start', 'LSN_end', upto_nchanges, 'decoding_plugin', 'xlog_path', 'options_name', 'options_value')
描述:没有ddl的前提下,指定lsn区间进行解码,或者指定xlog文件进行解码。
约束条件如下:
- 当前网络和硬件环境正常。
- 单条元组大小建议不超过500MB,500MB~1GB之间会报错。
- 不支持数据页复制这种不落xlog的数据找回。
- 调用接口时,要求日志级别wal_level=logical,且只有在wal_level=logical期间产生的日志文件才能被解析,如果使用的xlog文件为非logical级别,则解码内容没有对应的值和类型,无其他影响。如果wal_level未被设置成logical级别,则报错不解码。
- xlog文件只能被完全同构的dn的某个副本解析,且数据库发生没有DDL操作和VACUUM FULL,以确保可以找到数据对应的元信息。
- 需要注意一次不要读入过多xlog文件,指定范围解码不指定文件解码的时候推荐一次一个xlog文件的大小,一般情况下,一个xlog文件解码过程中粗估占用内存为xlog文件大小的2~3倍。
- 不支持VACUUM FULL之前的数据找回。
- 不能解码扩容前的xlog文件。
- 对于update语句解码需要表有主键,否则会导致update语句where里数据为空。
- 不支持toast类型、clob类型和blob类型的字段解码,解码时遇到会跳过或者报错。
- 此解码方式为根据xlog文本记录数据进行解码,将可解码内容解码出来,不基于事务进行解码,因此不在此xlog里的数据无法解码。
- 从解码点开始,如果未指定解码文件,会先检测从解码开始点到最新redo值之间是否发生ddl,如果发生ddl则全部不解码;如果指定解码文件,会同时检测解码文件的开始点到文件最后可读内容之间,以及数据目录下xlog开始点到最新redo值之间是否发生ddl,检测到一条ddl则对所有表都不解码。
- 不支持CSN序复制槽。
备注:打开三权分立时,只有数据库初始用户可以调用;关闭三权分立时,需要具备系统管理员权限。
参数说明:
- LSN_start
取值范围:字符串(LSN,格式为xlogid/xrecoff),如'1/2AAFC60'。为NULL时表示不对解码起始的日志位置做限制。
- LSN_end
取值范围:字符串(LSN,格式为xlogid/xrecoff),如'1/2AAFC60'。为NULL时表示不对解码截止的日志位置做限制。
- upto_nchanges
解码条数(包含begin和commit)。假设一共有三条事务,分别包含3、5、7条记录,如果upto_nchanges为4,那么会解码出前两个事务共8条记录。解码完第二条事务时发现解码条数记录大于等于upto_nchanges,会停止解码。
取值范围:非负整数。
LSN和upto_nchanges中任一参数达到限制,解码都会结束。
- decoding_plugin
解码插件,指定解码内容输出格式的so插件。
取值范围:提供mppdb_decoding和sql_decoding两个解码插件。
- xlog_path
解码插件,指定解码文件的xlog绝对路径,文件级别
取值范围:NULL或者xlog文件绝对路径的字符串。
- options:此项为可选参数,由一系列options_name和options_value一一对应组成,可以缺省。
- include-xids
取值范围:bool型,默认值为true。
false:设为false时,解码出的data列不包含xid信息。
true:设为true时,解码出的data列包含xid信息。
- skip-empty-xacts
取值范围:bool型,默认值为false。
false:设为false时,解码时不忽略空事务信息。
true:设为true时,解码时会忽略空事务信息。
- include-timestamp
取值范围:bool型,默认值为true。
false:设为false时,解码信息不包含commit时间戳。
true:设为true时,解码信息包含commit时间戳。
- include-xids
示例:
gaussdb=# CREATE TABLE t1(a int, b int); CREATE TABLE gaussdb=# SELECT pg_current_xlog_location(); pg_current_xlog_location -------------------------- 0/5ECBCD48 (1 row) gaussdb=# INSERT INTO t1 VALUES(1,1); INSERT 0 1 gaussdb=# UPDATE t1 SET b = 2 WHERE a = 1; UPDATE 1 gaussdb=# DELETE FROM t1; DELETE 1 gaussdb=# SELECT * FROM pg_logical_get_area_changes('0/5ECBCD48', NULL, NULL, 'sql_decoding', NULL); location | xid | data ------------+-------+---------------------------------------------------------------------------------- 0/5ECBCD78 | 70718 | insert into public.t1 values (1, 1); 0/5ECBCEA0 | 70718 | COMMIT 70718 (at 2023-11-01 10:58:51.448885+08) 39319 0/5ECBCED0 | 70719 | delete from public.t1 where a = 1 and b = 1;insert into public.t1 values (1, 2); 0/5ECBD028 | 70719 | COMMIT 70719 (at 2023-11-01 10:58:56.487796+08) 39320 0/5ECBD210 | 70720 | delete from public.t1 where a = 1 and b = 2; 0/5ECBD338 | 70720 | COMMIT 70720 (at 2023-11-01 10:58:58.856661+08) 39321 (6 rows) --针对带有生成列的表的场景: gaussdb=# CREATE TABLE t2(a int, b int GENERATED ALWAYS AS (a + 1) STORED); CREATE TABLE gaussdb=# SELECT pg_current_xlog_location(); pg_current_xlog_location -------------------------- 0/5F62CFE8 (1 row) gaussdb=# INSERT INTO t2(a) VALUES(1); INSERT 0 1 gaussdb=# UPDATE t2 set a = 2 where a = 1; UPDATE 1 gaussdb=# DELETE FROM t2; DELETE 1 gaussdb=# SELECT * FROM pg_logical_get_area_changes('0/5F62CFE8', NULL, NULL, 'sql_decoding', NULL, 'skip-generated-columns', 'on'); location | xid | data ------------+-------+--------------------------------------------------------------------- 0/5F62D0C8 | 71293 | insert into public.t2 values (1); 0/5F62D1F0 | 71293 | COMMIT 71293 (at 2023-11-01 11:11:49.452044+08) 39516 0/5F62D220 | 71294 | delete from public.t2 where a = 1;insert into public.t2 values (2); 0/5F62D378 | 71294 | COMMIT 71294 (at 2023-11-01 11:11:54.327701+08) 39517 0/5F62D408 | 71295 | delete from public.t2 where a = 2; 0/5F62D530 | 71295 | COMMIT 71295 (at 2023-11-01 11:11:58.362057+08) 39518 (6 rows)
- pg_get_replication_slots()
返回值类型:text、text、text、oid、boolean、xid、xid、text、boolean、text、xid
示例:gaussdb=# SELECT * FROM pg_get_replication_slots(); slot_name | plugin | slot_type | datoid | active | xmin | catalog_xmin | restart_lsn | dummy_standby | confirmed_flush | confirmed_csn -----------+----------------+-----------+--------+--------+------+--------------+-------------+---------------+-----------------+--------------- dn_6002 | | physical | 0 | t | | | 0/3622B528 | f | | dn_6003 | | physical | 0 | t | | | 0/3622B528 | f | | slot_lsn | mppdb_decoding | logical | 131072 | f | | 66658 | 0/36252350 | f | 0/362523D0 | slot_test | mppdb_decoding | logical | 131072 | f | | 66658 | 0/36251718 | f | | 10025527 (4 rows)
备注:返回值的slot_name代表复制槽名,plugin代表逻辑复制槽对应的输出插件名称,slot_type代表复制槽的类型(physical代表物理复制槽,logical代表逻辑复制槽),datoid代表复制槽所在的数据库OID,active代表复制槽是否为激活状态(f代表未激活,t代表已激活),xmin代表数据库须为复制槽保留的最早事务的事务号,catalog_xmin代表数据库须为逻辑复制槽保留的最早的涉及系统表的事务的事务号,restart_lsn表示复制槽需要的最早xlog的物理位置,dummy_standby是预留参数,confirmed_flush代表客户端确认接收到的日志位置(逻辑复制槽专用),confirmed_csn代表客户端确认接收到的日志中最后一个事务对应的CSN(逻辑复制槽专用)。
在DN上执行查询,LSN序逻辑复制槽的confirmed_csn查询结果为空,CSN序逻辑复制槽的confirmed_flush查询结果为空。
- gs_get_parallel_decode_status()
描述:监控各个解码线程的读取日志队列和解码结果队列的长度,以便定位并行解码性能瓶颈。
返回值类型:text、int、text、text、text、int64、int64、TimestampTz
示例:
gaussdb=# select * from gs_get_parallel_decode_status(); slot_name | parallel_decode_num | read_change_queue_length | decode_change_queue_length | reader_lsn | working_txn_cnt | working_txn_memory | decoded_time -----------+---------------------+---------------------------+----------------------------+------------+-----------------+--------------------+------------------------ slot1 | 2 | queue0: 1005, queue1: 320 | queue0: 63, queue1: 748 | 0/1DCE2578 | 42 | 192927504 | 2023-01-10 11:18:22+08 (1 row)
备注:返回值的slot_name代表复制槽名,parallel_decode_num代表该复制槽的并行解码线程数,read_change_queue_length列出了每个解码线程读取日志队列的当前长度,decode_change_queue_length列出了每个解码线程解码结果队列的当前长度,reader_lsn表示当前reader线程读取的日志位置,working_txn_cnt表示当前拼接-发送线程中正在拼接的事务个数,working_txn_memory代表拼接-发送线程中拼接事务占用总内存(单位字节),decoded_time代表该复制槽最新解码到的WAL日志时间。
decoded_time时间来自检查点日志和事务提交日志,存在一定误差。如果没有解码到任何前述包含时间的日志,则显示“2000-01-01 08:00:00+08”(依照数据库设置的时区而定)。
- gs_get_slot_decoded_wal_time(slot_name)
参数说明:
示例:
gaussdb=# SELECT * FROM gs_get_slot_decoded_wal_time('replication_slot'); gs_get_slot_decoded_wal_time ------------------------------ 2023-01-10 11:25:22+08 (1 row)
备注:返回一列值代表该复制槽最新解码到的WAL日志时间。
返回的时间来自检查点日志和事务提交日志,存在一定误差。如果没有解码到任何前述包含时间的日志,则显示“2000-01-01 08:00:00+08”(依照数据库设置的时区而定)。查询一个当前不存在的逻辑复制槽的最新解码的WAL日志时间时,返回NULL,在gsql中NULL显示和设置有关,可以使用\pset null 'null'设置。
- gs_logical_parallel_decode_status('slot_name')
描述:获取并行逻辑解码某一复制槽的解码统计信息,包含26行指标。
指标定义如下:
Record - (stat_id int, stat_name TEXT, value TEXT)
表1 指标含义 指标名称
说明
slot_name
逻辑解码任务复制槽名称。
reader_lsn
逻辑解码日志位置。
wal_read_total_time
日志模块加载耗时。
wal_wait_total_time
日志模块等待耗时。
parser_total_time
reader线程处理耗时。
decoder_total_time
各decoder线程处理耗时总值。
sender_total_time
sender线程处理耗时。
net_send_total_time
网络发送逻辑日志耗时。
net_wait_total_time
网络等待发送逻辑日志耗时。
net_send_total_bytes
网络发送逻辑日志字节数。
transaction_count
事务数量。
big_transaction_count
大事务数量。
max_transaction_tuples
事务操作元组的最大数量。
sent_transaction_count
发送事务数量(本库)。
spill_disk_transaction_count
落盘事务数量。
spill_disk_bytes
累计落盘字节数量,单位:byte。
spill_disk_count
落盘次数。
input_queue_full_count
各decode线程输入队列FULL次数总数。
output_queue_full_count
各decode线程输出队列FULL次数总数。
dml_count
各decode线程解码WAL日志中DML数量(本库)总数。
dml_filtered_count
各decode线程解码过滤WAL日志中DML数量(本库)总数。
toast_count
涉及TOAST表修改行数。
candidate_catalog_xmin
当前逻辑复制槽catalog_xmin候选点。
candidate_xmin_lsn
推进catalog_xmin所需要的日志确认接收点。
candidate_restart_valid
推进restart_lsn所需要的日志确认接收点。
candidate_restart_lsn
当前逻辑复制槽restart_lsn候选点。
参数说明:
返回值类型:int、text、text
示例:
gaussdb=# SELECT * FROM gs_logical_parallel_decode_status('replication_slot'); stat_id | stat_name | value ---------+------------------------------+------------------ 1 | slot_name | replication_slot 2 | reader_lsn | 0/357E180 3 | wal_read_total_time | 266694599 4 | wal_wait_total_time | 266691307 5 | parser_total_time | 39971 6 | decoder_total_time | 81216 7 | sender_total_time | 48193 8 | net_send_total_time | 19388 9 | net_wait_total_time | 0 10 | net_send_total_bytes | 266897 11 | transaction_count | 7 12 | big_transaction_count | 1 13 | max_transaction_tuples | 4096 14 | sent_transaction_count | 7 15 | spill_disk_transaction_count | 1 16 | spill_disk_bytes | 244653 17 | spill_disk_count | 4096 18 | input_queue_full_count | 0 19 | output_queue_full_count | 0 20 | dml_count | 4097 21 | dml_filtered_count | 0 22 | toast_count | 0 23 | candidate_catalog_xmin | 17152 24 | candidate_xmin_lsn | 0/420A598 25 | candidate_restart_valid | 0/420A598 26 | candidate_restart_lsn | 0/420A598 (26 rows)
备注:按照指标定义,指标应该满足下列约束关系:
wal_read_total_time >= wal_wait_total_time;
transaction_count >= big_transaction_count;
transaction_count >= sent_transaction_count;
transaction_count >= spill_disk_transaction_count;
dml_count >= dml_filtered_count;
dml_count >= toast_count;
如果spill_transaction_count == 0,那么 spill_disk_bytes == 0;
但由于严格保证需要频繁加锁解锁,将对性能造成较大影响,故上面约束关系在多线程情况下可能不满足。
transaction_count统计的是所有库的事务数量。
sent_transaction_count统计的是本库的发送事务数量,因为非本库事务将不会被发送。
当传入不存在的slot_name时,函数不报错,返回值为空。
- gs_logical_parallel_decode_reset_status('slot_name')
描述:重置gs_logical_parallel_decode_status('slot_name')中的指标。
参数说明:
返回值类型:text
示例:
gaussdb=# SELECT * FROM gs_logical_parallel_decode_reset_status('replication_slot'); gs_logical_parallel_decode_reset_status ----------------------------------------- OK (1 row) gaussdb=# SELECT * FROM gs_logical_parallel_decode_status('replication_slot'); stat_id | stat_name | value ---------+------------------------------+------------------ 1 | slot_name | replication_slot 2 | reader_lsn | 0/357E420 3 | wal_read_total_time | 0 4 | wal_wait_total_time | 0 5 | parser_total_time | 0 6 | decoder_total_time | 0 7 | sender_total_time | 0 8 | net_send_total_time | 0 9 | net_wait_total_time | 0 10 | net_send_total_bytes | 0 11 | transaction_count | 0 12 | big_transaction_count | 0 13 | max_transaction_tuples | 0 14 | sent_transaction_count | 0 15 | spill_disk_transaction_count | 0 16 | spill_disk_bytes | 0 17 | spill_disk_count | 0 18 | input_queue_full_count | 0 19 | output_queue_full_count | 0 20 | dml_count | 0 21 | dml_filtered_count | 0 22 | toast_count | 0 23 | candidate_catalog_xmin | 0 24 | candidate_xmin_lsn | 0/0 25 | candidate_restart_valid | 0/420A598 26 | candidate_restart_lsn | 0/420A598 (26 rows)
备注:传入不存在的slot_name时,函数不报错,返回值为invalid slot name。
不允许对正在进行observe的复制槽进行reset操作,异常信息按优先级如下:- slot_name为空,报错,显示:ERROR: inputString should not be NULL。
- slot_name不为空,但不存在,不报错,显示:invalid slot name。
- slot_name不为空,但当前slot_name对应的复制槽正在被采样(observing),不报错,显示:can't reset during observing! use gs_logical_decode_stop_observe to stop。
- gs_logical_decode_start_observe('slot_name', window, interval)
参数说明:
- slot_name
取值范围:字符串,仅支持小写字母、数字以及“_”,“?”,“-”,“.”字符,且不支持“.”或“..”单独作为复制槽名称。
- window
取值范围:整数类型,最小2, 最大1024,收集最近interval*window时间内采样数据。
-
性能监控的间隔,时间间隔类型,秒级。
取值范围:时间间隔类型,最小1s,最大1min,收集最近interval*window时间内采样数据。
返回值类型:text
示例:
gaussdb=# SELECT * FROM gs_logical_decode_start_observe('replication_slot',20,5); gs_logical_decode_start_observe --------------------------------- OK (1 row) gaussdb=# select * from gs_logical_decode_start_observe('replication_slot',20,5); gs_logical_decode_start_observe --------------------------------- observe has started! (1 row)
备注:传入不存在的slot_name时,函数不报错,返回值为invalid slot name。
不允许对已经开启observe的复制槽进行开启observe操作,异常信息按优先级如下:- slot_name为空,报错,显示:ERROR: inputString should not be NULL。
- slot_name不为空,但不存在,不报错,显示:invalid slot name。
- 若window小于2,则显示:window has to be >= 2。
- 若window大于1024,则显示:window has to be <= 1024。
- 若interval小于1S,则显示:sample interval has to be >= 1s。
- 若interval大于60S,则显示:sample interval has to be <= 60s。
- slot_name不为空,但当前slot_name对应的复制槽已开启observe,不报错,显示:observe has started!
- slot_name
- gs_logical_decode_stop_observe('slot_name')
参数说明:
示例:
gaussdb=# SELECT * FROM gs_logical_decode_stop_observe('replication_slot'); gs_logical_decode_stop_observe -------------------------------- OK (1 row) gaussdb=# SELECT * FROM gs_logical_decode_stop_observe('replication_slot'); gs_logical_decode_stop_observe -------------------------------- observe not started! (1 row)
备注:传入不存在的slot_name时,函数不报错,返回值为invalid slot name。
不允许对已经停止observe的复制槽进行停止observe操作,异常信息按优先级如下:- slot_name为空,报错,显示:ERROR: inputString should not be NULL。
- slot_name不为空,但不存在,不报错,显示:invalid slot name。
- slot_name不为空,但当前slot_name对应的复制槽已停止observe,不报错,显示:observe not started!
- gs_logical_decode_observe_data('slot_name')
参数说明:
示例:
gaussdb=# SELECT * FROM gs_logical_decode_observe_data('replication_slot'); slot_name | sample_time | reader_lsn | wal_read_total_time | wal_wait_total_time | parser_total_time | decoder_total_time | sender_total_time | net_send_total_time | net_send_total_bytes | transaction_count | big_ transaction_count | sent_transaction_count | spill_transaction_count | spill_disk_bytes -----------+-------------------------------+------------+---------------------+---------------------+-------------------+--------------------+-------------------+---------------------+----------------------+-------------------+----- ------------------+------------------------+-------------------------+------------------ repl | 2023-01-12 20:09:40.416798+08 | 49447976 | 776846657 | 776846244 | 65 | {46,11,11,6} | 56 | 0 | 0 | 0 | 0 | 0 | 0 | 0 repl | 2023-01-12 20:09:45.416849+08 | 49447976 | 776846657 | 776846244 | 65 | {46,11,11,6} | 56 | 0 | 0 | 0 | 0 | 0 | 0 | 0 repl | 2023-01-12 20:09:50.417006+08 | 49447976 | 776846657 | 776846244 | 65 | {46,11,11,6} | 56 | 0 | 0 | 0 | 0 | 0 | 0 | 0 repl | 2023-01-12 20:09:55.417057+08 | 49447976 | 776846657 | 776846244 | 65 | {46,11,11,6} | 56 | 0 | 0 | 0 | 0 | 0 | 0 | 0 repl | 2023-01-12 20:10:00.417115+08 | 49447976 | 776846657 | 776846244 | 65 | {46,11,11,6} | 56 | 0 | 0 | 0 | 0 | 0 | 0 | 0 repl | 2023-01-12 20:10:05.417165+08 | 49447976 | 776846657 | 776846244 | 65 | {46,11,11,6} | 56 | 0 | 0 | 0 | 0 | 0 | 0 | 0 repl | 2023-01-12 20:10:10.417217+08 | 49447976 | 776846657 | 776846244 | 65 | {46,11,11,6} | 56 | 0 | 0 | 0 | 0 | 0 | 0 | 0 repl | 2023-01-12 20:10:15.417271+08 | 49447976 | 776846657 | 776846244 | 65 | {46,11,11,6} | 56 | 0 | 0 | 0 | 0 | 0 | 0 | 0 repl | 2023-01-12 20:10:20.417342+08 | 49448264 | 836085882 | 836085442 | 67 | {46,11,11,8} | 58 | 0 | 0 | 0 | 0 | 0 | 0 | 0 repl | 2023-01-12 20:10:25.417433+08 | 49448264 | 836085882 | 836085442 | 67 | {46,11,11,8} | 58 | 0 | 0 | 0 | 0 | 0 | 0 | 0 repl | 2023-01-12 20:10:30.417526+08 | 49448264 | 836085882 | 836085442 | 67 | {46,11,11,8} | 58 | 0 | 0 | 0 | 0 | 0 | 0 | 0 repl | 2023-01-12 20:10:35.417532+08 | 49448264 | 836085882 | 836085442 | 67 | {46,11,11,8} | 58 | 0 | 0 | 0 | 0 | 0 | 0 | 0 repl | 2023-01-12 20:10:40.417644+08 | 49448264 | 836085882 | 836085442 | 67 | {46,11,11,8} | 58 | 0 | 0 | 0 | 0 | 0 | 0 | 0 repl | 2023-01-12 20:10:45.417763+08 | 49448264 | 836085882 | 836085442 | 67 | {46,11,11,8} | 58 | 0 | 0 | 0 | 0 | 0 | 0 | 0 (14 rows
备注:传入不存在的slot_name时,函数不报错,返回值为空记录。
- gs_logical_decode_observe('slot_name')
参数说明:
示例:
gaussdb=# SELECT * FROM gs_logical_decode_observe('replication_slot'); slot_name | sample_time | logical_decode_rate | wal_read_rate | parser_rate | decoder_rate | sender_rate | net_send_rate ------------------+-------------------------------+---------------------+---------------+--------------+----------------+---------------+--------------- replication_slot | 2023-01-12 20:16:50.42448+08 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 replication_slot | 2023-01-12 20:16:55.424537+08 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 replication_slot | 2023-01-12 20:17:00.424641+08 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 replication_slot | 2023-01-12 20:17:05.424645+08 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 replication_slot | 2023-01-12 20:17:10.424795+08 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 replication_slot | 2023-01-12 20:17:15.424848+08 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 replication_slot | 2023-01-12 20:17:20.424849+08 | 57.600 | 699029.126 | 96000000.000 | 1152000000.000 | 144000000.000 | 0.000 replication_slot | 2023-01-12 20:17:25.424959+08 | 0.000 | 699029.126 | 96000000.000 | 1152000000.000 | 144000000.000 | 0.000 replication_slot | 2023-01-12 20:17:30.42496+08 | 0.000 | 699029.126 | 96000000.000 | 1152000000.000 | 144000000.000 | 0.000 replication_slot | 2023-01-12 20:17:35.425059+08 | 0.000 | 699029.126 | 96000000.000 | 1152000000.000 | 144000000.000 | 0.000
备注:传入不存在的slot_name时,函数不报错,返回值为空记录。若分母为0,则返回最近一次有效数据。若分母不为0,分子为0,则返回0。
指标计算公式:
logical_decode_rate = (reader_lsn1 - reader_lsn2) / (sample_time1 - sample_time2)
wal_read_rate = (reader_lsn1 - reader_lsn2) / (wal_read_total_time1 - wal_read_total_time2) - (wal_wait_total_time1 - wal_wait_total_time2)
parser_rate = (reader_lsn1 - reader_lsn2) / (parser_total_time1 - parser_total_time2)
decoder_rate= (reader_lsn1 - reader_lsn2) / avg(decoder_total_time1[i]- decoder_total_time2[i])
sender_rate = (reader_lsn1 - reader_lsn2) / (sender_total_time1 - sender_total_time2) - (net_send_total_time1 - net_send_total_time2)
sender_rate = (net_sent_bytes1 - net_sent_bytes2) / (net_send_total_time1 - net_send_total_time2) - (net_wait_total_time1 - net_wait_total_time2)
- gs_logical_decode_observe_status('slot_name')
参数说明:
示例:
gaussdb=# SELECT * FROM gs_logical_decode_observe_status('replication_slot'); gs_logical_decode_observe_status ---------------------------------- START (1 row) gaussdb=# SELECT * FROM gs_logical_decode_observe_status('replication_slot'); gs_logical_decode_observe_status ---------------------------------- invalid slot name (1 row) gaussdb=# SELECT * FROM gs_logical_decode_stop_observe('replication_slot'); gs_logical_decode_stop_observe -------------------------------- OK (1 row) gaussdb=# SELECT * FROM gs_logical_decode_observe_status('replication_slot'); gs_logical_decode_observe_status ---------------------------------- STOP (1 row)
备注:传入不存在的slot_name时,函数不报错,返回值为invalid slot name。
- gs_get_parallel_decode_thread_info()
描述:在并行解码所在的DN执行,返回当前DN上并行解码的线程信息。
返回值类型:int64、text、text、int
示例:
gaussdb=# SELECT * FROM gs_get_parallel_decode_thread_info(); thread_id | slot_name | thread_type | seq_number -----------------+-----------+-------------+------------ 140335364699904 | slot1 | sender | 1 140335214098176 | slot1 | reader | 1 140335325312768 | slot1 | decoder | 1 140335291750144 | slot1 | decoder | 2 140335274968832 | slot1 | decoder | 3 140335258187520 | slot1 | decoder | 4 140335165404928 | slot2 | sender | 1 140335022864128 | slot2 | reader | 1 140335129818880 | slot2 | decoder | 1 140335113037568 | slot2 | decoder | 2 (10 rows)
备注:返回值thread_id代表线程id,slot_name代表复制槽名,thread_type表示线程种类(共三种,sender代表发送线程、reader代表读取线程、decoder代表解码线程),seq_number代表每个线程在当前复制槽中同种线程的序号。其中sender和reader在每个并行解码连接中均只有一个,因此序号均为1,decoder的序号从1排列到当前复制槽解码并行度。
- pg_replication_origin_create (node_name)
描述:用给定的外部名称创建一个复制源,并且返回分配给它的内部ID。
备注:调用该函数的用户需要具有SYSADMIN权限。
参数说明:
返回值类型:oid
- pg_replication_origin_drop (node_name)
备注:调用该函数的用户需要具有SYSADMIN权限。
参数说明:
- pg_replication_origin_oid (node_name)
描述:根据名称查找复制源并返回内部ID。如果没有发现这样的复制源,则抛出错误。
备注:调用该函数的用户需要具有SYSADMIN权限。
参数说明:
返回值类型:oid
- pg_replication_origin_session_setup (node_name)
描述:将当前会话标记为从给定的原点回放,从而允许跟踪回放进度。只能在当前没有选择原点时使用。使用pg_replication_origin_session_reset 命令来撤销。
备注:调用该函数的用户需要具有SYSADMIN权限。
参数说明:
- pg_replication_origin_session_reset ()
描述:取消pg_replication_origin_session_setup()的效果。
备注:调用该函数的用户需要具有SYSADMIN权限。
- pg_replication_origin_session_is_setup ()
备注:调用该函数的用户需要具有SYSADMIN权限。
返回值类型:boolean
- pg_replication_origin_session_progress (flush)
备注:调用该函数的用户需要具有SYSADMIN权限。
参数说明:
返回值类型:LSN
- pg_replication_origin_xact_setup (origin_lsn, origin_timestamp)
描述:将当前事务标记为重放在给定LSN和时间戳上提交的事务。只能在使用pg_replication_origin_session_setup选择复制源时调用。
备注:调用该函数的用户需要具有SYSADMIN权限。
参数说明:
- pg_replication_origin_xact_reset ()
描述:取消pg_replication_origin_xact_setup()的效果。
备注:调用该函数的用户需要具有SYSADMIN权限。
- pg_replication_origin_advance (node_name, lsn)
将给定节点的复制进度设置为给定的位置。这主要用于设置初始位置,或在配置更改或类似的变更后设置新位置。
注意:这个函数的使用不当可能会导致不一致的复制数据。
备注:调用该函数的用户需要具有SYSADMIN权限。
参数说明:
- pg_replication_origin_progress (node_name, flush)
备注:调用该函数的用户需要具有SYSADMIN权限。
参数说明:
- pg_show_replication_origin_status()
备注:调用该函数的用户需要具有SYSADMIN权限。
返回值类型:
- local_id:oid,复制源id。
- external_id:text,复制源名称。
- remote_lsn:LSN,复制源的lsn位置。
- local_lsn:LSN,本地的lsn位置。
- gs_get_distribute_decode_status()
描述:获取当前节点上分布式解码状态细节(以复制槽为粒度),集中式暂不支持,执行返回报错。
返回值类型:text、int、int、bigint、xid、xid、text、text、text
- gs_get_distribute_decode_status_detail()
描述:获取当前节点上分布式解码状态细节(以DN为粒度),集中式暂不支持,执行返回报错。
返回值类型:text、int、bigint、int、int、xid