逻辑解码支持DDL
GaussDB主机上正常执行DDL语句,通过逻辑解码工具可以获取到DDL语句。
表 |
索引 |
自定义函数 |
自定义存储过程 |
触发器 |
Sequence |
Schema |
Comment on |
---|---|---|---|---|---|---|---|
CREATE TABLE [PARTITION | AS | SUBPARTITION ALTER TABLE [PARTITION | SUBPARTITION DROP TABLE RENAME TABLE TRUNCATE |
CREATE INDEX ALTER INDEX DROP INDEX REINDEX |
CREATE FUNCTION ALTER FUNCTION DROP FUNCTION |
CREATE PROCEDURE ALTER PROCEDURE DROP PROCEDURE |
CREATE TRIGGER ALTER TRIGGER DROP TRIGGER |
CREATE SEQUENCE ALTER SEQUENCE DROP SEQUENCE |
CREATE SCHEMA ALTER SCHEMA DROP SCHEMA |
COMMENT ON |
表 |
索引 |
Sequence |
Schema |
Comment on |
---|---|---|---|---|
ALTER TABLE CREATE TABLE DROP TABLE ALTER TABLE PARTITION CREATE TABLE PARTITION TRUNCATE |
ALTER INDEX CREATE INDEX DROP INDEX REINDEX ALTER TABLE DROP INDEX ALTER TABLE ADD INDEX |
ALTER SEQUENCE CREATE SEQUENCE DROP SEQUENCE |
ALTER SCHEMA CREATE SCHEMA DROP SCHEMA ALTER DATABASE CREATE DATABASE DROP DATABASE |
COMMENT ON |
功能描述
数据库在执行DML的时候,存储引擎会生成对应的DML日志,用于进行恢复,对这些DML日志进行解码,即可还原对应的DML语句,生成逻辑日志。而对于DDL语句,数据库并不记录DDL原语句的日志,而是记录DDL语句涉及的系统表的DML日志。DDL种类多样、语法复杂,逻辑复制要支持DDL语句,通过这些系统表的DML日志来解码原DDL语句是非常困难的。新增DDL日志记录原DDL信息,并在解码时通过DDL日志可以得到DDL原语句。
在DDL语句执行过程中,SQL引擎解析器会对原语句进行语法、词法解析,并生成解析树(不同的DDL语法会生成不同类型的解析树,解析树中包含DDL语句的全部信息)。随后,执行器通过这些信息执行对应操作,生成、修改对应元信息。
本节通过新增DDL日志的方式,来支持逻辑解码DDL,其内容由解析器结果(解析树)以及执行器结果生成,并在执行器执行完成后生成该日志。
从语法树反解析出DDL,DDL反解析能够将DDL命令转换为JSON格式的语句,并提供必要的信息在目标位置重建DDL命令。与原始DDL命令字符串相比,使用DDL反解析的好处包括:
- 解析出来的每个数据库对象都带有Schema,因此如果使用不同的search_path,也不会有歧义。
- 结构化的JSON和格式化的输出能支持异构数据库。如果用户使用的是不同的数据库版本,并且存在某些DDL语法差异,需要在应用之前解决这些差异。
反解析输出的结果是规范化后的形式,结果与用户输入等价,不保证完全相同,例如:
示例1:在函数体中没有单引号'时,函数体的分隔符$$会被解析为单引号'。
原始SQL语句:
CREATE FUNCTION func(a INT) RETURNS INT AS $$ BEGIN a:= a+1; CREATE TABLE test(col1 INT); INSERT INTO test VALUES(1); DROP TABLE test; RETURN a; END; $$ LANGUAGE plpgsql;
反解析结果:
CREATE FUNCTION public.func ( IN a pg_catalog.int4 ) RETURNS pg_catalog.int4 LANGUAGE plpgsql VOLATILE CALLED ON NULL INPUT SECURITY INVOKER COST 100 AS ' BEGIN a:= a+1; CREATE TABLE test(col1 INT); INSERT INTO test VALUES(1); DROP TABLE test; RETURN a; END; ';
示例3:“ALTER INDEX "Alter_Index_Index" REBUILD PARTITION "CA_ADDRESS_SK_index2"”会被反解析为“REINDEX INDEX public."Alter_Index_Index" PARTITION "CA_ADDRESS_SK_index2"”。
gaussdb=# CREATE TABLE test_create_table_partition2 (c1 INT, c2 INT)
PARTITION BY RANGE (c2) (
PARTITION p1 START(1) END(1000) EVERY(200) ,
PARTITION p2 END(2000),
PARTITION p3 START(2000) END(2500),
PARTITION p4 START(2500),
PARTITION p5 START(3000) END(5000) EVERY(1000)
);
gaussdb=# CREATE TABLE test_create_table_partition2 (c1 INT, c2 INT)
PARTITION BY RANGE (c2) (
PARTITION p1_0 VALUES LESS THAN ('1'), PARTITION p1_1 VALUES LESS THAN ('201'), PARTITION p1_2 VALUES LESS THAN ('401'), PARTITION p1_3 VALUES LESS THAN ('601'), PARTITION p1_4 VALUES LESS THAN ('801'), PARTITION p1_5 VALUES LESS THAN ('1000'),
PARTITION p2 VALUES LESS THAN ('2000'),
PARTITION p3 VALUES LESS THAN ('2500'),
PARTITION p4 VALUES LESS THAN ('3000'),
PARTITION p5_1 VALUES LESS THAN ('4000'),
PARTITION p5_2 VALUES LESS THAN ('5000')
);
- 原始SQL语句:
gaussdb=# CREATE TABLE IF NOT EXISTS tb5 (c1 int,c2 int) with (ORIENTATION=ROW, STORAGE_TYPE=USTORE); gaussdb=# ALTER TABLE IF EXISTS tb5 * ADD COLUMN IF NOT EXISTS c2 char(5) after c1; -- 可解码。TABLE中已有int型的列c2,语句执行跳过,反解析结果中c2列的类型仍为原来的类型。
反解析结果:
gaussdb=# ALTER TABLE IF EXISTS public.tb5 ADD COLUMN IF NOT EXISTS c2 pg_catalog.int4 AFTER c1;
- 原始SQL语句:
gaussdb=# ALTER TABLE IF EXISTS tb5 * ADD COLUMN IF NOT EXISTS c2 char(5) after c1, ADD COLUMN IF NOT EXISTS c3 char(5) after c1; -- 解码。新增列c3的反解析结果类型正确。
反解析结果:
gaussdb=# ALTER TABLE IF EXISTS public.tb5 ADD COLUMN IF NOT EXISTS c2 pg_catalog.int4 AFTER c1, ADD COLUMN IF NOT EXISTS c3 pg_catalog.bpchar(5) AFTER c1;
- 原始SQL语句:
gaussdb=# ALTER TABLE IF EXISTS tb5 * ADD COLUMN c2 char(5) after c1, ADD COLUMN IF NOT EXISTS c4 int after c1; --不解码,语句执行错误。
- 原始SQL语句:
gaussdb=# CREATE TABLE IF NOT EXISTS tb6 (c1 int,c2 int) with (ORIENTATION=ROW, STORAGE_TYPE=USTORE); gaussdb=# DROP TABLE tb6;
反解析结果:
gaussdb=# DROP TABLE IF EXISTS public.tb6 RESTRICT;
规格约束
- 逻辑解码支持DDL规格:
- 纯DDL逻辑解码性能标准环境下约为100MB/S,DDL/DML混合事务逻辑解码性能标准环境下约为100MB/S。
- 开启此功能后(设置wal_level=logical且enable_logical_replication_ddl=on),对DDL语句影响性能下降小于15%。
- 解码通用约束(串行和并行):
- 不支持解码本地临时对象的DDL操作。
- 不支持FOREIGN TABLE场景的DDL解码。
-
alter table add column的default值不支持stable类型和volatile类型的函数;create table和alter table的column的check表达式不支持stable类型和volatile类型的函数;alter table如果有多条子语句,只要其中一条子语句存在上述两种情况,则该条alter table整条语句不反解析。
gaussdb=# ALTER TABLE tbl_28 ADD COLUMN b1 TIMESTAMP DEFAULT NOW(); -- 's' NOT DEPARSE gaussdb=# ALTER TABLE tbl_28 ADD COLUMN b2 INT DEFAULT RANDOM(); -- 'v' NOT DEPARSE gaussdb=# ALTER TABLE tbl_28 ADD COLUMN b3 INT DEFAULT ABS(1); -- 'i' DEPARSE
- 不支持分布式CREATE MATERIALIZED VIEW的DDL解码。
- 不支持CREATE/ALTER/DROP VIEW、COMMENT ON VIEW的DDL解码。
- 不支持REINDEX DATABASE/SYSTEM的DDL解码。
- 不支持视图上触发器相关的DDL解码。
- 不支持CONCURRENTLY相关语句的DDL解码。
- 创建对象时语句中存在IF NOT EXISTS时,如果对象已存在,则不进行解码。删除对象时语句中存在IF EXISTS时,如果对象不存在,则不进行解码。
- 不对ALTER PACKAGE COMPILE语句进行解码,但会解码实例化内容中包含的DDL/DML语句。如果PACKAGE里没有DDL或DML部分的实例化内容,则alter package compile会被逻辑解码忽略。
- 仅支持本版本之前版本的商用DDL语法,以下SQL语句不支持逻辑解码。
- 创建行存表,设置ILM策略。
gaussdb=# CREATE TABLE IF NOT EXISTS tb3 (c1 int) with (storage_type=USTORE,ORIENTATION=ROW) ILM ADD POLICY ROW STORE COMPRESS ADVANCED ROW AFTER 7 day OF NO MODIFICATION;
反解析结果:
gaussdb=# CREATE TABLE IF NOT EXISTS public.tb3 (c1 pg_catalog.int4) WITH (storage_type = 'ustore', orientation = 'row', compression = 'no') NOCOMPRESS;
- B兼容模式下创建或修改表时,给表或列添加注释。
gaussdb=# CREATE TABLE IF NOT EXISTS tb6 (c1 integer comment 'Mysql兼容注释语法') with (ORIENTATION=ROW, STORAGE_TYPE=USTORE);
反解析结果:
gaussdb=# CREATE TABLE IF NOT EXISTS public.tb6 (c1 pg_catalog.int4) WITH (storage_type = 'ustore', orientation = 'row', compression = 'no') NOCOMPRESS;
- 创建行存表,设置ILM策略。
- 逻辑解码不支持DDL(DCL)/DML混合事务,混合事务中DDL之后的DML解码不支持。
-- 均不反解析,DCL为不支持语句故不解析,DML处于DCL之后也不反解析 gaussdb=# BEGIN; gaussdb=# GAINT ALL PRIVILEGES to u01; gaussdb=# INSERT INTO test1(col1) values(1); gaussdb=# COMMIT; -- 只反解析第一句和第三句SQL语句 gaussdb=# BEGIN; gaussdb=# CREATE TABLE mix_tran_t4(id int); gaussdb=# INSERT INTO mix_tran_t4 VALUES(111); gaussdb=# CREATE TABLE mix_tran_t5(id int); gaussdb=# COMMIT; -- 只反解析第一句和第二句SQL语句 gaussdb=# BEGIN; gaussdb=# INSERT INTO mix_tran_t4 VALUES(111); gaussdb=# CREATE TABLE mix_tran_t6(id int); gaussdb=# INSERT INTO mix_tran_t4 VALUES(111); gaussdb=# COMMIT; -- 全反解析 gaussdb=# BEGIN; gaussdb=# INSERT INTO mix_tran_t4 VALUES(111); gaussdb=# CREATE TABLE mix_tran_t7(id int); gaussdb=# CREATE TABLE mix_tran_t8(id int); gaussdb=# COMMIT; -- 只反解析第一句和第三句SQL语句 gaussdb=# BEGIN; gaussdb=# CREATE TABLE mix_tran_t7(id int); gaussdb=# CREATE TYPE compfoo AS (f1 int, f2 text); gaussdb=# CREATE TABLE mix_tran_t8(id int); gaussdb=# COMMIT; -- 全反解析 gaussdb=# BEGIN; gaussdb=# INSERT INTO mix_tran_t4 VALUES(111); gaussdb=# INSERT INTO mix_tran_t4 VALUES(111); gaussdb=# INSERT INTO mix_tran_t4 VALUES(111); gaussdb=# COMMIT; -- 只反解析第一句SQL语句 gaussdb=# BEGIN; gaussdb=# INSERT INTO mix_tran_t4 VALUES(111); gaussdb=# CREATE TYPE compfoo AS (f1 int, f2 text); gaussdb=# INSERT INTO mix_tran_t4 VALUES(111); gaussdb=# COMMIT; -- 只反解析第一句和第三句SQL语句 gaussdb=# BEGIN; gaussdb=# INSERT INTO mix_tran_t4 VALUES(111); gaussdb=# CREATE TYPE compfoo AS (f1 int, f2 text); gaussdb=# CREATE TABLE mix_tran_t9(id int); gaussdb=# COMMIT;
- 逻辑解码语句CREATE TABLE AS SELECT、SELECT INTO和CREATE TABLE AS仅能解码出CREATE TABLE语句,暂不支持解码INSERT语句。
对于CTAS创建的表,仍会解码其ALTER和DROP语句。
示例:
原始SQL语句:
CREATE TABLE IF NOT EXISTS tb35_2 (c1 int) with (storage_type=USTORE,ORIENTATION=ROW); INSERT INTO tb35_2 VALUES (6); CREATE TABLE tb35_1 with (storage_type=USTORE,ORIENTATION=ROW) AS SELECT * FROM tb35_2;
最后一句SQL语句反解析结果:
CREATE TABLE public.tb35_1 (c1 pg_catalog.int4) WITH (storage_type = 'ustore', orientation = 'row', compression = 'no') NOCOMPRESS;
- 执行存储过程/函数/高级包时,若其本身包含DDL/DML混合事务或者其本身与同事务内其他语句组成DDL/DML混合事务,则按照混合事务原则执行解码。
- 逻辑解码不支持账本数据库功能,创建账本数据库的DDL语句解码结果中会包含hash列。
- 原始语句:
CREATE SCHEMA blockchain_schema WITH BLOCKCHAIN; CREATE TABLE blockchain_schema.blockchain_table(mes int);
- 解码结果:
CREATE SCHEMA blockchain_schema WITH BLOCKCHAIN; CREATE TABLE blockchain_schema.blockchain_table (mes pg_catalog.int4, hash_a1d895 pg_catalog.hash16); -- 此语句无法在目标端回放。
需要在目标端手动关闭blockchain_schema的防篡改属性后,才可以正常回放,此时目标端的blockchain_table等同于一张普通表,再次执行DML命令可以正常回放。
SQL命令:
ALTER SCHEMA blockchain_schema WITHOUT BLOCKCHAIN; CREATE TABLE blockchain_schema.blockchain_table (mes pg_catalog.int4, hash_a1d895 pg_catalog.hash16);
- 原始语句:
- 串行逻辑解码支持DDL特有约束:
- sql_decoding插件不支持json格式的DDL。
解码格式
- JSON格式
对于输入的DDL语句,SQL引擎解析器会通过语法、词法分析将其分解为解析树,解析树节点中包含了DDL的全部信息,并且执行器会根据解析树内容,执行系统元信息的修改。在执行器执行完成之后,便可以获取到DDL操作数据对象的search_path。本特性在执行器执行成功之后,对解析树信息以及执行器结果进行反解析,以还原出DDL原语句的全部信息。反解析的方式可以分解整个DDL语句,以方便输出JSON格式的DDL,用以适配异构数据库场景。
CREATE TABLE语句在经过词法、语法分析之后,得到对应的CreateStmt解析树节点,节点中包含了表信息、列信息、分布式信息(DistributeBy结构体)、分区信息(PartitionState结构)等。通过反解析后,可输出的JSON格式如下:
{"JDDL":{"fmt":"CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D %{table_elements}s %{with_clause}s %{compression}s","identity":{"object_name":"test_create_table_a","schema_name":"public"},"compression":"NOCOMPRESS","persistence":"","with_clause":{"fmt":"WITH (%{with:, }s)","with":[{"fmt":"%{label}s = %{value}L","label":{"fmt":"%{label}I","label":"orientation"},"value":"row"},{"fmt":"%{label}s = %{value}L","label":{"fmt":"%{label}I","label":"compression"},"value":"no"}]},"if_not_exists":"","table_elements":{"fmt":"(%{elements:, }s)","elements":[{"fmt":"%{name}I %{column_type}T","name":"a","column_type":{"typmod":"","typarray":false,"type_name":"int4","schema_name":"pg_catalog"}}]}}}
可以看到,JSON格式中包含对象的search_path,其中的identity键标识schema为public,表名为test_create_table_a,其中%{persistence}s对应的字段如下,此SQL语句不含此字段所以为空。
[ [ GLOBAL | LOCAL ] [ TEMPORARY | TEMP ] | UNLOGGED ]
%{if_not_exists}s对应SQL语句中的字段,不含此字段所以为空:
[ IF NOT EXISTS ]
%{identity}D对应SQL语句中的字段:
table_name
%{table_elements}s对应SQL语句中的字段:
(column_name data_type)
%{with_clause}s对应SQL语句中的字段:
[ WITH ( {storage_parameter = value} [, ... ] ) ]
%{compression}s对应SQL语句中的字段:
[ COMPRESS | NOCOMPRESS ]
- decode-style指定格式
输出的格式由decode-style参数控制,如当decode-style='j'时,输出格式如下:
{"TDDL":"CREATE TABLE public.test_create_table_a (a pg_catalog.int4) WITH (orientation = 'row', compression = 'no') NOCOMPRESS"}
其中语句中也包含Schema名称。
接口设计
- 新增控制参数
- 新增逻辑解码控制参数,用于控制DDL的反解析流程以及输出形式。可通过JDBC接口或者pg_logical_slot_peek_changes开启。
- enable-ddl-decoding:默认false,不开启DDL语句的逻辑解码;值为true时,开启DDL语句的逻辑解码。
- enable-ddl-json-format:默认false,传送TEXT格式的DDL反解析结果;值为true时,传送JSON格式的DDL反解析结果。
- 新增GUC参数
- 新增逻辑解码控制参数,用于控制DDL的反解析流程以及输出形式。可通过JDBC接口或者pg_logical_slot_peek_changes开启。
- 新增日志
新增DDL日志xl_logical_ddl_message,其类型为RM_LOGICALDDLMSG_ID。其定义如下:
名称
类型
意义
db_id
OID
数据库ID
rel_id
OID
表ID
csn
CommitSeqNo
CSN快照
cid
CommandId
Command ID
tag_type
NodeTag
DDL类型
message_size
Size
日志内容长度
filter_message_size
Size
日志中白名单过滤信息长度
message
char *
DDL内容
使用步骤
- 逻辑解码特性需提前设置GUC参数wal_level为logical,该参数需要重启生效。
gs_guc set -Z datanode -D $node_dir -c "wal_level = logical"
其中,$node_dir为数据库节点路径,用户可根据实际情况替换。
- 以具有REPLICATION权限的用户登录GaussDB数据库主节点,使用如下命令连接数据库。
gsql -U user1 -W password -d db1 -p 16000 -r
其中,user1为用户名,password为密码,db1为需要连接的数据库名称,16000为数据库端口号,用户可根据实际情况替换。
- 创建名称为slot1的逻辑复制槽。
1 2 3 4 5
gaussdb=# SELECT * FROM pg_create_logical_replication_slot('slot1', 'mppdb_decoding'); slotname | xlog_position ----------+--------------- slot1 | 0/3764C788 (1 row)
- 在数据库中创建Package。
1 2 3 4 5 6
gaussdb=# CREATE OR REPLACE PACKAGE ldp_pkg1 IS var1 int:=1; --公有变量 var2 int:=2; PROCEDURE testpro1(var3 int); --公有存储过程,可以被外部调用 END ldp_pkg1; /
- 读取复制槽slot1解码结果,可通过JDBC接口或者pg_logical_slot_peek_changes推进复制槽。
- 逻辑解码选项请参见逻辑解码选项和新增控制参数。
- 并行解码中,在JDBC接口中改变参数decode_style可以决定解码格式:
通过配置选项decode-style,指定解码格式。其取值为char型的字符'j'、't'或'b',分别代表json格式、text格式及二进制格式。
1 2 3 4 5 6 7
gaussdb=# SELECT data FROM pg_logical_slot_peek_changes('slot1', NULL, NULL, 'enable-ddl-decoding', 'true', 'enable-ddl-json-format', 'false') WHERE data not like 'BEGIN%' AND data not like 'COMMIT%' AND data not like '%dbe_pldeveloper.gs_source%'; data -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- {"TDDL":"CREATE OR REPLACE PACKAGE public.ldp_pkg1 AUTHID CURRENT_USER IS var1 int:=1; --公有变量\n var2 int:=2;\n PROCEDURE testpro1(var3 int); --公有存储过程,可以被外部调用\nEND ldp_pkg1; \n /"} (1 row)
- 删除逻辑复制槽slot1,删除package ldp_pkg1。
1 2 3 4 5 6 7 8 9
gaussdb=# SELECT * FROM pg_drop_replication_slot('slot1'); pg_drop_replication_slot -------------------------- (1 row) gaussdb=# DROP PACKAGE ldp_pkg1; NOTICE: drop cascades to function public.testpro1(integer) DROP PACKAGE