文档首页/ 云数据库 RDS_云数据库 RDS for PostgreSQL/ 最佳实践/ 使用事件触发器实现DDL回收站、防火墙、增量订阅同步
更新时间:2025-08-28 GMT+08:00
分享

使用事件触发器实现DDL回收站、防火墙、增量订阅同步

功能介绍

RDS for PostgreSQL开放事件触发器(Event Trigger),可以实现DDL回收站、DDL防火墙、DDL增量订阅同步等功能。灵活使用事件触发器可以减少维护成本,保护数据安全。

如果您对数据库安全有非常高的要求,可以通过PostgreSQL的事件触发器机制,实现DDL回收站和DDL防火墙功能,从多个维度保护数据安全。

本文提供一套基于PostgreSQL事件触发器实现DDL回收站、防火墙、增量订阅同步的最佳实践方案示例。

表1 事件触发器功能说明

功能类型

作用

实现方式

事前防御

阻止危险DDL操作,例如:DROP TABLE、DROP INDEX、DROP DATABASE。

通过ddl_command_start事件触发器拦截未授权操作。

事后回档

在意外删除表后,支持从回收站恢复。

通过ddl_command_end和sql_drop事件触发器记录DDL操作。

DDL回收站

通过使用pg_get_ddl_command和pg_get_ddl_drop这两个事件触发器收集并存入DDL语句到表ddl_recycle.ddl_log中,可以帮助您记录数据库DDL操作以便于事后进行回溯。

  1. 创建专用模式和表。
    CREATE SCHEMA ddl_recycle;
    
    CREATE TABLE IF NOT EXISTS ddl_recycle.ddl_log (
        id SERIAL PRIMARY KEY,
        event_time TIMESTAMPTZ DEFAULT NOW(),
        username TEXT,
        database TEXT,
        client_addr INET,
        event TEXT,
        tag TEXT,
        object_type TEXT,
        schema_name TEXT,
        object_identity TEXT,
        command TEXT,
        is_dropped BOOLEAN DEFAULT FALSE
    );
  2. 创建事件触发器函数。
    CREATE OR REPLACE FUNCTION ddl_recycle.log_ddl_command()
    RETURNS event_trigger AS $$
    DECLARE
        cmd TEXT;
        obj RECORD;
    BEGIN
        SELECT query INTO cmd FROM pg_stat_activity WHERE pid = pg_backend_pid();
    
        IF TG_EVENT = 'ddl_command_end' THEN
            FOR obj IN SELECT * FROM pg_event_trigger_ddl_commands() LOOP
                INSERT INTO ddl_recycle.ddl_log (
                    username, database, client_addr, event, tag,
                    object_type, schema_name, object_identity, command
                ) VALUES (
                    current_user, current_database(), inet_client_addr(),
                    TG_EVENT, TG_TAG, obj.object_type, obj.schema_name,
                    obj.object_identity, cmd
                );
            END LOOP;
        ELSIF TG_EVENT = 'sql_drop' THEN
            FOR obj IN SELECT * FROM pg_event_trigger_dropped_objects() LOOP
                -- 标记原记录为已删除
                UPDATE ddl_recycle.ddl_log
                SET is_dropped = TRUE
                WHERE object_identity = obj.object_identity AND is_dropped = FALSE;
    
                -- 插入一条新记录,用来记录删除操作
                INSERT INTO ddl_recycle.ddl_log (
                    username, database, client_addr, event, tag,
                    object_type, schema_name, object_identity, command, is_dropped
                ) VALUES (
                    current_user, current_database(), inet_client_addr(),
                    TG_EVENT, TG_TAG, obj.object_type, obj.schema_name,
                    obj.object_identity, cmd, TRUE
                );
            END LOOP;
        END IF;
    END;
    $$ LANGUAGE plpgsql;
  3. 注册事件触发器。
    CREATE EVENT TRIGGER ddl_recycle_trigger
    ON ddl_command_end
    EXECUTE FUNCTION ddl_recycle.log_ddl_command();
    
    CREATE EVENT TRIGGER ddl_drop_trigger
    ON sql_drop
    EXECUTE FUNCTION ddl_recycle.log_ddl_command();

    执行完以上命令后,您的DDL语句就会记录在表test_ddl.tb_ddl_command中。

  4. 执行一个DDL语句,查看是否能记录变更。
    create table ddl_recycle.a(id int);
    select * from ddl_recycle.ddl_log;
    图1 查看执行结果

DDL防火墙

您可以根据业务需求创建事件触发器,使用ddl_command_start事件类型可以阻止相应的DDL语句执行。

  1. 创建防火墙规则表。
    CREATE SCHEMA IF NOT EXISTS ddl_firewall;
    
    CREATE TABLE IF NOT EXISTS ddl_firewall.rules (
        id SERIAL PRIMARY KEY,
        username TEXT,
        tag TEXT,
        pattern TEXT,
        action TEXT CHECK (action IN ('BLOCK', 'LOG')),
        created_at TIMESTAMPTZ DEFAULT NOW()
    );
    
  2. 创建防火墙触发器函数。
    RETURNS event_trigger AS $$
    DECLARE
        cmd TEXT;
        rule RECORD;
        is_blocked BOOLEAN := FALSE;
    BEGIN
        SELECT query INTO cmd FROM pg_stat_activity WHERE pid = pg_backend_pid();
    
        FOR rule IN SELECT * FROM ddl_firewall.rules
                    WHERE (username = current_user OR username = '*')
                      AND (tag = TG_TAG OR tag = '*')
                    ORDER BY id
        LOOP
            IF cmd ~ rule.pattern THEN
                IF rule.action = 'BLOCK' THEN
                    RAISE EXCEPTION 'DDL operation blocked by rule: %', rule.id;
                ELSIF rule.action = 'LOG' THEN
                    INSERT INTO ddl_recycle.ddl_log (
                        username, database, client_addr, event, tag, command
                    ) VALUES (
                        current_user, current_database(), inet_client_addr(),
                        'FIREWALL_LOG', TG_TAG, cmd
                    );
                END IF;
            END IF;
        END LOOP;
    END;
    $$ LANGUAGE plpgsql;
  3. 注册事件触发器。
    CREATE EVENT TRIGGER ddl_firewall_trigger
    ON ddl_command_start
    EXECUTE FUNCTION ddl_firewall.check_ddl();
  4. 添加防火墙规则。
    insert into ddl_firewall.rules values(1,'test', 'DROP TABLE', '', 'BLOCK');
  5. 使用test执行删除表操作时会被拦截。
    create table ddl_firewall.a(id int);
    drop table ddl_firewall.a;
    图2 查看执行结果

DDL增量订阅同步

发布端将已经执行了的DDL语句存储在ddl_recycle.ddl_log中,订阅端可以读取记录进行数据同步。

  1. 在发布端执行发布命令。
    CREATE PUBLICATION my_ddl_publication FOR TABLE ONLY ddl_recycle.ddl_log;
  2. 在订阅端创建相同的表。
    CREATE SCHEMA ddl_recycle;
    CREATE TABLE IF NOT EXISTS ddl_recycle.ddl_log (
        id SERIAL PRIMARY KEY,
        event_time TIMESTAMPTZ DEFAULT NOW(),
        username TEXT,
        database TEXT,
        client_addr INET,
        event TEXT,
        tag TEXT,
        object_type TEXT,
        schema_name TEXT,
        object_identity TEXT,
        command TEXT,
        is_dropped BOOLEAN DEFAULT FALSE
    );
  3. 在订阅端创建订阅。
    CREATE SUBSCRIPTION my_ddl_subscriptin CONNECTION 'host=*** port=*** user=*** password=***  dbname=**' PUBLICATION my_ddl_publication;
  4. 在订阅端针对ddl_recycle.ddl_log表创建相应的触发器,实现DDL增量同步。

相关文档