文档首页> 数据复制服务 DRS> 用户指南(巴黎区域)> 实时同步> 同步场景操作参考> 通过创建触发器和函数实现PostgreSQL增量DDL同步
更新时间:2023-01-05 GMT+08:00

通过创建触发器和函数实现PostgreSQL增量DDL同步

本小结介绍PostgreSQL->RDS for PostgreSQL实时同步,通过在源库创建触发器和函数获取源库的DDL信息,然后在DRS增量实时同步阶段实现DDL操作的同步。

前提条件

  • 当前支持的DDL操作包含如下:
    • 表级同步支持:TRUNCATE(仅PostgreSQL 11及以上版本支持)、DROP TABLE 、ALTER TABLE(包含ADD COLUMN、DROP COLUMN、ALTER COLUMN、RENAME COLUMN、ADD CONSTRAINT、DROP CONSTRAINT、RENAME)。
    • 库级同步支持:TRUNCATE(仅PostgreSQL 11及以上版本支持)、CREATE SCHEMA/TABLE、DROP TABLE 、ALTER TABLE(包含ADD COLUMN、DROP COLUMN、ALTER COLUMN、RENAME COLUMN、ADD CONSTRAINT、DROP CONSTRAINT、RENAME)、CREATE SEQUENCE、DROP SEQUENCE、ALTER SEQUENCE、CREATE INDEX、ALTER INDEX、DROP INDEX、CREATE VIEW、ALTER VIEW。
    • 表级同步:RENAME表名之后,向更改名称后的表插入新的数据时,DRS不会同步新的数据到目标库。
    • 库级同步:源库使用非CREATE TABLE方式创建的表不会同步到目标库。常见地如:使用CREATE TABLE AS创建表、调用函数创建表。
    • 暂不支持以注释开头的DDL语句的同步,以注释开头的DDL语句将被忽略。
    • 不支持函数和存储过程中DDL语句的同步,函数和存储过程中执行的DDL语句将被忽略。
  • 源库和目标库版本不同时,请使用源库和目标库都兼容的SQL语句执行DDL操作。例如:源库为pg11,目标库为pg12,要将源库表的列类型从char修改为int时,请使用如下语句:
    alter table tablename alter column columnname type int USING columnname::int;
  • 执行如下操作步骤前,请检查待同步的源数据库public模式下,是否存在名为hwdrs_ddl_info的表、名为hwdrs_ddl_function()的函数、名为hwdrs_ddl_event的触发器。如存在,请将其删除。
  • 库级同步时,如创建无主键表,请执行如下命令,将无主键表复制属性设置为full。
    alter table tablename replica identity full;

操作步骤

若源库为本云RDS for PostgreSQL,可以使用root用户创建相关对象,如果执行时报“Must be superuser to create an event trigger”错误,可以通过工单申请处理。本云RDS for PostgreSQL的root用户权限请参见RDS用户指南。

  1. 使用拥有创建事件触发器权限的用户连接要同步的数据库。
  2. 执行如下语句,创建存储DDL信息的表。

    DROP TABLE IF EXISTS public.hwdrs_ddl_info;
    DROP SEQUENCE IF EXISTS public.hwdrs_ddl_info_id_seq;
    CREATE TABLE public.hwdrs_ddl_info(
      id                             bigserial primary key,
      ddl                            text,
      username                       varchar(64) default current_user, 
      txid                           varchar(16) default txid_current()::varchar(16),
      tag                            varchar(64), 
      database                       varchar(64) default current_database(), 
      schema                         varchar(64) default current_schema,
      client_address                 varchar(64) default inet_client_addr(),
      client_port                    integer default inet_client_port(),
      event_time                     timestamp default current_timestamp
    );

  3. 执行如下语句,创建函数。

    CREATE OR REPLACE FUNCTION public.hwdrs_ddl_function()
        RETURNS event_trigger
        LANGUAGE plpgsql
        SECURITY INVOKER
    AS $BODY$
        declare ddl text;
        declare real_num int;
        declare max_num int := 50000;
    begin
      if (tg_tag in ('CREATE TABLE','ALTER TABLE','DROP TABLE','CREATE SCHEMA','CREATE SEQUENCE','ALTER SEQUENCE','DROP SEQUENCE','CREATE VIEW','ALTER VIEW','DROP VIEW','CREATE INDEX','ALTER INDEX','DROP INDEX')) then
          select current_query() into ddl; 
          insert into public.hwdrs_ddl_info(ddl, username, txid, tag, database, schema, client_address, client_port, event_time)
          values (ddl, current_user, cast(txid_current() as varchar(16)), tg_tag, current_database(), current_schema,  inet_client_addr(), inet_client_port(), current_timestamp);
          select count(id) into real_num from public.hwdrs_ddl_info;
          if real_num > max_num then
            if current_setting('server_version_num')::int<100000 then
              delete from public.hwdrs_ddl_info where id<(select min(id)+1000 from public.hwdrs_ddl_info) and not exists (select 0 from pg_locks l join pg_database d on l.database=d.oid where d.datname=current_catalog and pid<>pg_backend_pid() and locktype='relation' and relation=to_regclass('public.hwdrs_ddl_info_pkey')::oid and mode='RowExclusiveLock');
            else 
              delete from public.hwdrs_ddl_info where id<(select min(id)+1000 from public.hwdrs_ddl_info) and (xmax=0 or coalesce(txid_status(xmax::text::bigint), '')<>'in progress');
            end if;
          end if;
      end if;
    end;
    $BODY$;

  4. 执行以下语句,为23中创建的对象赋予必要权限。

    GRANT USAGE ON SCHEMA public TO public;
    GRANT SELECT,INSERT,DELETE ON TABLE public.hwdrs_ddl_info TO public;
    GRANT SELECT,USAGE ON SEQUENCE public.hwdrs_ddl_info_id_seq TO public;
    GRANT EXECUTE ON FUNCTION public.hwdrs_ddl_function() TO public;

  5. 执行以下语句,创建DDL事件触发器。

    CREATE EVENT TRIGGER hwdrs_ddl_event ON ddl_command_end EXECUTE PROCEDURE public.hwdrs_ddl_function();

  6. 执行以下语句,将创建的事件触发器设置为enable。

    ALTER EVENT TRIGGER hwdrs_ddl_event ENABLE ALWAYS;

  7. 返回数据复制服务控制台,创建PostgreSQL->RDS for PostgreSQL的同步任务。
  8. 待同步任务结束后,请执行下语句删除创建的表、函数、触发器。

    DROP EVENT trigger hwdrs_ddl_event;
    DROP FUNCTION public.hwdrs_ddl_function();
    DROP TABLE public.hwdrs_ddl_info;