更新时间:2024-11-12 GMT+08:00
分享

逻辑解码支持DDL

GaussDB主机上正常执行DDL语句,通过逻辑解码工具可以获取到DDL语句。

表1 具体支持的DDL类型

索引

自定义函数

自定义存储过程

触发器

Sequence

Schema

Comment on

CREATE TABLE [PARTITION | AS | SUBPARTITION

ALTER TABLE [PARTITION | SUBPARTITION

DROP TABLE

RENAME TABLE

TRUNCATE

CREATE INDEX

ALTER INDEX

DROP INDEX

REINDEX

CREATE FUNCTION

ALTER FUNCTION

DROP FUNCTION

CREATE PROCEDURE

ALTER PROCEDURE

DROP PROCEDURE

CREATE TRIGGER

ALTER TRIGGER

DROP TRIGGER

CREATE SEQUENCE

ALTER SEQUENCE

DROP SEQUENCE

CREATE SCHEMA

ALTER SCHEMA

DROP SCHEMA

COMMENT ON

表2 M-Compatibility模式数据库支持的DDL类型

索引

Sequence

Schema

Comment on

ALTER TABLE

CREATE TABLE

DROP TABLE

ALTER TABLE PARTITION

CREATE TABLE PARTITION

TRUNCATE

ALTER INDEX

CREATE INDEX

DROP INDEX

REINDEX

ALTER TABLE DROP INDEX

ALTER TABLE ADD INDEX

ALTER SEQUENCE

CREATE SEQUENCE

DROP SEQUENCE

ALTER SCHEMA

CREATE SCHEMA

DROP SCHEMA

ALTER DATABASE

CREATE DATABASE

DROP DATABASE

COMMENT ON

功能描述

数据库在执行DML的时候,存储引擎会生成对应的DML日志,用于进行恢复,对这些DML日志进行解码,即可还原对应的DML语句,生成逻辑日志。而对于DDL语句,数据库并不记录DDL原语句的日志,而是记录DDL语句涉及的系统表的DML日志。DDL种类多样、语法复杂,逻辑复制要支持DDL语句,通过这些系统表的DML日志来解码原DDL语句是非常困难的。新增DDL日志记录原DDL信息,并在解码时通过DDL日志可以得到DDL原语句。

在DDL语句执行过程中,SQL引擎解析器会对原语句进行语法、词法解析,并生成解析树(不同的DDL语法会生成不同类型的解析树,解析树中包含DDL语句的全部信息)。随后,执行器通过这些信息执行对应操作,生成、修改对应元信息。

本节通过新增DDL日志的方式,来支持逻辑解码DDL,其内容由解析器结果(解析树)以及执行器结果生成,并在执行器执行完成后生成该日志。

从语法树反解析出DDL,DDL反解析能够将DDL命令转换为JSON格式的语句,并提供必要的信息在目标位置重建DDL命令。与原始DDL命令字符串相比,使用DDL反解析的好处包括:

  1. 解析出来的每个数据库对象都带有Schema,因此如果使用不同的search_path,也不会有歧义。
  2. 结构化的JSON和格式化的输出能支持异构数据库。如果用户使用的是不同的数据库版本,并且存在某些DDL语法差异,需要在应用之前解决这些差异。

反解析输出的结果是规范化后的形式,结果与用户输入等价,不保证完全相同,例如:

示例1:在函数体中没有单引号'时,函数体的分隔符$$会被解析为单引号'。

原始SQL语句:

CREATE FUNCTION func(a INT) RETURNS INT AS
$$
BEGIN
a:= a+1;
CREATE TABLE test(col1 INT);
INSERT INTO test VALUES(1);
DROP TABLE test;
RETURN a;
END;
$$
LANGUAGE plpgsql;

反解析结果:

CREATE  FUNCTION public.func ( IN a pg_catalog.int4 ) RETURNS pg_catalog.int4 LANGUAGE plpgsql VOLATILE CALLED ON NULL INPUT SECURITY INVOKER COST 100 AS '
BEGIN
a:= a+1;
CREATE TABLE test(col1 INT);
INSERT INTO test VALUES(1);
DROP TABLE test;
RETURN a;
END;
';

示例3:“ALTER INDEX "Alter_Index_Index" REBUILD PARTITION "CA_ADDRESS_SK_index2"”会被反解析为“REINDEX INDEX public."Alter_Index_Index" PARTITION "CA_ADDRESS_SK_index2"”。

示例4:创建/修改范围分区表,START END语法格式均解码转化为LESS THAN语句:
gaussdb=# CREATE TABLE test_create_table_partition2 (c1 INT, c2 INT) 
PARTITION BY RANGE (c2) (
    PARTITION p1 START(1) END(1000) EVERY(200) ,
    PARTITION p2 END(2000),
    PARTITION p3 START(2000) END(2500),
    PARTITION p4 START(2500),
    PARTITION p5 START(3000) END(5000) EVERY(1000) 
);
会被反解析为:
gaussdb=# CREATE TABLE test_create_table_partition2 (c1 INT, c2 INT) 
PARTITION BY RANGE (c2) (
    PARTITION p1_0 VALUES LESS THAN ('1'), PARTITION p1_1 VALUES LESS THAN ('201'), PARTITION p1_2 VALUES LESS THAN ('401'), PARTITION p1_3 VALUES LESS THAN ('601'), PARTITION p1_4 VALUES LESS THAN ('801'), PARTITION p1_5 VALUES LESS    THAN ('1000'), 
    PARTITION p2 VALUES LESS THAN ('2000'), 
    PARTITION p3 VALUES LESS THAN ('2500'), 
    PARTITION p4 VALUES LESS THAN ('3000'), 
    PARTITION p5_1 VALUES LESS THAN ('4000'), 
    PARTITION p5_2 VALUES LESS THAN ('5000')
);
示例5:新增表的列字段时,使用IF NOT EXISTS判断。
  • 原始SQL语句:
    gaussdb=# CREATE TABLE IF NOT EXISTS tb5 (c1 int,c2 int) with (ORIENTATION=ROW, STORAGE_TYPE=USTORE);
    gaussdb=# ALTER TABLE IF EXISTS tb5 * ADD COLUMN IF NOT EXISTS c2 char(5) after c1; -- 可解码。TABLE中已有int型的列c2,语句执行跳过,反解析结果中c2列的类型仍为原来的类型。

    反解析结果:

    gaussdb=# ALTER TABLE IF EXISTS public.tb5 ADD COLUMN IF NOT EXISTS c2 pg_catalog.int4 AFTER c1;
  • 原始SQL语句:
    gaussdb=# ALTER TABLE IF EXISTS tb5 * ADD COLUMN IF NOT EXISTS c2 char(5) after c1,  ADD COLUMN IF NOT EXISTS c3 char(5) after c1; -- 解码。新增列c3的反解析结果类型正确。

    反解析结果:

    gaussdb=# ALTER TABLE IF EXISTS public.tb5 ADD COLUMN IF NOT EXISTS c2 pg_catalog.int4 AFTER c1, ADD COLUMN IF NOT EXISTS c3 pg_catalog.bpchar(5) AFTER c1;
  • 原始SQL语句:
    gaussdb=# ALTER TABLE IF EXISTS tb5 * ADD COLUMN c2 char(5) after c1,  ADD COLUMN IF NOT EXISTS c4 int after c1; --不解码,语句执行错误。
示例6:定义数据库对象(DROP TABLE/INDEX/SEQUENCE)时,解码结果会添加IFEXISTS语法选项。
  • 原始SQL语句:
    gaussdb=# CREATE TABLE IF NOT EXISTS tb6 (c1 int,c2 int) with (ORIENTATION=ROW, STORAGE_TYPE=USTORE);
    gaussdb=#   DROP TABLE tb6;

    反解析结果:

    gaussdb=# DROP TABLE IF EXISTS public.tb6 RESTRICT;

规格约束

  • 逻辑解码支持DDL规格:
    • 纯DDL逻辑解码性能标准环境下约为100MB/S,DDL/DML混合事务逻辑解码性能标准环境下约为100MB/S。
    • 开启此功能后(设置wal_level=logical且enable_logical_replication_ddl=on),对DDL语句影响性能下降小于15%。
  • 解码通用约束(串行和并行):
    • 不支持解码本地临时对象的DDL操作。
    • 不支持FOREIGN TABLE场景的DDL解码。
    • alter table add column的default值不支持stable类型和volatile类型的函数;create table和alter table的column的check表达式不支持stable类型和volatile类型的函数;alter table如果有多条子语句,只要其中一条子语句存在上述两种情况,则该条alter table整条语句不反解析。

      gaussdb=# ALTER TABLE tbl_28 ADD COLUMN b1 TIMESTAMP DEFAULT NOW(); -- 's' NOT DEPARSE
      gaussdb=# ALTER TABLE tbl_28 ADD COLUMN b2 INT DEFAULT RANDOM(); -- 'v' NOT DEPARSE
      gaussdb=# ALTER TABLE tbl_28 ADD COLUMN b3 INT DEFAULT ABS(1); -- 'i' DEPARSE
    • 不支持分布式CREATE MATERIALIZED VIEW的DDL解码。
    • 不支持CREATE/ALTER/DROP VIEW、COMMENT ON VIEW的DDL解码。
    • 不支持REINDEX DATABASE/SYSTEM的DDL解码。
    • 不支持视图上触发器相关的DDL解码。
    • 不支持CONCURRENTLY相关语句的DDL解码。
    • 创建对象时语句中存在IF NOT EXISTS时,如果对象已存在,则不进行解码。删除对象时语句中存在IF EXISTS时,如果对象不存在,则不进行解码。
    • 不对ALTER PACKAGE COMPILE语句进行解码,但会解码实例化内容中包含的DDL/DML语句。如果PACKAGE里没有DDL或DML部分的实例化内容,则alter package compile会被逻辑解码忽略。
    • 仅支持本版本之前版本的商用DDL语法,以下SQL语句不支持逻辑解码。
      • 创建行存表,设置ILM策略。

        原始SQL语句:

        gaussdb=# CREATE TABLE IF NOT EXISTS tb3 (c1 int) with (storage_type=USTORE,ORIENTATION=ROW) ILM ADD POLICY ROW STORE COMPRESS ADVANCED ROW AFTER 7 day OF NO MODIFICATION;

        反解析结果:

        gaussdb=# CREATE TABLE IF NOT EXISTS public.tb3 (c1 pg_catalog.int4) WITH (storage_type = 'ustore', orientation = 'row', compression = 'no') NOCOMPRESS;
      • B兼容模式下创建或修改表时,给表或列添加注释。

        原始SQL语句:

        gaussdb=# CREATE TABLE IF NOT EXISTS tb6 (c1 integer comment 'Mysql兼容注释语法') with (ORIENTATION=ROW, STORAGE_TYPE=USTORE);

        反解析结果:

        gaussdb=# CREATE TABLE IF NOT EXISTS public.tb6 (c1 pg_catalog.int4) WITH (storage_type = 'ustore', orientation = 'row', compression = 'no') NOCOMPRESS;
    • 逻辑解码不支持DDL(DCL)/DML混合事务,混合事务中DDL之后的DML解码不支持。
      -- 均不反解析,DCL为不支持语句故不解析,DML处于DCL之后也不反解析
      gaussdb=# BEGIN;
      gaussdb=# GAINT ALL PRIVILEGES to u01;
      gaussdb=# INSERT INTO test1(col1) values(1);
      gaussdb=# COMMIT; 
      
      -- 只反解析第一句和第三句SQL语句
      gaussdb=# BEGIN;
      gaussdb=# CREATE TABLE mix_tran_t4(id int);
      gaussdb=# INSERT INTO mix_tran_t4 VALUES(111);
      gaussdb=# CREATE TABLE mix_tran_t5(id int);
      gaussdb=# COMMIT; 
      
      -- 只反解析第一句和第二句SQL语句
      gaussdb=# BEGIN;
      gaussdb=# INSERT INTO mix_tran_t4 VALUES(111);
      gaussdb=# CREATE TABLE mix_tran_t6(id int);
      gaussdb=# INSERT INTO mix_tran_t4 VALUES(111);
      gaussdb=# COMMIT; 
      
      -- 全反解析
      gaussdb=# BEGIN;
      gaussdb=# INSERT INTO mix_tran_t4 VALUES(111);
      gaussdb=# CREATE TABLE mix_tran_t7(id int);
      gaussdb=# CREATE TABLE mix_tran_t8(id int);
      gaussdb=# COMMIT; 
      
      -- 只反解析第一句和第三句SQL语句
      gaussdb=# BEGIN;
      gaussdb=# CREATE TABLE mix_tran_t7(id int);
      gaussdb=# CREATE TYPE compfoo AS (f1 int, f2 text);
      gaussdb=# CREATE TABLE mix_tran_t8(id int);
      gaussdb=# COMMIT; 
      
      -- 全反解析
      gaussdb=# BEGIN;
      gaussdb=# INSERT INTO mix_tran_t4 VALUES(111);
      gaussdb=# INSERT INTO mix_tran_t4 VALUES(111);
      gaussdb=# INSERT INTO mix_tran_t4 VALUES(111);
      gaussdb=# COMMIT; 
      
      -- 只反解析第一句SQL语句
      gaussdb=# BEGIN;
      gaussdb=# INSERT INTO mix_tran_t4 VALUES(111);
      gaussdb=# CREATE TYPE compfoo AS (f1 int, f2 text);
      gaussdb=# INSERT INTO mix_tran_t4 VALUES(111);
      gaussdb=# COMMIT; 
      
      -- 只反解析第一句和第三句SQL语句
      gaussdb=# BEGIN;
      gaussdb=# INSERT INTO mix_tran_t4 VALUES(111);
      gaussdb=# CREATE TYPE compfoo AS (f1 int, f2 text);
      gaussdb=# CREATE TABLE mix_tran_t9(id int);
      gaussdb=# COMMIT; 
    • 逻辑解码语句CREATE TABLE AS SELECT、SELECT INTO和CREATE TABLE AS仅能解码出CREATE TABLE语句,暂不支持解码INSERT语句。

      对于CTAS创建的表,仍会解码其ALTER和DROP语句。

      示例:

      原始SQL语句:

      CREATE TABLE IF NOT EXISTS tb35_2 (c1 int) with (storage_type=USTORE,ORIENTATION=ROW);
      INSERT INTO tb35_2 VALUES (6);
      CREATE TABLE tb35_1 with (storage_type=USTORE,ORIENTATION=ROW) AS SELECT * FROM tb35_2;

      最后一句SQL语句反解析结果:

       CREATE  TABLE  public.tb35_1 (c1 pg_catalog.int4) WITH (storage_type = 'ustore', orientation = 'row', compression = 'no') NOCOMPRESS;
    • 执行存储过程/函数/高级包时,若其本身包含DDL/DML混合事务或者其本身与同事务内其他语句组成DDL/DML混合事务,则按照混合事务原则执行解码。
    • 逻辑解码不支持账本数据库功能,创建账本数据库的DDL语句解码结果中会包含hash列。
      • 原始语句:
        CREATE SCHEMA blockchain_schema WITH BLOCKCHAIN;
        CREATE TABLE blockchain_schema.blockchain_table(mes int);
      • 解码结果:
        CREATE SCHEMA blockchain_schema WITH BLOCKCHAIN;
        CREATE TABLE blockchain_schema.blockchain_table (mes pg_catalog.int4, hash_a1d895 pg_catalog.hash16);  -- 此语句无法在目标端回放。

        需要在目标端手动关闭blockchain_schema的防篡改属性后,才可以正常回放,此时目标端的blockchain_table等同于一张普通表,再次执行DML命令可以正常回放。

        SQL命令:

        ALTER SCHEMA blockchain_schema WITHOUT BLOCKCHAIN;
        CREATE TABLE blockchain_schema.blockchain_table (mes pg_catalog.int4, hash_a1d895 pg_catalog.hash16);
  • 串行逻辑解码支持DDL特有约束:
    • sql_decoding插件不支持json格式的DDL。

解码格式

  • JSON格式

    对于输入的DDL语句,SQL引擎解析器会通过语法、词法分析将其分解为解析树,解析树节点中包含了DDL的全部信息,并且执行器会根据解析树内容,执行系统元信息的修改。在执行器执行完成之后,便可以获取到DDL操作数据对象的search_path。本特性在执行器执行成功之后,对解析树信息以及执行器结果进行反解析,以还原出DDL原语句的全部信息。反解析的方式可以分解整个DDL语句,以方便输出JSON格式的DDL,用以适配异构数据库场景。

    CREATE TABLE语句在经过词法、语法分析之后,得到对应的CreateStmt解析树节点,节点中包含了表信息、列信息、分布式信息(DistributeBy结构体)、分区信息(PartitionState结构)等。通过反解析后,可输出的JSON格式如下:

    {"JDDL":{"fmt":"CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D %{table_elements}s %{with_clause}s %{compression}s","identity":{"object_name":"test_create_table_a","schema_name":"public"},"compression":"NOCOMPRESS","persistence":"","with_clause":{"fmt":"WITH (%{with:, }s)","with":[{"fmt":"%{label}s = %{value}L","label":{"fmt":"%{label}I","label":"orientation"},"value":"row"},{"fmt":"%{label}s = %{value}L","label":{"fmt":"%{label}I","label":"compression"},"value":"no"}]},"if_not_exists":"","table_elements":{"fmt":"(%{elements:, }s)","elements":[{"fmt":"%{name}I %{column_type}T","name":"a","column_type":{"typmod":"","typarray":false,"type_name":"int4","schema_name":"pg_catalog"}}]}}}

    可以看到,JSON格式中包含对象的search_path,其中的identity键标识schema为public,表名为test_create_table_a,其中%{persistence}s对应的字段如下,此SQL语句不含此字段所以为空。

    [ [ GLOBAL | LOCAL ] [ TEMPORARY | TEMP ] | UNLOGGED ]

    %{if_not_exists}s对应SQL语句中的字段,不含此字段所以为空:

    [ IF NOT EXISTS ]

    %{identity}D对应SQL语句中的字段:

    table_name  

    %{table_elements}s对应SQL语句中的字段:

     (column_name data_type) 

    %{with_clause}s对应SQL语句中的字段:

    [ WITH ( {storage_parameter = value} [, ... ] ) ]

    %{compression}s对应SQL语句中的字段:

    [ COMPRESS | NOCOMPRESS ]
  • decode-style指定格式

    输出的格式由decode-style参数控制,如当decode-style='j'时,输出格式如下:

    {"TDDL":"CREATE  TABLE  public.test_create_table_a (a pg_catalog.int4) WITH (orientation = 'row', compression = 'no') NOCOMPRESS"}

    其中语句中也包含Schema名称。

接口设计

  • 新增控制参数
    1. 新增逻辑解码控制参数,用于控制DDL的反解析流程以及输出形式。可通过JDBC接口或者pg_logical_slot_peek_changes开启。
      • enable-ddl-decoding:默认false,不开启DDL语句的逻辑解码;值为true时,开启DDL语句的逻辑解码。
      • enable-ddl-json-format:默认false,传送TEXT格式的DDL反解析结果;值为true时,传送JSON格式的DDL反解析结果。
    2. 新增GUC参数
      • enable_logical_replication_ddl:默认为ON,ON状态下,逻辑复制可支持DDL,否则,不支持DDL。只有当ON状态下,才会对DDL执行结果进行反解析,并生成DDL的WAL日志。否则,不反解析也不生成WAL日志。

        enable_logical_replication_ddl的开关日志,以证明是否是用户修改了该参数导致逻辑解码不支持DDL。

  • 新增日志

    新增DDL日志xl_logical_ddl_message,其类型为RM_LOGICALDDLMSG_ID。其定义如下:

    名称

    类型

    意义

    db_id

    OID

    数据库ID

    rel_id

    OID

    表ID

    csn

    CommitSeqNo

    CSN快照

    cid

    CommandId

    Command ID

    tag_type

    NodeTag

    DDL类型

    message_size

    Size

    日志内容长度

    filter_message_size

    Size

    日志中白名单过滤信息长度

    message

    char *

    DDL内容

使用步骤

  1. 逻辑解码特性需提前设置GUC参数wal_level为logical,该参数需要重启生效。

    gs_guc set -Z datanode -D $node_dir -c "wal_level = logical"

    其中,$node_dir为数据库节点路径,用户可根据实际情况替换。

  2. 以具有REPLICATION权限的用户登录GaussDB数据库主节点,使用如下命令连接数据库。

    gsql -U user1 -W password -d db1 -p 16000 -r

    其中,user1为用户名,password为密码,db1为需要连接的数据库名称,16000为数据库端口号,用户可根据实际情况替换。

  3. 创建名称为slot1的逻辑复制槽。

    1
    2
    3
    4
    5
    gaussdb=# SELECT * FROM pg_create_logical_replication_slot('slot1', 'mppdb_decoding');
     slotname | xlog_position 
    ----------+---------------
     slot1    | 0/3764C788
    (1 row)
    

  4. 在数据库中创建Package。

    1
    2
    3
    4
    5
    6
    gaussdb=# CREATE OR REPLACE PACKAGE ldp_pkg1 IS
        var1 int:=1;  --公有变量
        var2 int:=2;
        PROCEDURE testpro1(var3 int);  --公有存储过程,可以被外部调用
    END ldp_pkg1;
    /
    

  5. 读取复制槽slot1解码结果,可通过JDBC接口或者pg_logical_slot_peek_changes推进复制槽。

    • 逻辑解码选项请参见逻辑解码选项和新增控制参数。
    • 并行解码中,在JDBC接口中改变参数decode_style可以决定解码格式:

      通过配置选项decode-style,指定解码格式。其取值为char型的字符'j'、't'或'b',分别代表json格式、text格式及二进制格式。

    1
    2
    3
    4
    5
    6
    7
    gaussdb=# SELECT data FROM pg_logical_slot_peek_changes('slot1', NULL, NULL, 'enable-ddl-decoding', 'true', 'enable-ddl-json-format', 'false') WHERE data not like 'BEGIN%' AND data not like 'COMMIT%' AND data not like '%dbe_pldeveloper.gs_source%';
    
                                                                                                          data                  
    
    -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- {"TDDL":"CREATE OR REPLACE PACKAGE public.ldp_pkg1 AUTHID CURRENT_USER IS  var1 int:=1;  --公有变量\n    var2 int:=2;\n    
    PROCEDURE testpro1(var3 int);  --公有存储过程,可以被外部调用\nEND  ldp_pkg1; \n /"}
    (1 row)
    

  6. 删除逻辑复制槽slot1,删除package ldp_pkg1。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    gaussdb=# SELECT * FROM pg_drop_replication_slot('slot1');
     pg_drop_replication_slot
    --------------------------
    
    (1 row)
    
    gaussdb=# DROP PACKAGE ldp_pkg1;
    NOTICE:  drop cascades to function public.testpro1(integer)
    DROP PACKAGE
    

相关文档