网络
虚拟私有云 VPC
弹性公网IP EIP
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
虚拟专用网络 VPN
云连接 CC
VPC终端节点 VPCEP
企业路由器 ER
企业交换机 ESW
全球加速 GA
企业连接 EC
云原生应用网络 ANC
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
威胁检测服务 MTD
态势感知 SA
认证测试中心 CTC
边缘安全 EdgeSec
应用中间件
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
API网关 APIG
分布式缓存服务 DCS
多活高可用服务 MAS
事件网格 EG
管理与监管
统一身份认证服务 IAM
消息通知服务 SMN
云监控服务 CES
应用运维管理 AOM
应用性能管理 APM
云日志服务 LTS
云审计服务 CTS
标签管理服务 TMS
配置审计 Config
应用身份管理服务 OneAccess
资源访问管理 RAM
组织 Organizations
资源编排服务 RFS
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
解决方案
高性能计算 HPC
SAP
混合云灾备
开天工业工作台 MIW
Haydn解决方案工厂
数字化诊断治理专家服务
云生态
云商店
合作伙伴中心
华为云开发者学堂
华为云慧通差旅
开发与运维
软件开发生产线 CodeArts
需求管理 CodeArts Req
流水线 CodeArts Pipeline
代码检查 CodeArts Check
编译构建 CodeArts Build
部署 CodeArts Deploy
测试计划 CodeArts TestPlan
制品仓库 CodeArts Artifact
移动应用测试 MobileAPPTest
CodeArts IDE Online
开源镜像站 Mirrors
性能测试 CodeArts PerfTest
应用管理与运维平台 ServiceStage
云应用引擎 CAE
开源治理服务 CodeArts Governance
华为云Astro轻应用
CodeArts IDE
Astro工作流 AstroFlow
代码托管 CodeArts Repo
漏洞管理服务 CodeArts Inspector
联接 CodeArtsLink
软件建模 CodeArts Modeling
Astro企业应用 AstroPro
CodeArts 盘古助手
华为云Astro大屏应用
计算
弹性云服务器 ECS
Flexus云服务
裸金属服务器 BMS
云手机服务器 CPH
专属主机 DeH
弹性伸缩 AS
镜像服务 IMS
函数工作流 FunctionGraph
云耀云服务器(旧版)
VR云渲游平台 CVR
Huawei Cloud EulerOS
网络
虚拟私有云 VPC
弹性公网IP EIP
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
虚拟专用网络 VPN
云连接 CC
VPC终端节点 VPCEP
企业路由器 ER
企业交换机 ESW
全球加速 GA
企业连接 EC
云原生应用网络 ANC
CDN与智能边缘
内容分发网络 CDN
智能边缘云 IEC
智能边缘平台 IEF
CloudPond云服务
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
威胁检测服务 MTD
态势感知 SA
认证测试中心 CTC
边缘安全 EdgeSec
大数据
MapReduce服务 MRS
数据湖探索 DLI
表格存储服务 CloudTable
可信智能计算服务 TICS
推荐系统 RES
云搜索服务 CSS
数据可视化 DLV
数据接入服务 DIS
数据仓库服务 GaussDB(DWS)
数据治理中心 DataArts Studio
湖仓构建 LakeFormation
智能数据洞察 DataArts Insight
应用中间件
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
API网关 APIG
分布式缓存服务 DCS
多活高可用服务 MAS
事件网格 EG
开天aPaaS
应用平台 AppStage
开天企业工作台 MSSE
开天集成工作台 MSSI
API中心 API Hub
云消息服务 KooMessage
交换数据空间 EDS
云地图服务 KooMap
云手机服务 KooPhone
组织成员账号 OrgID
云空间服务 KooDrive
管理与监管
统一身份认证服务 IAM
消息通知服务 SMN
云监控服务 CES
应用运维管理 AOM
应用性能管理 APM
云日志服务 LTS
云审计服务 CTS
标签管理服务 TMS
配置审计 Config
应用身份管理服务 OneAccess
资源访问管理 RAM
组织 Organizations
资源编排服务 RFS
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
区块链
区块链服务 BCS
数字资产链 DAC
华为云区块链引擎服务 HBS
解决方案
高性能计算 HPC
SAP
混合云灾备
开天工业工作台 MIW
Haydn解决方案工厂
数字化诊断治理专家服务
价格
成本优化最佳实践
专属云商业逻辑
云生态
云商店
合作伙伴中心
华为云开发者学堂
华为云慧通差旅
其他
管理控制台
消息中心
产品价格详情
系统权限
客户关联华为云合作伙伴须知
公共问题
宽限期保留期
奖励推广计划
活动
云服务信任体系能力说明
开发与运维
软件开发生产线 CodeArts
需求管理 CodeArts Req
流水线 CodeArts Pipeline
代码检查 CodeArts Check
编译构建 CodeArts Build
部署 CodeArts Deploy
测试计划 CodeArts TestPlan
制品仓库 CodeArts Artifact
移动应用测试 MobileAPPTest
CodeArts IDE Online
开源镜像站 Mirrors
性能测试 CodeArts PerfTest
应用管理与运维平台 ServiceStage
云应用引擎 CAE
开源治理服务 CodeArts Governance
华为云Astro轻应用
CodeArts IDE
Astro工作流 AstroFlow
代码托管 CodeArts Repo
漏洞管理服务 CodeArts Inspector
联接 CodeArtsLink
软件建模 CodeArts Modeling
Astro企业应用 AstroPro
CodeArts 盘古助手
华为云Astro大屏应用
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
高性能弹性文件服务 SFS Turbo
弹性文件服务 SFS
存储容灾服务 SDRS
云硬盘备份 VBS
云服务器备份 CSBS
数据快递服务 DES
云存储网关 CSG
专属分布式存储服务 DSS
数据工坊 DWR
地图数据 MapDS
键值存储服务 KVS
容器
云容器引擎 CCE
云容器实例 CCI
容器镜像服务 SWR
云原生服务中心 OSC
应用服务网格 ASM
华为云UCS
数据库
云数据库 RDS
数据复制服务 DRS
文档数据库服务 DDS
分布式数据库中间件 DDM
云数据库 GaussDB
云数据库 GeminiDB
数据管理服务 DAS
数据库和应用迁移 UGO
云数据库 TaurusDB
人工智能
AI开发平台ModelArts
华为HiLens
图引擎服务 GES
图像识别 Image
文字识别 OCR
自然语言处理 NLP
内容审核 Moderation
图像搜索 ImageSearch
医疗智能体 EIHealth
企业级AI应用开发专业套件 ModelArts Pro
人脸识别服务 FRS
对话机器人服务 CBS
语音交互服务 SIS
人证核身服务 IVS
视频智能分析服务 VIAS
城市智能体
自动驾驶云服务 Octopus
盘古大模型 PanguLargeModels
IoT物联网
设备接入 IoTDA
全球SIM联接 GSL
IoT数据分析 IoTA
路网数字化服务 DRIS
IoT边缘 IoTEdge
设备发放 IoTDP
企业应用
域名注册服务 Domains
云解析服务 DNS
企业门户 EWP
ICP备案
商标注册
华为云WeLink
华为云会议 Meeting
隐私保护通话 PrivateNumber
语音通话 VoiceCall
消息&短信 MSGSMS
云管理网络
SD-WAN 云服务
边缘数据中心管理 EDCM
云桌面 Workspace
应用与数据集成平台 ROMA Connect
ROMA资产中心 ROMA Exchange
API全生命周期管理 ROMA API
政企自服务管理 ESM
视频
实时音视频 SparkRTC
视频直播 Live
视频点播 VOD
媒体处理 MPC
视频接入服务 VIS
数字内容生产线 MetaStudio
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
迁移中心 MGC
专属云
专属计算集群 DCC
开发者工具
SDK开发指南
API签名指南
DevStar
华为云命令行工具服务 KooCLI
Huawei Cloud Toolkit
CodeArts API
云化转型
云架构中心
云采用框架
用户服务
账号中心
费用中心
成本中心
资源中心
企业管理
工单管理
客户运营能力
国际站常见问题
支持计划
专业服务
合作伙伴支持计划
我的凭证
华为云公共事业服务云平台
工业软件
工业数字模型驱动引擎
硬件开发工具链平台云服务
工业数据转换引擎云服务
更新时间:2025-01-07 GMT+08:00
分享

订阅实时数仓Binlog

Binlog使用介绍

当用户需要捕获数据库事件用于数据增量导出Flink等第三方组件,并协同完成数据加工等任务时,DWS实时数仓中的HStore表提供了Binlog功能,通过消费Binlog数据来实现上下游的数据同步,提高数据加工的效率。

传统的数据比如MySQL数据库等,支持通过Binlog来记录数据库中所有数据的变化,但相比于MySQL的Binlog主要用于数据恢复与主从复制,DWS实时数仓Binlog一般只用于实时场景下的数据同步。同时DWS实时数仓Binlog并不会记录DDL操作,只记录Insert/Delete/Update/(Upsert)等DML操作。

GaussDB(DWS)的Binlog的优点如下:

  • 表级按需开关:按需给指定表打开或关闭binlog功能,更为灵活。
  • 全增量一体消费:支持Flink任务启动后,先全量同步source,再实时消费source端增量。
  • 支持消费即清理:对于空间敏感且只关注实时同步与加工的客户,支持消费后即开始异步清理增量,有效减少空间使用。

利用Flink强大的实时处理能力和GaussDB(DWS)的Binlog能力,可以快速构建实时数仓,且无需维护其他组件(如kafka),整体架构分层清晰,数据可以高效流动,并且整体任务链路都可以通过Flink SQL来驱动,便于业务人员理解和使用。

约束与限制

  1. 当前仅8.3.0.100及以上版本支持HStore和Hstore-opt记录Binlog功能,且V3表处于试商用阶段,使用前需要联系技术支持进行评估。
  2. 使用Binlog的前置条件是必须存在主键约束,并且为HStore表或者Hstore-opt表,分布方式只能是Hash分布。
  3. Binlog表仅记录insert/delete/update(upsert)等DML操作进行记录,不会记录DDL。
  4. 当前Binlog表不支持的操作: Insert overwrite、修改分布列、给临时表开启Binlog、exchange/merge/split partition。
  5. 当前Binlog表并不限制用户进行以下DDL操作,但进行操作后会导致增量数据与同步点位信息会被清空,需要评估后再执行:

    ADD COLUMN 增加列、DROP COLUMN 删除列、SET TYPE 修改列、TRUNCATE 清空表数据。

  6. Binlog表在线或者离线扩容期间会等待Binlog记录的消费,只有Binlog记录消费完毕才可以继续进行接下来的扩缩容步骤,默认等待时间为1小时,可通过guc参数binlog_consume_timeout来设置,如果等待超时或者等待出错都会退出扩缩容过程,认为该表扩缩容失败。
  7. VACUUM FULL Binlog表时,会等待Binlog记录的消费,只有Binlog记录消费完毕才可以进行接下来的VACUUM FULL操作,默认等待时间为1小时,可通过guc参数binlog_consume_timeout来设置,如果等待超时或者等待出错都会退出VACUUM FULL过程,认为该表VACUUM FULL失败。且由于需要等待Binlog记录消费完毕,所以即使VACUUM FULL一个分区表,也会对分区的主表上7级锁,阻塞整个表的插入更新或者删除。
  8. Binlog表在备份恢复期间,仅会被当做普通HStore表进行备份,恢复后辅助表的增量数据与同步点信息会清空,需要重新开始同步。
  9. 支持Binlog时间戳功能,通过设置enable_binlog_timestamp打开,同样只有HStore和Hstore-opt两种表支持打开。该约束仅9.1.0.200及以上版本支持。

Binlog格式与原理

表1 binlog字段格式

字段名称

字段类型

含义

gs_binlog_sync_point

BIGINT

Binlog系统字段,表示该记录的同步点值,普通GTM模式下,该值唯一且有序。

gs_binlog_event_sequence

BIGINT

Binlog的系统字段, 用于表示同一事务类操作的先后顺序。

gs_binlog_event_type

CHAR

Binlog的系统字段, 表示当前记录的操作类型。

type可能有以下几种取值:

  • 'I' 即INSERT, 表示当前Binlog是插入一条新记录。
  • 'd' 即DELETE,表示当前Binlog是删除一条记录。
  • 'B' 即BEFORE_UPDATE,表示当前Binlog是更新前的记录。
  • 'U'即AFTER_UPDATE,表示当前Binlog是更新后的记录。

gs_binlog_timestamp_us

BIGINT

Binlog的系统字段, 表示当前记录入库时的时间戳。

只有开启binlog时间戳功能时会有,没开启binlog时间戳时为空。仅9.1.0.200及以上版本支持。

user_column_1

用户列

用户的自定义数据列

...

...

...

usert_column_n

用户列

用户的自定义数据列

  • UPDATE(或者UPSERT触发的更新操作)会产生两条Binlog,分别是BEFORE_UPDATE类型与AFTER_UPDATE类型,其中BEFORE_UDPATE主要用于保证Flink等第三方组件做数据加工后加工结果的正确性。
  • 对于用户UPDATE/DELETE操作产生的BEFORE_UPDATE以及DELETE类型的Binlog,DWS实时数仓中这两类Binlog类型并不会在操作执行时就反查出所有用户列并填充,保证了入库的性能。
  • 当在DWS实时数仓给某个HStore表开启Binlog功能时, 本质就是给HStore表再创建一张辅助表(也通过HStore实现),该辅助表包含gs_binlog_event_sync_point、gs_binlog_event_event_sequence、gs_binlog_event_type三个系统列以及一个将所有用户列序列化存储到一个字段的value列。
  • 当开启binlog时间戳功能时(表级参数enable_binlog_timestamp),辅助表上记录的binlog记录会严格保留至超过TTL才会清理,这会带来数倍的额外空间开销(具体倍数与TTL时间内的更新入库量有关)。当开启普通binlog时(表级参数enable_binlog),辅助表上记录的binlog,只要被下游消费就会允许异步清理,能大幅度减少空间占用。仅9.1.0.200及以上版本支持。

开启Binlog

通过在建HStore表时指定表级参数enable_binlog,开启HStore表的Binlog功能。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE TABLE hstore_binlog_source (
    c1  INT PRIMARY KEY,
    c2  INT,
    c3  INT
) WITH (
    ORIENTATION = COLUMN, 
    enable_hstore_opt=true, 
    enable_binlog=on,
    binlog_ttl = 86400
);
  • 对于开启了Binlog的表,并不会立刻给入库的操作记录Binlog,需要再给同步任务注册同步点后,才会开始记录Binlog(开启Flink同步binlog任务后,会自动循环进行获取同步点、获取增量数据、注册同步点操作)。
  • binlog_ttl是可选参数,当不设置时将使用默认值86400, 单位为秒,当同步任务注册的同步点超过TTL没有进行增量同步时,该同步点位将被清理。最老的同步点位之前的Binlog(即被所有任务消费了的Binlog)会被异步清理来回收空间。
  • 空间开销:对于开启普通binlog的表,如果能保证增量被下游及时消费,那么空间就能被及时清理回收。

通过执行ALTER命令给已有的HStore表开启binlog功能:

1
2
3
4
5
6
7
8
9
CREATE TABLE hstore_binlog_source (
    c1  INT PRIMARY KEY,
    c2  INT,
    c3  INT
) WITH (
    ORIENTATION = COLUMN, 
    enable_hstore_opt=true
);
ALTER TABLE hstore_binlog_source SET (enable_binlog=on);

查询binlog

通过DWS提供的系统函数,可以直接查询目标表在指定DN上binlog信息,以及是否被下游消费完毕等信息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
-- 模拟Flink调用系统函数获取同步点,参数分别表示 表名、槽位名、是否checkPoint点位,目标DN(为0表示所有DN)。
select * from pg_catalog.pgxc_get_binlog_sync_point('hstore_binlog_source', 'slot1', false, 0);
select * from pg_catalog.pgxc_get_binlog_sync_point('hstore_binlog_source', 'slot1', true, 0);
-- 进行增删改产生增量binlog。
INSERT INTO hstore_binlog_source VALUES(100, 1, 1);
delete hstore_binlog_source where c1 = 100;
INSERT INTO hstore_binlog_source VALUES(200, 1, 1);
update hstore_binlog_source set c2 =2 where c1 = 200;
-- 模拟Flink调用系统函数查询指定CSN区间的Binlog,参数分别表示表名,目标DN(为0表示所有DN),起始CSN点位, 终止CSN点位。
select * from pgxc_get_binlog_changes('hstore_binlog_source', 0, 0 , 9999999999);

可以看到两次INSERT操作产生了两个gs_binlog_event_type是'I'的记录,DELETE操作产生了type是'd'的记录,UPDATE产生了一行BeforeUpdate的'B'记录以及一条AfterUpdate的'U'记录,分别表示更新前的值以及更新后的值。

通过调用系统函数pgxc_consumed_binlog_records可以查询目标表的binlog是否被所有槽位消费完毕。参数分别表示目标表名以及目标DN(为0表示所有DN)。

1
2
3
4
5
-- 模拟Flink调用系统函数注册同步点,参数分别表示表名,槽位名,注册的点位,是否属于checkPoint, 点位对应的xmin (获取同步点时会提供)。
select pgxc_register_binlog_sync_point('hstore_binlog_source', 'slot1', 0, 9999999999, false, 100);
select pgxc_register_binlog_sync_point('hstore_binlog_source', 'slot1', 0, 9999999999, true, 100);
-- 查询表上binlog是否被全部消费,返回 1 表示已经被下游槽位全部消费。
select * from pgxc_consumed_binlog_records('hstore_binlog_source',0);

开启Binlog时间戳功能

如果需要读取指定时间点之后binlog的功能,通过在建HStore表时指定表级参数enable_binlog_timestamp,开启HStore表的Binlog时间戳功能。仅9.1.0.200及以上版本支持。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE TABLE hstore_binlog_source(
    c1  INT PRIMARY KEY,
    c2  INT,
    c3  INT
) WITH (
    ORIENTATION = COLUMN, 
    enable_hstore_opt=true, 
    enable_binlog_timestamp =on,
    binlog_ttl = 86400
);
  • 对于开启了Binlog时间戳的表,并不会立刻给入库的操作记录Binlog,需要再给同步任务注册同步点后,才会开始记录Binlog(开启Flink同步binlog任务后,会自动循环进行获取同步点、获取增量数据、注册同步点操作)。
  • binlog_ttl是可选参数,当不设置时将使用默认值86400, 单位为秒(即默认保留一天),当Binlog记录的时间戳距离现在大于TTL时,会被异步清理。
  • 空间开销:对于开启binlog时间戳的表,辅助表上记录的binlog记录会严格保留至超过TTL才会清理,这会带来数倍的额外空间开销(具体倍数与TTL时间内的更新入库量有关)。

查询开启binlog时间戳功能的表上的Binlog:

将gs_binlog_timestamp_us从BigInt类型转成可读的时间戳:

1
 select to_timestamp(1731569598408661/1000000);

获取各个DN上,目标表指定时间点后的第一条binlog信息(为空表示这个时间点后没有binlog):

1
 select * from pgxc_get_binlog_cursor_by_timestamp('hstore_binlog_source','2024-11-14 15:33:18.40866+08', 0);

获取开启binlog时间戳功能的表的消费进度:

返回的字段表示最近消费的一条binlog的时间戳,binlog上的最近时间戳,最近消费的一条binlog的CSN点位, binlog上最近CSN点位,未消费的binlog记录数量。

1
2
3
4
5
-- 模拟Flink调用系统函数注册同步点,参数分别表示表名,槽位名,注册的点位,是否属于checkPoint, 点位对应的xmin (获取同步点时会提供)。
select pgxc_register_binlog_sync_point('hstore_binlog_source', 'slot1', 0, 9999999999, false, 100);
select pgxc_register_binlog_sync_point('hstore_binlog_source', 'slot1', 0, 9999999999, true, 100);
-- 查询目标表各个槽位的消费进度。
select * from pgxc_get_binlog_consume_progress('hstore_binlog_source', 0);

控制DML不产生Binlog

通过设置会话级参数enable_generate_binlog为off,可以控制当前会话的DML,在给开启binlog的表入库时,不产生binlog记录。

相关文档