GaussDB分布式版同步到Kafka
场景描述
本实践通过创建DRS同步任务,实现将源数据库GaussDB分布式版的增量数据同步到目标端Kafka。
前提条件
- 拥有华为云实名认证账号。
- 账户余额大于等于0元。
- 已登录数据复制服务控制台。
服务列表
- 虚拟私有云 VPC
- 云数据库GaussDB分布式版
- 分布式消息服务Kafka
- 数据复制服务 DRS
GaussDB分布式数据库架构说明
GaussDB分布式主要包含了OM、CM、GTM、CN和DN等模块,分布式版主要流程为业务应用下发SQL给CN ,SQL可以包含对数据的增(insert)、删(delete/drop)、改(update)、查(select)。CN利用数据库的优化器生成执行计划,下发给DN,每个DN会按照执行计划的要求去处理数据,处理完成后DN将结果集返回给CN进行汇总,最后CN将汇总后的结果返回给业务应用。
分布式形态能够支撑较大的数据量,且提供了横向扩展的能力,可以通过扩容的方式提高实例的数据容量和并发能力。扩容操作详见扩容实例。
DRS同步网络示意图
本示例中,DRS源数据库为华为云GaussDB分布式版,目标端为分布式消息服务Kafka,通过VPC网络,将源数据库的增量数据同步到目标端,部署架构可参考如下图示:
使用说明
- 本实践的资源规划仅作为演示,实际业务场景资源以用户实际需求为准。
- 本实践端到端的数据为测试数据,仅供参考;更多关于DRS使用相关内容请单击这里了解。
资源规划
类别 |
子类 |
规划 |
备注 |
---|---|---|---|
VPC |
VPC名称 |
vpc-DRStest |
自定义,易理解可识别。 |
子网网段 |
10.0.0.0/24 |
子网选择时建议预留足够网络资源 |
|
所属Region |
华南-广州 |
选择和自己业务区最近的Region,减少网络时延。 |
|
子网名称 |
subnet-drs01 |
自定义,易理解可识别。 |
|
GaussDB(源库) |
实例名 |
drs-gaussdbv5-src-1 |
自定义,易理解可识别。 |
数据库版本 |
GaussDB 8.102 |
- |
|
实例类型 |
分布式版 |
参考GaussDB数据库类型说明,选择适合自己业务的库类型。 |
|
存储 |
超高IO |
GaussDB支持“超高IO”存储类型,最大吞吐量为350MB/S。 |
|
规格 |
独享型 8vCPUs | 64 GB |
根据自己业务承载选择规格。 |
|
Kafka(目标端) |
Kafka实例名 |
kafka-drs |
自定义,易理解可识别。 |
版本 |
2.3.0 |
- |
|
可用区 |
可用区三 |
可选择1个或者3个及以上可用区。实际业务场景推荐选择创建在不同的可用区,提升业务可靠性。 |
|
规格 |
c6.2u4g.cluster |
- |
|
代理个数 |
3 |
- |
|
存储空间 |
高I/O,200GB |
存储空间主要用于存储消息(包含副本,Kafka默认使用3副本),除了存储消息外还需要预留部分空间用于存储日志和元数据。 |
|
DRS同步任务 |
同步任务名 |
DRS-GaussDBToKafka |
自定义。 |
源数据库引擎 |
GaussDB分布式版 |
- |
|
目标数据库引擎 |
Kafka |
本示例中目标数据库为Kafka |
|
网络类型 |
VPC网络 |
创建任务的时候选择“VPN、专线网络”。 |
操作流程
创建DRS任务,并且将GaussDB分布式版数据增量同步到Kafka的主要任务流程如图所示。
创建VPC
创建VPC,为创建GaussDB实例准备网络资源。
- 登录华为云控制台。
- 单击管理控制台左上角的,选择区域。
- 单击左侧的服务列表图标,选择
。
进入虚拟私有云信息页面。
- 单击“创建虚拟私有云”购买VPC。
- 单击“立即创建”。
- 返回VPC列表,查看创建VPC是否创建完成。
当VPC列表的VPC状态为“可用”时,表示VPC创建完成。
创建安全组
创安全组,为创建GaussDB实例准备安全组。
- 登录华为云控制台。
- 单击管理控制台左上角的,选择区域。
- 单击左侧的服务列表图标,选择
。
进入虚拟私有云信息页面。
- 选择“访问控制 > 安全组”。
- 单击“创建安全组”。
- 填写安全组名称等信息。
- 单击“确定”。
GaussDB实例构造数据
- 登录华为云控制台。
- 单击管理控制台左上角的,选择区域。
- 单击左侧的服务列表图标,选择 。
- 选择GaussDB实例,单击实例后的“更多 > 登录”。
- 在弹出的对话框中输入密码后,单击“测试连接”检查。
- 连接成功后单击“登录”,登录GaussDB实例。
- 单击“新建数据库”,创建db_test测试库。
- 在db_test库中执行如下语句,创建对应的测试表schema_test.table1。
create table schema_test.table1(c1 int primary key,c2 varchar(10),c3 TIMESTAMP(6));
创建Kafka实例
本章节介绍创建Kafka实例。
创建Kafka的Topic
- 在“Kafka专享版”页面,单击Kafka实例的名称。
- 选择“Topic管理”页签,单击“创建Topic”。
- 在弹出的“创建Topic”的对话框中,填写Topic名称和配置信息,单击“确定”,完成创建Topic。
创建DRS同步任务
- 登录华为云控制台。
- 单击管理控制台左上角的,选择区域。
- 单击左侧的服务列表图标,选择 。
- 选择左侧“实时同步管理”,单击“创建同步任务”。
- 填写同步任务参数:
- 配置同步任务名称。
- 选择需要同步任务的源库、目标数据库以及网络信息。
这里的源库选择创建GaussDB分布式实例中创建的GaussDB实例。
- 选择规格类型和可用区。
- 单击“开始创建”。
同步实例创建中,大约需要5-10分钟。
- 配置源库信息和目标库数据库密码。
- 单击“下一步”。
- 选择同步信息、策略、消息格式和对象等,投递到Kafka的消息格式。
本次选择如下。
表2 同步设置 类别
设置
同步Topic策略
集中投递到一个Topic,Topic名称“testTopic”。
不同Topica策略,对应选择的Kafka partition策略也不同,详细的说明可参考同步Topic和Partition策略说明。
同步到Kafka partition策略
全部投递到同Partition 0。
不同Topica策略,对应选择的Kafka partition策略也不同,详细的说明可参考同步Topic和Partition策略说明。
投递到Kafka的数据格式
可选择JSON格式,可参考Kafka消息格式。
同步对象
同步对象选择db_test库下的schema_test.table1。
- 单击“下一步”,等待预检查结果。
- 当所有检查都是“通过”时,单击"下一步”。
- 确认同步任务信息正确后,单击“启动任务”。
返回DRS实时同步管理,查看同步任务状态。
启动中状态一般需要几分钟,请耐心等待。
当状态变更为“增量同步”,表示同步任务已启动。
- 当前示例中GaussDB分布式到Kafka选择单增量同步,任务启动后为增量同步状态。
- 如果创建的任务为全量+增量同步,任务启动后先进入全量同步,全量数据同步完成后进入增量同步状态。
- 增量同步会持续性同步增量数据,不会自动结束。
确认同步任务执行结果
由于本次实践为增量同步模式,DRS任务会将源库的产生的增量数据持续同步至目标库中,直到手动任务结束。下面我们通过在源库GaussDB中插入数据,查看Kafka的接收到的数据来验证同步结果。
- 登录华为云控制台。
- 单击管理控制台左上角的,选择区域。
- 单击左侧的服务列表图标,选择 ”。
- 单击GaussDB实例后的“更多 > 登录”。
- 在弹出的对话框中,输入实例密码,单击“测试连接”检查。
- 连接成功后单击“登录”,登录GaussDB实例。
- 在DRS同步对象的db_test.schema_test.table1表中,执行如下语句,插入数据。
insert into schema_test.table1 values(1,'testKafka',current_timestamp(6)); update schema_test.table1 set c2 ='G2K' where c1 =1; delete schema_test.table1 where c1 =1;
- 通过Kafka客户端查看接收到JSON格式数据。
./kafka-console-consumer.sh --bootstrap-server ip:port --topic testTopic --from-beginning
- 结束同步任务。
根据业务情况,确认数据已全部同步至目标库,可以结束当前任务。
- 单击“操作”列的“结束”。
- 仔细阅读提示后,单击“是”,结束任务。
同步Topic和Partition策略说明
Topic策略 |
对应可选的Partition策略 |
说明 |
---|---|---|
集中投递到一个Topic: 对于源库业务量不大的场景,建议选择集中投递到一个Topic。 |
按库名.schema.表名的hash值投递到不同Partition |
适用于单表的查询场景,可以提高单表读写性能,推荐使用此选项。 |
全部投递到Partition 0 |
适用于有事务要求的场景,写入性能比较差,如果没有强事务要求,不推荐使用此选项。 |
|
按表的主键值hash值投递到不同的Partition |
适用于一个表一个Topic的场景,避免该表都写到同一个分区,消费者可以并行从各分区获取数据。 |
|
按库名.schema的hash值投递到不同Partition |
适用于一个database一个topic的场景,避免多个schema下的数据都写到一个分区,消费者可以并行从各分区获取数据。 |
|
按库名.dn序号的hash值投递到不同Partition |
适用于多个database对应一个topic,避免多个datanode下的数据都写到一个分区,消费者可以并行从各分区获取数据。 |
|
按表的非主键列值的hash值投递到不同的Partition |
适用于一个表一个Topic的场景,避免该表都写到同一个分区,用户可以按照非主键列值的hash值自定义message key,消费者可以并行从各分区获取数据。 |
|
按库名-schema-表名自动生成Topic名字: 如果每张表数据量都非常大,建议选择自动生成Topic名字,按库名-schema-表名确定一个Topic。 |
全部投递到Partition 0 |
适用于有事务要求的场景,写入性能比较差,如果没有强事务要求,不推荐使用此选项。 |
按表的主键值hash值投递到不同的Partition |
适用于一个表一个Topic的场景,避免该表都写到同一个分区,消费者可以并行从各分区获取数据。 |
|
按表的非主键列值的hash值投递到不同的Partition |
适用于一个表一个Topic的场景,避免该表都写到同一个分区,用户可以按照非主键列值的hash值自定义message key,消费者可以并行从各分区获取数据。 |
|
按库名自动生成Topic名字: 对于源库数据量量不大的场景,可以选择一个database自动生成Topic名字,按库名确定一个Topic。 |
按库名.schema.表名的hash值投递到不同Partition |
适用于单表的查询场景,可以提高单表读写性能,推荐使用此选项。 |
全部投递到Partition 0 |
适用于有事务要求的场景,写入性能比较差,如果没有强事务要求,不推荐使用此选项。 |
|
按库名.schema的hash值投递到不同Partition |
适用于一个database一个topic的场景,避免多个schema下的数据写到一个分区,消费者可以并行从各分区获取数据。 |
|
按库名.dn序号的hash值投递到不同Partition |
适用于一个database对应一个topic,避免多个dn上的数据写到一个分区,消费者可以并行从各分区获取数据。 |
|
按库名-schema自动生成Topic名字: 如果每个schema数据量都非常大,建议选择按库名-schema自动生成Topic名字,按库名-schema确定一个Topic。 |
按库名.schema.表名的hash值投递到不同Partition |
适用于单表的查询场景,可以提高单表读写性能,推荐使用此选项。 |
全部投递到Partition 0 |
适用于有事务要求的场景,写入性能比较差,如果没有强事务要求,不推荐使用此选项。 |
|
按库名-dn序号自动生成Topic名字: 如果每个dn上数据量很大,建议选择按库名-dn序号自动生成Topic名字,按库名-dn序号确定一个Topic。 |
按库名.schema.表名的hash值投递到不同Partition |
适用于单表的查询场景,可以提高单表读写性能,推荐使用此选项。 |
按库名.schema的hash值投递到不同Partition |
适用于一个database一个topic的场景,避免多个schema下的数据都写到一个分区,消费者可以并行从各分区获取数据。 |
|
全部投递到Partition 0 |
适用于有事务要求的场景,写入性能比较差,如果没有强事务要求,不推荐使用此选项。 |