文档首页/ 数据治理中心 DataArts Studio/ 常见问题/ 数据集成(实时作业)/ 进行PostgreSQL增量DDL同步需要做哪些准备?
更新时间:2025-11-12 GMT+08:00
分享

进行PostgreSQL增量DDL同步需要做哪些准备?

通过在源库中创建事件触发器和函数来获取源库的DDL信息,然后在Migration的增量实时同步阶段实现DDL操作的同步。

前提条件

  • 当前支持的DDL操作包含如下:

    表级同步支持:DROP TABLE 、ALTER TABLE(包含ADD COLUMN、DROP COLUMN、ALTER COLUMN、RENAME COLUMN)。

  • 不支持使用Using进行类型强转的DDL。
  • 执行如下操作步骤前,请检查待同步的源数据库public模式下,是否存在名为hw_migration_ddl_info的表、名为hw_migration_ddl_function()的函数、名为hw_migration_ddl_event的触发器。如存在,请将其删除。

操作步骤

  1. 使用拥有创建事件触发器权限的用户连接要同步的数据库。
  2. 执行如下语句,创建存储DDL信息的表。

    DROP TABLE IF EXISTS public.hw_migration_ddl_info;
    DROP SEQUENCE IF EXISTS public.hw_migration_ddl_info_id_seq;
    CREATE TABLE public.hw_migration_ddl_info(  
    id                             bigserial primary key,  
    ddl                            text,  
    username                       varchar(64) default current_user,    
    txid                           varchar(16) default txid_current()::varchar(16),  
    tag                            varchar(64),    
    database                       varchar(64) default current_database(),    
    schema                         varchar(64) default current_schema,  
    client_address                 varchar(64) default inet_client_addr(),  
    client_port                    integer default inet_client_port(),  
    event_time                     timestamp default current_timestamp
    );
    
        

  3. 执行如下语句,创建函数。

    CREATE OR REPLACE FUNCTION public.hw_migration_ddl_function()
        RETURNS event_trigger
        LANGUAGE plpgsql
        SECURITY INVOKER
    AS $$
        declare ddl text;
        declare real_num int;
        declare max_num int := 50000;
    begin
      if (tg_tag in ('ALTER TABLE','DROP TABLE')) then
          select current_query() into ddl; 
          insert into public.hw_migration_ddl_info(ddl, username, txid, tag, database, schema, client_address, client_port, event_time)
          values (ddl, current_user, cast(txid_current() as varchar(16)), tg_tag, current_database(), current_schema,  inet_client_addr(), inet_client_port(), current_timestamp);
          select count(id) into real_num from public.hw_migration_ddl_info;
          if real_num > max_num then
            if current_setting('server_version_num')::int<100000 then
              delete from public.hw_migration_ddl_info where id<(select min(id)+1000 from public.hw_migration_ddl_info) and not exists (select 0 from pg_locks l join pg_database d on l.database=d.oid where d.datname=current_catalog and pid<>pg_backend_pid() and locktype='relation' and relation=to_regclass('public.hw_migration_ddl_info_pkey')::oid and mode='RowExclusiveLock');
            else 
              delete from public.hw_migration_ddl_info where id<(select min(id)+1000 from public.hw_migration_ddl_info) and (xmax=0 or coalesce(txid_status(xmax::text::bigint), '')<>'in progress');
            end if;
          end if;
      end if;
    end;
    $$;

  4. 执行以下语句,为2和3中创建的对象赋予必要权限。

    GRANT USAGE ON SCHEMA public TO public; GRANT SELECT,INSERT,DELETE ON TABLE public.hw_migration_ddl_info TO public; GRANT SELECT,USAGE ON SEQUENCE public.hw_migration_ddl_info_id_seq TO public; GRANT EXECUTE ON FUNCTION public.hw_migration_ddl_function() TO public;

  5. 执行以下语句,创建DDL事件触发器。

    CREATE EVENT TRIGGER hw_migration_ddl_event ON ddl_command_end EXECUTE PROCEDURE public.hw_migration_ddl_function();

  6. 执行以下语句,将创建的事件触发器设置为enable。

    ALTER EVENT TRIGGER hw_migration_ddl_event ENABLE ALWAYS;

  7. 创建PostgreSQL为源的同步任务。
  8. 待同步任务结束后,如果不需要再执行PostgreSQL为源的同步任务,请执行下语句删除创建的表、函数、触发器。

    DROP EVENT trigger hw_migration_ddl_event;
    DROP FUNCTION public.hw_migration_ddl_function();
    DROP TABLE public.hw_migration_ddl_info;

相关文档