更新时间:2024-01-24 GMT+08:00

PostgreSQL为源强制结束任务

本小节介绍PostgreSQL为源的同步链路在强制结束任务后,如何清理源库逻辑复制槽、如何同步序列值以及如何在源数据库已经无法连接的情况下,重置目标库中自增或自减列关联序列的序列值。

复制槽名命名规则为“drs_唯一性标识”。其中唯一性标识需要将任务node id中的“-”替换为“_”获取,node id可在DRS任务同步日志页面查看“task node id is ***”日志找到。

清理源库逻辑复制槽

  1. 使用对应DRS任务的源数据库连接用户,登录该同步任务的源数据库。
  2. 查询同步任务选择的database对象所对应的流复制槽名称。

    select slot_name from pg_replication_slots where database = 'database';

    其中database为DRS同步任务中选择同步的database。

  3. 执行如下语句,删除对应的流复制槽。

    select * from pg_drop_replication_slot('slot_name');

    其中slot_name2中查询的流复制槽名称。

  4. 执行如下语句,查询流复制槽是否成功删除。

    select slot_name from pg_replication_slots where slot_name = 'slot_name';

    查询结果为空表示DRS同步任务对应的流复制槽已成功删除。

同步序列值

如果未同步序列对象或者目标库为GaussDB,可忽略此节。

  1. 使用高权限账号(需要具有所有序列的USAGE权限)连接对应DRS任务同步的源数据库,执行如下语句。

    select 'SELECT pg_catalog.setval('||quote_literal(quote_ident(n.nspname)||'.'||quote_ident(c.relname))||', '||nextval(c.oid)||');' as sqls from pg_class c join pg_namespace n on c.relnamespace=n.oid where c.relkind = 'S' and n.nspname !~'^pg_' and n.nspname<>'information_schema' and not (c.relname='hwdrs_ddl_info_id_seq' and n.nspname='public') order by n.nspname, c.relname;

    查询结果为需要在目标数据库中执行的sql语句。

  2. 使用对应DRS任务的目标数据库链接用户连接该任务同步的目标数据库,在目标库中执行步骤1中查询出的sql语句。
  3. 在目标库中执行如下语句,检查序列值同步结果。

    SELECT n.nspname, c.relname, nextval(c.oid) from pg_class c join pg_namespace n on c.relnamespace=n.oid where c.relkind = 'S' and n.nspname !~'^pg_' and n.nspname<>'information_schema' order by 1,2;

源库无法连接时,重置目标库中的序列值

在某些极端场景下,源数据库可能已经被损坏而无法连接,此时仍需将目标库中与自增或自减列相关联的序列值进行重置。如果源数据可以连接,请忽略此节,参考上节操作即可。

  1. 使用对应DRS任务的目标数据库测试链接用户,登录该同步任务的目标数据库。
  2. 使用如下语句,查询出将nextval作为表列默认值的序列对应的序列值重置sql语句。

    set search_path to ''; select 'SELECT pg_catalog.setval('||quote_literal(quote_ident(s.sequence_schema)||'.'||quote_ident(s.sequence_name))||', (SELECT '||case when s.increment::int<0 then 'min(' else 'max(' end|| quote_ident(c.column_name)||')'||case when s.increment::int<0 then '-1' else '+1' end||' FROM '||quote_ident(c.table_schema)||'.'||quote_ident(c.table_name)||'));' as sqls from information_schema.columns c join information_schema.sequences s on (position(quote_literal (quote_ident(s.sequence_schema)||'.'||quote_ident(s.sequence_name))||'::regclass' in c.column_default) > 0) where c.data_type in ('bigint', 'int', 'integer', 'smallint', 'numeric', 'real', 'double precision', 'double') and c.column_default like 'nextval(%%' order by s.sequence_schema, s.sequence_name;

    查询结果为需要在目标数据库中执行的sql语句。

  3. 如果源库的版本小于10.0,请忽略此步。如果源库的版本不小于10.0,请在目标库中执行如下语句查询出重置表标识列附加序列的对应序列值的sql语句。

    set search_path to ''; select 'SELECT pg_catalog.setval('||quote_literal(seqname)||', (SELECT '||case when increment::int<0 then 'min(' else 'max(' end||colname||')'||case when increment::int<0 then '-1' else '+1' end||' FROM '||tablename||'));' as sqls from (select objid::regclass::text, refobjid::regclass::text, (pg_identify_object(refclassid,refobjid,refobjsubid)).identity, (pg_sequence_parameters(objid)).increment from pg_depend where deptype='i' and refobjsubid>0 and objid in (select c.oid from pg_class c join pg_namespace n on c.relnamespace=n.oid where c.relkind='S' and n.nspname !~ '^pg_' and n.nspname<>'information_schema')) p(seqname,tablename,colname,increment);

    查询结果为需要在目标数据库中执行的sql语句。

  4. 在目标库中执行步骤2和步骤3中查询出的sql语句。
  5. 在目标库中执行如下语句,检查序列值同步结果。

    SELECT n.nspname, c.relname, nextval(c.oid) from pg_class c join pg_namespace n on c.relnamespace=n.oid where c.relkind = 'S' and n.nspname !~'^pg_' and n.nspname<>'information_schema' order by 1,2;