更新时间:2025-11-12 GMT+08:00
进行PostgreSQL增量DDL同步需要做哪些准备?
通过在源库中创建事件触发器和函数来获取源库的DDL信息,然后在Migration的增量实时同步阶段实现DDL操作的同步。
前提条件
操作步骤
- 使用拥有创建事件触发器权限的用户连接要同步的数据库。
- 执行如下语句,创建存储DDL信息的表。
DROP TABLE IF EXISTS public.hw_migration_ddl_info; DROP SEQUENCE IF EXISTS public.hw_migration_ddl_info_id_seq; CREATE TABLE public.hw_migration_ddl_info( id bigserial primary key, ddl text, username varchar(64) default current_user, txid varchar(16) default txid_current()::varchar(16), tag varchar(64), database varchar(64) default current_database(), schema varchar(64) default current_schema, client_address varchar(64) default inet_client_addr(), client_port integer default inet_client_port(), event_time timestamp default current_timestamp );
- 执行如下语句,创建函数。
CREATE OR REPLACE FUNCTION public.hw_migration_ddl_function() RETURNS event_trigger LANGUAGE plpgsql SECURITY INVOKER AS $$ declare ddl text; declare real_num int; declare max_num int := 50000; begin if (tg_tag in ('ALTER TABLE','DROP TABLE')) then select current_query() into ddl; insert into public.hw_migration_ddl_info(ddl, username, txid, tag, database, schema, client_address, client_port, event_time) values (ddl, current_user, cast(txid_current() as varchar(16)), tg_tag, current_database(), current_schema, inet_client_addr(), inet_client_port(), current_timestamp); select count(id) into real_num from public.hw_migration_ddl_info; if real_num > max_num then if current_setting('server_version_num')::int<100000 then delete from public.hw_migration_ddl_info where id<(select min(id)+1000 from public.hw_migration_ddl_info) and not exists (select 0 from pg_locks l join pg_database d on l.database=d.oid where d.datname=current_catalog and pid<>pg_backend_pid() and locktype='relation' and relation=to_regclass('public.hw_migration_ddl_info_pkey')::oid and mode='RowExclusiveLock'); else delete from public.hw_migration_ddl_info where id<(select min(id)+1000 from public.hw_migration_ddl_info) and (xmax=0 or coalesce(txid_status(xmax::text::bigint), '')<>'in progress'); end if; end if; end if; end; $$; - 执行以下语句,为2和3中创建的对象赋予必要权限。
GRANT USAGE ON SCHEMA public TO public; GRANT SELECT,INSERT,DELETE ON TABLE public.hw_migration_ddl_info TO public; GRANT SELECT,USAGE ON SEQUENCE public.hw_migration_ddl_info_id_seq TO public; GRANT EXECUTE ON FUNCTION public.hw_migration_ddl_function() TO public;
- 执行以下语句,创建DDL事件触发器。
CREATE EVENT TRIGGER hw_migration_ddl_event ON ddl_command_end EXECUTE PROCEDURE public.hw_migration_ddl_function();
- 执行以下语句,将创建的事件触发器设置为enable。
ALTER EVENT TRIGGER hw_migration_ddl_event ENABLE ALWAYS;
- 创建PostgreSQL为源的同步任务。
- 待同步任务结束后,如果不需要再执行PostgreSQL为源的同步任务,请执行下语句删除创建的表、函数、触发器。
DROP EVENT trigger hw_migration_ddl_event; DROP FUNCTION public.hw_migration_ddl_function(); DROP TABLE public.hw_migration_ddl_info;
父主题: 数据集成(实时作业)