Updated on 2024-12-01 GMT+08:00

Creating Event Triggers and Functions to Implement Incremental DDL Synchronization for PostgreSQL

This section describes how to synchronize incremental DDL operations in real time from PostgreSQL to RDS for PostgreSQL. You create an event trigger and a function in the source database to capture the DDL statements executed there, and DRS then synchronizes these DDL operations to the destination database during the incremental synchronization phase.

Prerequisites

  • The following DDL operations are supported:
    • Table-level synchronization: TRUNCATE (PostgreSQL 11 or later only), DROP TABLE, ALTER TABLE (including ADD COLUMN, DROP COLUMN, ALTER COLUMN, RENAME COLUMN, ADD CONSTRAINT, DROP CONSTRAINT, and RENAME), COMMENT ON COLUMN, and COMMENT ON TABLE
    • Database-level synchronization: TRUNCATE (PostgreSQL 11 or later only), CREATE SCHEMA, CREATE TABLE, DROP TABLE, ALTER TABLE (including ADD COLUMN, DROP COLUMN, ALTER COLUMN, RENAME COLUMN, ADD CONSTRAINT, DROP CONSTRAINT, and RENAME), CREATE SEQUENCE, DROP SEQUENCE, ALTER SEQUENCE, CREATE INDEX, ALTER INDEX, DROP INDEX, CREATE VIEW, ALTER VIEW, COMMENT ON COLUMN, COMMENT ON TABLE, COMMENT ON SCHEMA, COMMENT ON SEQUENCE, COMMENT ON INDEX, and COMMENT ON VIEW
  • The following restrictions apply:
    • Table-level synchronization: Data inserted into a table after the table is renamed is not synchronized to the destination database.
    • Database-level synchronization: Tables created in the source database by any means other than a CREATE TABLE statement, for example, with CREATE TABLE AS or by calling a function, are not synchronized to the destination database.
    • DDL statements that begin with a comment are ignored and not synchronized.
    • DDL statements executed inside functions or stored procedures are ignored and not synchronized.
  • If the source and destination databases run different PostgreSQL versions, perform DDL operations with SQL statements that are valid in both versions. For example, if the source database is PostgreSQL 11 and the destination database is PostgreSQL 12, run the following statement, which converts the existing values with an explicit USING clause, to change a column type from char to int:
    ALTER TABLE tablename ALTER COLUMN columnname TYPE int USING columnname::int;
  • Check whether the table public.hwdrs_ddl_info, the function public.hwdrs_ddl_function(), or the event trigger hwdrs_ddl_event already exists in the source database. If any of them exists, delete it.
  • During database-level synchronization, if you create a table without a primary key, run the following statement to set the table's replica identity to FULL:
    ALTER TABLE tablename REPLICA IDENTITY FULL;
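The leftover-object check and the PostgreSQL 11 requirement for TRUNCATE can both be verified with catalog queries. The following is a minimal sketch, assuming you run it in the source database as a user who can read the system catalogs. It relies only on standard PostgreSQL functions (to_regclass(), to_regprocedure(), current_setting()) and the pg_event_trigger catalog, all available in PostgreSQL 9.4 or later; the column aliases are illustrative names, not DRS output.

```sql
-- Run in the source database that you are going to synchronize.

-- TRUNCATE is synchronized only when the source runs PostgreSQL 11 or later.
SELECT current_setting('server_version_num')::int            AS server_version_num,
       current_setting('server_version_num')::int >= 110000  AS truncate_ddl_supported;

-- Leftover DDL capture objects: any column that returns true must be dropped first.
-- Event triggers do not belong to a schema, so they are looked up by name only.
SELECT to_regclass('public.hwdrs_ddl_info') IS NOT NULL           AS table_exists,
       to_regprocedure('public.hwdrs_ddl_function()') IS NOT NULL AS function_exists,
       EXISTS (SELECT 1
               FROM pg_event_trigger
               WHERE evtname = 'hwdrs_ddl_event')                 AS event_trigger_exists;
```

to_regclass() and to_regprocedure() return NULL instead of raising an error when the object does not exist, which is why they suit a read-only check like this.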
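To find the tables that still need the REPLICA IDENTITY FULL setting, you can query the system catalogs. This sketch assumes that "a table without a primary key" means an ordinary table (relkind 'r', which includes partitions) with no PRIMARY KEY constraint, and it skips system schemas (information_schema and names starting with pg_). The fix_statement column only builds the ALTER TABLE text; it does not execute anything.

```sql
-- Tables without a primary key whose replica identity is not yet FULL.
-- relreplident: 'd' = default (primary key), 'n' = nothing, 'i' = index, 'f' = full.
SELECT c.oid::regclass AS table_name,
       c.relreplident  AS replica_identity,
       format('ALTER TABLE %s REPLICA IDENTITY FULL;', c.oid::regclass) AS fix_statement
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'                    -- ordinary tables, including partitions
  AND n.nspname <> 'information_schema'
  AND n.nspname !~ '^pg_'                -- pg_catalog, pg_toast, temporary schemas
  AND c.relreplident <> 'f'
  AND NOT EXISTS (SELECT 1
                  FROM pg_constraint con
                  WHERE con.conrelid = c.oid
                    AND con.contype = 'p')
ORDER BY n.nspname, c.relname;
```

Review the list and run the generated statements only for tables that are in the scope of the synchronization task.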

Procedure

  • If the source database is a self-managed PostgreSQL database or a database on another cloud platform, perform the following steps:
    1. Connect to the database to be synchronized as a user who has permission to create event triggers. In PostgreSQL, only superusers can create event triggers.
    2. Run the following statements to create a table for storing DDL information:
      DROP TABLE IF EXISTS public.hwdrs_ddl_info;
      DROP SEQUENCE IF EXISTS public.hwdrs_ddl_info_id_seq;
      CREATE TABLE public.hwdrs_ddl_info(
        id                             bigserial primary key,
        ddl                            text,
        username                       varchar(64) default current_user, 
        txid                           varchar(16) default txid_current()::varchar(16),
        tag                            varchar(64), 
        database                       varchar(64) default current_database(), 
        schema                         varchar(64) default current_schema,
        client_address                 varchar(64) default inet_client_addr(),
        client_port                    integer default inet_client_port(),
        event_time                     timestamp default current_timestamp
      );
    3. Run the following statement to create the function that writes each captured DDL statement to public.hwdrs_ddl_info:
      CREATE OR REPLACE FUNCTION public.hwdrs_ddl_function()
          RETURNS event_trigger
          LANGUAGE plpgsql
          SECURITY INVOKER
      AS $$
          declare ddl text;
          declare real_num int;
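          declare max_num int := 50000;  -- upper bound on the number of rows kept in public.hwdrs_ddl_info
      begin
        -- Record only the DDL command tags that DRS synchronizes.
        if (tg_tag in ('CREATE TABLE','ALTER TABLE','DROP TABLE','CREATE SCHEMA','CREATE SEQUENCE','ALTER SEQUENCE','DROP SEQUENCE','CREATE VIEW','ALTER VIEW','DROP VIEW','CREATE INDEX','ALTER INDEX','DROP INDEX','COMMENT')) then
            -- current_query() returns the query text as submitted by the client.
            select current_query() into ddl;
            insert into public.hwdrs_ddl_info(ddl, username, txid, tag, database, schema, client_address, client_port, event_time)
            values (ddl, current_user, cast(txid_current() as varchar(16)), tg_tag, current_database(), current_schema, inet_client_addr(), inet_client_port(), current_timestamp);
            -- Keep the table bounded: once it holds more than max_num rows, delete the rows
            -- whose id is less than the smallest id + 1000 (the oldest entries).
            select count(id) into real_num from public.hwdrs_ddl_info;
            if real_num > max_num then
              -- Before PostgreSQL 10 (no txid_status()): skip the cleanup while another session holds
              -- RowExclusiveLock on the primary key index, that is, while it may be modifying the table.
              if current_setting('server_version_num')::int<100000 then
                delete from public.hwdrs_ddl_info where id<(select min(id)+1000 from public.hwdrs_ddl_info) and not exists (select 0 from pg_locks l join pg_database d on l.database=d.oid where d.datname=current_catalog and pid<>pg_backend_pid() and locktype='relation' and relation=to_regclass('public.hwdrs_ddl_info_pkey')::oid and mode='RowExclusiveLock');
              else
                -- PostgreSQL 10 or later: skip rows whose xmax belongs to a transaction that is still in progress.
                delete from public.hwdrs_ddl_info where id<(select min(id)+1000 from public.hwdrs_ddl_info) and (xmax=0 or coalesce(txid_status(xmax::text::bigint), '')<>'in progress');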
              end if;
            end if;
        end if;
      end;
      $$;
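    4. Run the following statements to grant the required permissions on the objects created in steps 2 and 3. The function is defined with SECURITY INVOKER, so it runs with the privileges of whichever user executes the DDL statement; every such user must therefore be able to use the schema, execute the function, and read and write the table and its sequence: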
      GRANT USAGE ON SCHEMA public TO public;
      GRANT SELECT,INSERT,DELETE ON TABLE public.hwdrs_ddl_info TO public;
      GRANT SELECT,USAGE ON SEQUENCE public.hwdrs_ddl_info_id_seq TO public;
      GRANT EXECUTE ON FUNCTION public.hwdrs_ddl_function() TO public;
    5. Run the following statement to create a DDL event trigger:
      CREATE EVENT TRIGGER hwdrs_ddl_event ON ddl_command_end EXECUTE PROCEDURE public.hwdrs_ddl_function();
    6. Run the following statement to enable the event trigger in ALWAYS mode, so that it fires regardless of the session_replication_role setting:
      ALTER EVENT TRIGGER hwdrs_ddl_event ENABLE ALWAYS;
    7. Return to the DRS console and create a synchronization task from PostgreSQL to RDS for PostgreSQL.
    8. After the synchronization task is complete, run the following statements to delete the event trigger, function, and table you created:
      DROP EVENT TRIGGER hwdrs_ddl_event;
      DROP FUNCTION public.hwdrs_ddl_function();
      DROP TABLE public.hwdrs_ddl_info;
  • If the source database is RDS for PostgreSQL, perform the following steps:
    1. Run the following statements to delete any existing DDL capture objects:
      DROP EVENT TRIGGER IF EXISTS hwdrs_ddl_event;
      DROP FUNCTION IF EXISTS public.hwdrs_ddl_function();
      DROP TABLE IF EXISTS public.hwdrs_ddl_info;
    2. As the root user, run the following statement to create the rds_hwdrs_ddl DDL plug-in:
      SELECT CONTROL_EXTENSION('create', 'rds_hwdrs_ddl');
    3. Run the following statements to update the function:
      CREATE OR REPLACE FUNCTION public.hwdrs_ddl_function()
          RETURNS event_trigger
          LANGUAGE plpgsql
          SECURITY INVOKER
      AS $BODY$
          declare ddl text;
          declare real_num int;
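          declare max_num int := 50000;  -- upper bound on the number of rows kept in public.hwdrs_ddl_info
      begin
        -- Record only the DDL command tags that DRS synchronizes.
        if (tg_tag in ('CREATE TABLE','ALTER TABLE','DROP TABLE','CREATE SCHEMA','CREATE SEQUENCE','ALTER SEQUENCE','DROP SEQUENCE','CREATE VIEW','ALTER VIEW','DROP VIEW','CREATE INDEX','ALTER INDEX','DROP INDEX','COMMENT')) then
            -- current_query() returns the query text as submitted by the client.
            select current_query() into ddl;
            insert into public.hwdrs_ddl_info(ddl, username, txid, tag, database, schema, client_address, client_port, event_time)
            values (ddl, current_user, cast(txid_current() as varchar(16)), tg_tag, current_database(), current_schema, inet_client_addr(), inet_client_port(), current_timestamp);
            -- Keep the table bounded: once it holds more than max_num rows, delete the rows
            -- whose id is less than the smallest id + 1000 (the oldest entries).
            select count(id) into real_num from public.hwdrs_ddl_info;
            if real_num > max_num then
              -- Before PostgreSQL 10 (no txid_status()): skip the cleanup while another session holds
              -- RowExclusiveLock on the primary key index, that is, while it may be modifying the table.
              if current_setting('server_version_num')::int<100000 then
                delete from public.hwdrs_ddl_info where id<(select min(id)+1000 from public.hwdrs_ddl_info) and not exists (select 0 from pg_locks l join pg_database d on l.database=d.oid where d.datname=current_catalog and pid<>pg_backend_pid() and locktype='relation' and relation=to_regclass('public.hwdrs_ddl_info_pkey')::oid and mode='RowExclusiveLock');
              else
                -- PostgreSQL 10 or later: skip rows whose xmax belongs to a transaction that is still in progress.
                delete from public.hwdrs_ddl_info where id<(select min(id)+1000 from public.hwdrs_ddl_info) and (xmax=0 or coalesce(txid_status(xmax::text::bigint), '')<>'in progress');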
              end if;
            end if;
        end if;
      end;
      $BODY$;
    4. Return to the DRS console and create a synchronization task from PostgreSQL to RDS for PostgreSQL.
    5. After the synchronization task is complete, run the following statement to delete the plug-in together with the table, function, and event trigger created for DDL synchronization:
      SELECT CONTROL_EXTENSION('drop', 'rds_hwdrs_ddl');
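To confirm that the event trigger is registered and enabled, you can query the pg_event_trigger catalog in the source database. For a self-managed source, after steps 5 and 6 you would expect one row with evtevent = ddl_command_end and evtenabled = 'A' (ENABLE ALWAYS). On RDS for PostgreSQL, this sketch assumes that the rds_hwdrs_ddl plug-in creates a trigger with the same name; if the query returns no rows there, that assumption does not hold for your instance.

```sql
-- Show how the DDL event trigger is registered in the current database.
SELECT evtname,
       evtevent,                           -- expected: ddl_command_end
       evtfoid::regproc AS function_name,  -- expected: hwdrs_ddl_function (schema-qualified if public is not on the search_path)
       evtenabled                          -- 'O' = origin/local (default), 'R' = replica, 'A' = always, 'D' = disabled
FROM pg_event_trigger
WHERE evtname = 'hwdrs_ddl_event';
```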
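A quick way to check the whole capture path is to run a few harmless DDL statements and read them back from public.hwdrs_ddl_info. This is a sketch, assuming the table, function, and event trigger are already in place in the source database and that no other session runs DDL at the same time; public.drs_ddl_smoke_test is a hypothetical scratch table name. Because captured DDL is what DRS synchronizes, consider running it before the synchronization task starts; the statements create and then drop the same table, so they leave no lasting object.

```sql
-- Hypothetical scratch table; pick any name that does not clash with real objects.
CREATE TABLE public.drs_ddl_smoke_test (id int PRIMARY KEY);
COMMENT ON TABLE public.drs_ddl_smoke_test IS 'DRS DDL capture test';
DROP TABLE public.drs_ddl_smoke_test;

-- The three newest rows should carry the tags DROP TABLE, COMMENT, and CREATE TABLE.
SELECT id, tag, ddl, username, txid, event_time
FROM public.hwdrs_ddl_info
ORDER BY id DESC
LIMIT 3;

-- Current size of the log; the trigger function trims it once it exceeds 50,000 rows.
SELECT count(*) AS row_count, min(id) AS oldest_id, max(id) AS newest_id
FROM public.hwdrs_ddl_info;
```

When the statements are sent one at a time, as psql does, the ddl column holds each statement's own text. current_query() returns the complete query string sent by the client, so a client that sends several statements in one string would have that whole string recorded.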
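On RDS for PostgreSQL, you can check whether the DDL plug-in is installed before you create the task and after you delete it. This sketch assumes that CONTROL_EXTENSION registers rds_hwdrs_ddl as a regular extension, so that it appears in the standard pg_extension catalog; one row means the plug-in is installed, and no rows means it is not.

```sql
-- Assumption: rds_hwdrs_ddl is listed in pg_extension once
-- SELECT CONTROL_EXTENSION('create', 'rds_hwdrs_ddl') has succeeded.
SELECT extname, extversion
FROM pg_extension
WHERE extname = 'rds_hwdrs_ddl';
```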