更新时间:2024-10-09 GMT+08:00

订阅实时数仓Binlog

Binlog使用介绍

当用户需要捕获数据库事件用于数据增量导出Flink等第三方组件,并协同完成数据加工等任务时,DWS实时数仓中的HStore表提供了Binlog功能,通过消费Binlog数据来实现上下游的数据同步,提高数据加工的效率。

传统的数据比如MySQL数据库等,支持通过Binlog来记录数据库中所有数据的变化,但相比于MySQL的Binlog主要用于数据恢复与主从复制,DWS实时数仓Binlog一般只用于实时场景下的数据同步。同时DWS实时数仓Binlog并不会记录DDL操作,只记录Insert/Delete/Update/(Upsert)等DML操作。

约束与限制

  1. Binlog功能当前为beta版本,受限商用,如需使用请联系技术支持。
  2. 当前仅8.3.0.100及以上版本支持HStore和HStore-opt记录Binlog功能。
  3. 使用Binlog的前置条件是必须存在主键约束,并且为HStore或Hstore-opt表,分布方式只能是Hash分布。
  4. Binlog表仅记录insert/delete/update(upsert)等DML操作进行记录,不会记录DDL。
  5. 当前Binlog表不支持的操作: Insert overwrite、修改分布列、给临时表开启Binlog、exchange/merge/split partition。
  6. 当前Binlog表并不限制用户进行以下DDL操作,但进行操作后会导致增量数据与同步点位信息会被清空,需要评估后再执行:

    ADD COLUMN 增加列、DROP COLUMN 删除列、SET TYPE 修改列、VACUUM FULL 重整数据、TRUNCATE 清空表数据。

  7. Binlog表在线扩容期间会等待Binlog记录的消费,只有Binlog记录消费完毕后才可以继续进行接下来的扩缩容步骤,默认等待时间为1小时,可通过binlog_consume_timeout来设置,如果等待超时或者等待出错都会退出扩缩容过程,认为该表扩缩容失败。
  8. Binlog表在备份恢复期间,仅会被当做普通HStore表进行备份,恢复后辅助表的增量数据与同步点信息会清空,需要重新开始同步。

Binlog格式与原理

表1 binlog字段格式

字段名称

字段类型

含义

sync_point

BIGINT

Binlog系统字段,表示该记录的同步点值,普通GTM模式下,该值唯一且有序。

event_sequence

BIGINT

Binlog的系统字段, 用于表示同一事务类操作的先后顺序。

type

CHAR

Binlog的系统字段, 表示当前记录的操作类型。

type可能有以下几种取值:

  • 'I' 即INSERT, 表示当前Binlog是插入一条新记录。
  • 'd' 即DELETE,表示当前Binlog是删除一条记录。
  • 'B' 即BEFORE_UPDATE,表示当前Binlog是更新前的记录。
  • 'U'即AFTER_UPDATE,表示当前Binlog是更新后的记录。

user_column_1

用户列

用户的自定义数据列

...

...

...

usert_column_n

用户列

用户的自定义数据列

  • UPDATE(或者UPSERT触发的更新操作)会产生两条Binlog,分别是BEFORE_UPDATE类型与AFTER_UPDATE类型,其中BEFORE_UDPATE主要用于保证Flink等第三方组件做数据加工后加工结果的正确性。
  • 对于用户UPDATE/DELETE操作产生的BEFORE_UPDATE以及DELETE类型的Binlog,DWS实时数仓中这两类Binlog类型并不会在操作执行时就反查出所有用户列并填充,保证了入库的性能。
  • 当在DWS实时数仓给某个HStore表开启Binlog功能时, 本质就是给HStore表再创建一张辅助表(也通过HStore实现),该辅助表包含sync_point、event_sequence、type三个系统列以及一个将所有用户列序列化存储到一个字段的value列。

开启Binlog

通过在建HStore表时指定表级参数enable_binlog,开启HStore表的Binlog功能。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE TABLE hstore_binlog_source (
    c1  INT PRIMARY KEY,
    c2  INT,
    c3  INT
) WITH (
    ORIENTATION = COLUMN, 
    enable_hstore=true, 
    enable_binlog=on,
    binlog_ttl = 86400
);
  • 对于开启了Binlog的表,并不会立刻给入库的操作记录Binlog,需要再给同步任务注册同步点后,才会开始记录Binlog(开启Flink后,会自动循环进行获取同步点、获取增量数据、注册同步点操作)。
  • binlog_ttl是可选参数,当不设置时将使用默认值86400, 单位为秒,当同步任务注册的同步点超过TTL没有进行增量同步时,该同步点位将被清理。最老的同步点位之前的Binlog(即被所有任务消费了的Binlog)会被异步清理来回收空间。

通过执行ALTER命令给已有的HStore表开启binlog功能:

1
2
3
4
5
6
7
8
9
CREATE TABLE hstore_binlog_source (
    c1  INT PRIMARY KEY,
    c2  INT,
    c3  INT
) WITH (
    ORIENTATION = COLUMN, 
    enable_hstore=true
);
ALTER TABLE hstore_binlog_source SET (enable_binlog=on);