更新时间:2025-01-07 GMT+08:00

订阅实时数仓Binlog

Binlog使用介绍

当用户需要捕获数据库事件用于数据增量导出Flink等第三方组件,并协同完成数据加工等任务时,DWS实时数仓中的HStore表提供了Binlog功能,通过消费Binlog数据来实现上下游的数据同步,提高数据加工的效率。

传统的数据比如MySQL数据库等,支持通过Binlog来记录数据库中所有数据的变化,但相比于MySQL的Binlog主要用于数据恢复与主从复制,DWS实时数仓Binlog一般只用于实时场景下的数据同步。同时DWS实时数仓Binlog并不会记录DDL操作,只记录Insert/Delete/Update/(Upsert)等DML操作。

GaussDB(DWS)的Binlog的优点如下:

  • 表级按需开关:按需给指定表打开或关闭binlog功能,更为灵活。
  • 全增量一体消费:支持Flink任务启动后,先全量同步source,再实时消费source端增量。
  • 支持消费即清理:对于空间敏感且只关注实时同步与加工的客户,支持消费后即开始异步清理增量,有效减少空间使用。

利用Flink强大的实时处理能力和GaussDB(DWS)的Binlog能力,可以快速构建实时数仓,且无需维护其他组件(如kafka),整体架构分层清晰,数据可以高效流动,并且整体任务链路都可以通过Flink SQL来驱动,便于业务人员理解和使用。

约束与限制

  1. 当前仅8.3.0.100及以上版本支持HStore和Hstore-opt记录Binlog功能,且V3表处于试商用阶段,使用前需要联系技术支持进行评估。
  2. 使用Binlog的前置条件是必须存在主键约束,并且为HStore表或者Hstore-opt表,分布方式只能是Hash分布。
  3. Binlog表仅记录insert/delete/update(upsert)等DML操作进行记录,不会记录DDL。
  4. 当前Binlog表不支持的操作: Insert overwrite、修改分布列、给临时表开启Binlog、exchange/merge/split partition。
  5. 当前Binlog表并不限制用户进行以下DDL操作,但进行操作后会导致增量数据与同步点位信息会被清空,需要评估后再执行:

    ADD COLUMN 增加列、DROP COLUMN 删除列、SET TYPE 修改列、TRUNCATE 清空表数据。

  6. Binlog表在线或者离线扩容期间会等待Binlog记录的消费,只有Binlog记录消费完毕才可以继续进行接下来的扩缩容步骤,默认等待时间为1小时,可通过guc参数binlog_consume_timeout来设置,如果等待超时或者等待出错都会退出扩缩容过程,认为该表扩缩容失败。
  7. VACUUM FULL Binlog表时,会等待Binlog记录的消费,只有Binlog记录消费完毕才可以进行接下来的VACUUM FULL操作,默认等待时间为1小时,可通过guc参数binlog_consume_timeout来设置,如果等待超时或者等待出错都会退出VACUUM FULL过程,认为该表VACUUM FULL失败。且由于需要等待Binlog记录消费完毕,所以即使VACUUM FULL一个分区表,也会对分区的主表上7级锁,阻塞整个表的插入更新或者删除。
  8. Binlog表在备份恢复期间,仅会被当做普通HStore表进行备份,恢复后辅助表的增量数据与同步点信息会清空,需要重新开始同步。
  9. 支持Binlog时间戳功能,通过设置enable_binlog_timestamp打开,同样只有HStore和Hstore-opt两种表支持打开。该约束仅9.1.0.200及以上版本支持。

Binlog格式与原理

表1 binlog字段格式

字段名称

字段类型

含义

gs_binlog_sync_point

BIGINT

Binlog系统字段,表示该记录的同步点值,普通GTM模式下,该值唯一且有序。

gs_binlog_event_sequence

BIGINT

Binlog的系统字段, 用于表示同一事务类操作的先后顺序。

gs_binlog_event_type

CHAR

Binlog的系统字段, 表示当前记录的操作类型。

type可能有以下几种取值:

  • 'I' 即INSERT, 表示当前Binlog是插入一条新记录。
  • 'd' 即DELETE,表示当前Binlog是删除一条记录。
  • 'B' 即BEFORE_UPDATE,表示当前Binlog是更新前的记录。
  • 'U'即AFTER_UPDATE,表示当前Binlog是更新后的记录。

gs_binlog_timestamp_us

BIGINT

Binlog的系统字段, 表示当前记录入库时的时间戳。

只有开启binlog时间戳功能时会有,没开启binlog时间戳时为空。仅9.1.0.200及以上版本支持。

user_column_1

用户列

用户的自定义数据列

...

...

...

usert_column_n

用户列

用户的自定义数据列

  • UPDATE(或者UPSERT触发的更新操作)会产生两条Binlog,分别是BEFORE_UPDATE类型与AFTER_UPDATE类型,其中BEFORE_UDPATE主要用于保证Flink等第三方组件做数据加工后加工结果的正确性。
  • 对于用户UPDATE/DELETE操作产生的BEFORE_UPDATE以及DELETE类型的Binlog,DWS实时数仓中这两类Binlog类型并不会在操作执行时就反查出所有用户列并填充,保证了入库的性能。
  • 当在DWS实时数仓给某个HStore表开启Binlog功能时, 本质就是给HStore表再创建一张辅助表(也通过HStore实现),该辅助表包含gs_binlog_event_sync_point、gs_binlog_event_event_sequence、gs_binlog_event_type三个系统列以及一个将所有用户列序列化存储到一个字段的value列。
  • 当开启binlog时间戳功能时(表级参数enable_binlog_timestamp),辅助表上记录的binlog记录会严格保留至超过TTL才会清理,这会带来数倍的额外空间开销(具体倍数与TTL时间内的更新入库量有关)。当开启普通binlog时(表级参数enable_binlog),辅助表上记录的binlog,只要被下游消费就会允许异步清理,能大幅度减少空间占用。仅9.1.0.200及以上版本支持。

开启Binlog

通过在建HStore表时指定表级参数enable_binlog,开启HStore表的Binlog功能。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE TABLE hstore_binlog_source (
    c1  INT PRIMARY KEY,
    c2  INT,
    c3  INT
) WITH (
    ORIENTATION = COLUMN, 
    enable_hstore_opt=true, 
    enable_binlog=on,
    binlog_ttl = 86400
);
  • 对于开启了Binlog的表,并不会立刻给入库的操作记录Binlog,需要再给同步任务注册同步点后,才会开始记录Binlog(开启Flink同步binlog任务后,会自动循环进行获取同步点、获取增量数据、注册同步点操作)。
  • binlog_ttl是可选参数,当不设置时将使用默认值86400, 单位为秒,当同步任务注册的同步点超过TTL没有进行增量同步时,该同步点位将被清理。最老的同步点位之前的Binlog(即被所有任务消费了的Binlog)会被异步清理来回收空间。
  • 空间开销:对于开启普通binlog的表,如果能保证增量被下游及时消费,那么空间就能被及时清理回收。

通过执行ALTER命令给已有的HStore表开启binlog功能:

1
2
3
4
5
6
7
8
9
CREATE TABLE hstore_binlog_source (
    c1  INT PRIMARY KEY,
    c2  INT,
    c3  INT
) WITH (
    ORIENTATION = COLUMN, 
    enable_hstore_opt=true
);
ALTER TABLE hstore_binlog_source SET (enable_binlog=on);

查询binlog

通过DWS提供的系统函数,可以直接查询目标表在指定DN上binlog信息,以及是否被下游消费完毕等信息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
-- 模拟Flink调用系统函数获取同步点,参数分别表示 表名、槽位名、是否checkPoint点位,目标DN(为0表示所有DN)。
select * from pg_catalog.pgxc_get_binlog_sync_point('hstore_binlog_source', 'slot1', false, 0);
select * from pg_catalog.pgxc_get_binlog_sync_point('hstore_binlog_source', 'slot1', true, 0);
-- 进行增删改产生增量binlog。
INSERT INTO hstore_binlog_source VALUES(100, 1, 1);
delete hstore_binlog_source where c1 = 100;
INSERT INTO hstore_binlog_source VALUES(200, 1, 1);
update hstore_binlog_source set c2 =2 where c1 = 200;
-- 模拟Flink调用系统函数查询指定CSN区间的Binlog,参数分别表示表名,目标DN(为0表示所有DN),起始CSN点位, 终止CSN点位。
select * from pgxc_get_binlog_changes('hstore_binlog_source', 0, 0 , 9999999999);

可以看到两次INSERT操作产生了两个gs_binlog_event_type是'I'的记录,DELETE操作产生了type是'd'的记录,UPDATE产生了一行BeforeUpdate的'B'记录以及一条AfterUpdate的'U'记录,分别表示更新前的值以及更新后的值。

通过调用系统函数pgxc_consumed_binlog_records可以查询目标表的binlog是否被所有槽位消费完毕。参数分别表示目标表名以及目标DN(为0表示所有DN)。

1
2
3
4
5
-- 模拟Flink调用系统函数注册同步点,参数分别表示表名,槽位名,注册的点位,是否属于checkPoint, 点位对应的xmin (获取同步点时会提供)。
select pgxc_register_binlog_sync_point('hstore_binlog_source', 'slot1', 0, 9999999999, false, 100);
select pgxc_register_binlog_sync_point('hstore_binlog_source', 'slot1', 0, 9999999999, true, 100);
-- 查询表上binlog是否被全部消费,返回 1 表示已经被下游槽位全部消费。
select * from pgxc_consumed_binlog_records('hstore_binlog_source',0);

开启Binlog时间戳功能

如果需要读取指定时间点之后binlog的功能,通过在建HStore表时指定表级参数enable_binlog_timestamp,开启HStore表的Binlog时间戳功能。仅9.1.0.200及以上版本支持。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE TABLE hstore_binlog_source(
    c1  INT PRIMARY KEY,
    c2  INT,
    c3  INT
) WITH (
    ORIENTATION = COLUMN, 
    enable_hstore_opt=true, 
    enable_binlog_timestamp =on,
    binlog_ttl = 86400
);
  • 对于开启了Binlog时间戳的表,并不会立刻给入库的操作记录Binlog,需要再给同步任务注册同步点后,才会开始记录Binlog(开启Flink同步binlog任务后,会自动循环进行获取同步点、获取增量数据、注册同步点操作)。
  • binlog_ttl是可选参数,当不设置时将使用默认值86400, 单位为秒(即默认保留一天),当Binlog记录的时间戳距离现在大于TTL时,会被异步清理。
  • 空间开销:对于开启binlog时间戳的表,辅助表上记录的binlog记录会严格保留至超过TTL才会清理,这会带来数倍的额外空间开销(具体倍数与TTL时间内的更新入库量有关)。

查询开启binlog时间戳功能的表上的Binlog:

将gs_binlog_timestamp_us从BigInt类型转成可读的时间戳:

1
 select to_timestamp(1731569598408661/1000000);

获取各个DN上,目标表指定时间点后的第一条binlog信息(为空表示这个时间点后没有binlog):

1
 select * from pgxc_get_binlog_cursor_by_timestamp('hstore_binlog_source','2024-11-14 15:33:18.40866+08', 0);

获取开启binlog时间戳功能的表的消费进度:

返回的字段表示最近消费的一条binlog的时间戳,binlog上的最近时间戳,最近消费的一条binlog的CSN点位, binlog上最近CSN点位,未消费的binlog记录数量。

1
2
3
4
5
-- 模拟Flink调用系统函数注册同步点,参数分别表示表名,槽位名,注册的点位,是否属于checkPoint, 点位对应的xmin (获取同步点时会提供)。
select pgxc_register_binlog_sync_point('hstore_binlog_source', 'slot1', 0, 9999999999, false, 100);
select pgxc_register_binlog_sync_point('hstore_binlog_source', 'slot1', 0, 9999999999, true, 100);
-- 查询目标表各个槽位的消费进度。
select * from pgxc_get_binlog_consume_progress('hstore_binlog_source', 0);

控制DML不产生Binlog

通过设置会话级参数enable_generate_binlog为off,可以控制当前会话的DML,在给开启binlog的表入库时,不产生binlog记录。