订阅实时数仓Binlog
Binlog使用介绍
当用户需要捕获数据库事件用于数据增量导出Flink等第三方组件,并协同完成数据加工等任务时,DWS实时数仓中的HStore表提供了Binlog功能,通过消费Binlog数据来实现上下游的数据同步,提高数据加工的效率。
传统的数据比如MySQL数据库等,支持通过Binlog来记录数据库中所有数据的变化,但相比于MySQL的Binlog主要用于数据恢复与主从复制,DWS实时数仓Binlog一般只用于实时场景下的数据同步。同时DWS实时数仓Binlog并不会记录DDL操作,只记录Insert/Delete/Update/(Upsert)等DML操作。
GaussDB(DWS)的Binlog的优点如下:
- 表级按需开关:按需给指定表打开或关闭binlog功能,更为灵活。
- 全增量一体消费:支持Flink任务启动后,先全量同步source,再实时消费source端增量。
- 支持消费即清理:对于空间敏感且只关注实时同步与加工的客户,支持消费后即开始异步清理增量,有效减少空间使用。
利用Flink强大的实时处理能力和GaussDB(DWS)的Binlog能力,可以快速构建实时数仓,且无需维护其他组件(如kafka),整体架构分层清晰,数据可以高效流动,并且整体任务链路都可以通过Flink SQL来驱动,便于业务人员理解和使用。
约束与限制
- 当前仅8.3.0.100及以上版本支持HStore和Hstore-opt记录Binlog功能,且V3表处于试商用阶段,使用前需要联系技术支持进行评估。
- 使用Binlog的前置条件是必须存在主键约束,并且为HStore表或者Hstore-opt表,分布方式只能是Hash分布。
- Binlog表仅记录insert/delete/update(upsert)等DML操作进行记录,不会记录DDL。
- 当前Binlog表不支持的操作: Insert overwrite、修改分布列、给临时表开启Binlog、exchange/merge/split partition。
- 当前Binlog表并不限制用户进行以下DDL操作,但进行操作后会导致增量数据与同步点位信息会被清空,需要评估后再执行:
- Binlog表在线或者离线扩容期间会等待Binlog记录的消费,只有Binlog记录消费完毕才可以继续进行接下来的扩缩容步骤,默认等待时间为1小时,可通过guc参数binlog_consume_timeout来设置,如果等待超时或者等待出错都会退出扩缩容过程,认为该表扩缩容失败。
- VACUUM FULL Binlog表时,会等待Binlog记录的消费,只有Binlog记录消费完毕才可以进行接下来的VACUUM FULL操作,默认等待时间为1小时,可通过guc参数binlog_consume_timeout来设置,如果等待超时或者等待出错都会退出VACUUM FULL过程,认为该表VACUUM FULL失败。且由于需要等待Binlog记录消费完毕,所以即使VACUUM FULL一个分区表,也会对分区的主表上7级锁,阻塞整个表的插入更新或者删除。
- Binlog表在备份恢复期间,仅会被当做普通HStore表进行备份,恢复后辅助表的增量数据与同步点信息会清空,需要重新开始同步。
- 支持Binlog时间戳功能,通过设置enable_binlog_timestamp打开,同样只有HStore和Hstore-opt两种表支持打开。该约束仅9.1.0.200及以上版本支持。
Binlog格式与原理
字段名称 |
字段类型 |
含义 |
---|---|---|
gs_binlog_sync_point |
BIGINT |
Binlog系统字段,表示该记录的同步点值,普通GTM模式下,该值唯一且有序。 |
gs_binlog_event_sequence |
BIGINT |
Binlog的系统字段, 用于表示同一事务类操作的先后顺序。 |
gs_binlog_event_type |
CHAR |
Binlog的系统字段, 表示当前记录的操作类型。 type可能有以下几种取值:
|
gs_binlog_timestamp_us |
BIGINT |
Binlog的系统字段, 表示当前记录入库时的时间戳。 只有开启binlog时间戳功能时会有,没开启binlog时间戳时为空。仅9.1.0.200及以上版本支持。 |
user_column_1 |
用户列 |
用户的自定义数据列 |
... |
... |
... |
usert_column_n |
用户列 |
用户的自定义数据列 |
- UPDATE(或者UPSERT触发的更新操作)会产生两条Binlog,分别是BEFORE_UPDATE类型与AFTER_UPDATE类型,其中BEFORE_UDPATE主要用于保证Flink等第三方组件做数据加工后加工结果的正确性。
- 对于用户UPDATE/DELETE操作产生的BEFORE_UPDATE以及DELETE类型的Binlog,DWS实时数仓中这两类Binlog类型并不会在操作执行时就反查出所有用户列并填充,保证了入库的性能。
- 当在DWS实时数仓给某个HStore表开启Binlog功能时, 本质就是给HStore表再创建一张辅助表(也通过HStore实现),该辅助表包含gs_binlog_event_sync_point、gs_binlog_event_event_sequence、gs_binlog_event_type三个系统列以及一个将所有用户列序列化存储到一个字段的value列。
- 当开启binlog时间戳功能时(表级参数enable_binlog_timestamp),辅助表上记录的binlog记录会严格保留至超过TTL才会清理,这会带来数倍的额外空间开销(具体倍数与TTL时间内的更新入库量有关)。当开启普通binlog时(表级参数enable_binlog),辅助表上记录的binlog,只要被下游消费就会允许异步清理,能大幅度减少空间占用。仅9.1.0.200及以上版本支持。
开启Binlog
1 2 3 4 5 6 7 8 9 10 |
CREATE TABLE hstore_binlog_source ( c1 INT PRIMARY KEY, c2 INT, c3 INT ) WITH ( ORIENTATION = COLUMN, enable_hstore_opt=true, enable_binlog=on, binlog_ttl = 86400 ); |
- 对于开启了Binlog的表,并不会立刻给入库的操作记录Binlog,需要再给同步任务注册同步点后,才会开始记录Binlog(开启Flink同步binlog任务后,会自动循环进行获取同步点、获取增量数据、注册同步点操作)。
- binlog_ttl是可选参数,当不设置时将使用默认值86400, 单位为秒,当同步任务注册的同步点超过TTL没有进行增量同步时,该同步点位将被清理。最老的同步点位之前的Binlog(即被所有任务消费了的Binlog)会被异步清理来回收空间。
- 空间开销:对于开启普通binlog的表,如果能保证增量被下游及时消费,那么空间就能被及时清理回收。
通过执行ALTER命令给已有的HStore表开启binlog功能:
1 2 3 4 5 6 7 8 9 |
CREATE TABLE hstore_binlog_source ( c1 INT PRIMARY KEY, c2 INT, c3 INT ) WITH ( ORIENTATION = COLUMN, enable_hstore_opt=true ); ALTER TABLE hstore_binlog_source SET (enable_binlog=on); |
查询binlog
通过DWS提供的系统函数,可以直接查询目标表在指定DN上binlog信息,以及是否被下游消费完毕等信息。
1 2 3 4 5 6 7 8 9 10 |
-- 模拟Flink调用系统函数获取同步点,参数分别表示 表名、槽位名、是否checkPoint点位,目标DN(为0表示所有DN)。 select * from pg_catalog.pgxc_get_binlog_sync_point('hstore_binlog_source', 'slot1', false, 0); select * from pg_catalog.pgxc_get_binlog_sync_point('hstore_binlog_source', 'slot1', true, 0); -- 进行增删改产生增量binlog。 INSERT INTO hstore_binlog_source VALUES(100, 1, 1); delete hstore_binlog_source where c1 = 100; INSERT INTO hstore_binlog_source VALUES(200, 1, 1); update hstore_binlog_source set c2 =2 where c1 = 200; -- 模拟Flink调用系统函数查询指定CSN区间的Binlog,参数分别表示表名,目标DN(为0表示所有DN),起始CSN点位, 终止CSN点位。 select * from pgxc_get_binlog_changes('hstore_binlog_source', 0, 0 , 9999999999); |
可以看到两次INSERT操作产生了两个gs_binlog_event_type是'I'的记录,DELETE操作产生了type是'd'的记录,UPDATE产生了一行BeforeUpdate的'B'记录以及一条AfterUpdate的'U'记录,分别表示更新前的值以及更新后的值。
通过调用系统函数pgxc_consumed_binlog_records可以查询目标表的binlog是否被所有槽位消费完毕。参数分别表示目标表名以及目标DN(为0表示所有DN), 返回值。
1 2 3 4 5 |
-- 模拟Flink调用系统函数注册同步点,参数分别表示表名,槽位名,注册的点位,是否属于checkPoint, 点位对应的xmin (获取同步点时会提供)。 select pgxc_register_binlog_sync_point('hstore_binlog_source', 'slot1', 0, 9999999999, false, 100); select pgxc_register_binlog_sync_point('hstore_binlog_source', 'slot1', 0, 9999999999, true, 100); -- 查询表上binlog是否被全部消费,返回 1 表示已经被下游槽位全部消费。 select * from pgxc_consumed_binlog_records('hstore_binlog_source',0); |
开启Binlog时间戳功能
1 2 3 4 5 6 7 8 9 10 |
CREATE TABLE hstore_binlog_source( c1 INT PRIMARY KEY, c2 INT, c3 INT ) WITH ( ORIENTATION = COLUMN, enable_hstore_opt=true, enable_binlog_timestamp =on, binlog_ttl = 86400 ); |
- 对于开启了Binlog时间戳的表,并不会立刻给入库的操作记录Binlog,需要再给同步任务注册同步点后,才会开始记录Binlog(开启Flink同步binlog任务后,会自动循环进行获取同步点、获取增量数据、注册同步点操作)。
- binlog_ttl是可选参数,当不设置时将使用默认值86400, 单位为秒(即默认保留一天),当Binlog记录的时间戳距离现在大于TTL时,会被异步清理。
- 空间开销:对于开启binlog时间戳的表,辅助表上记录的binlog记录会严格保留至超过TTL才会清理,这会带来数倍的额外空间开销(具体倍数与TTL时间内的更新入库量有关)。
查询开启binlog时间戳功能的表上的Binlog:
将gs_binlog_timestamp_us从BigInt类型转成可读的时间戳:
1
|
select to_timestamp(1731569598408661/1000000); |
获取各个DN上,目标表指定时间点后的第一条binlog信息(为空表示这个时间点后没有binlog):
1
|
select * from pgxc_get_binlog_cursor_by_timestamp('hstore_binlog_source','2024-11-14 15:33:18.40866+08', 0); |
获取开启binlog时间戳功能的表的消费进度:
返回的字段表示最近消费的一条binlog的时间戳,binlog上的最近时间戳,最近消费的一条binlog的CSN点位, binlog上最近CSN点位,未消费的binlog记录数量。
1 2 3 4 5 |
-- 模拟Flink调用系统函数注册同步点,参数分别表示表名,槽位名,注册的点位,是否属于checkPoint, 点位对应的xmin (获取同步点时会提供)。 select pgxc_register_binlog_sync_point('hstore_binlog_source', 'slot1', 0, 9999999999, false, 100); select pgxc_register_binlog_sync_point('hstore_binlog_source', 'slot1', 0, 9999999999, true, 100); -- 查询目标表各个槽位的消费进度。 select * from pgxc_get_binlog_consume_progress('hstore_binlog_source', 0); |
控制DML不产生Binlog
通过设置会话级参数enable_generate_binlog为off,可以控制当前会话的DML,在给开启binlog的表入库时,不产生binlog记录。