GaussDB分布式版同步到Kafka
场景描述
本实践通过创建DRS同步任务,实现将源数据库GaussDB分布式版的增量数据同步到目标端Kafka。
前提条件
- 拥有华为云账号。
- 账户余额大于等于0美元。
- 已登录数据复制服务控制台。
服务列表
- 虚拟私有云 VPC
- 云数据库GaussDB分布式版
- 分布式消息服务Kafka
- 数据复制服务 DRS
GaussDB分布式数据库架构说明
 
  GaussDB分布式主要包含了OM、CM、GTM、CN和DN等模块,分布式版主要流程为业务应用下发SQL给CN ,SQL可以包含对数据的增(insert)、删(delete/drop)、改(update)、查(select)。CN利用数据库的优化器生成执行计划,下发给DN,每个DN会按照执行计划的要求去处理数据,处理完成后DN将结果集返回给CN进行汇总,最后CN将汇总后的结果返回给业务应用。
分布式形态能够支撑较大的数据量,且提供了横向扩展的能力,可以通过扩容的方式提高实例的数据容量和并发能力。扩容操作详见扩容实例。
DRS同步网络示意图
本示例中,DRS源数据库为华为云GaussDB分布式版,目标端为分布式消息服务Kafka,通过VPC网络,将源数据库的增量数据同步到目标端,部署架构可参考如下图示:
 
  使用说明
- 本实践的资源规划仅作为演示,实际业务场景资源以用户实际需求为准。
- 本实践端到端的数据为测试数据,仅供参考;更多关于DRS使用相关内容请单击这里了解。
资源规划
| 类别 | 子类 | 规划 | 备注 | 
|---|---|---|---|
| VPC | VPC名称 | vpc-DRStest | 自定义,易理解可识别。 | 
| 子网网段 | 10.0.0.0/24 | 子网选择时建议预留足够网络资源 | |
| 所属Region | 亚太-新加坡 | 选择和自己业务区最近的Region,减少网络时延。 | |
| 子网名称 | subnet-drs01 | 自定义,易理解可识别。 | |
| GaussDB(源库) | 实例名 | drs-gaussdbv5-src-1 | 自定义,易理解可识别。 | 
| 数据库版本 | GaussDB 8.102 | - | |
| 实例类型 | 分布式版 | 参考GaussDB数据库类型说明,选择适合自己业务的库类型。 | |
| 存储 | 超高IO | GaussDB支持“超高IO”存储类型,最大吞吐量为350MB/S。 | |
| 规格 | 独享型 8vCPUs | 64 GB | 根据自己业务承载选择规格。 | |
| Kafka(目标端) | Kafka实例名 | kafka-drs | 自定义,易理解可识别。 | 
| 版本 | 2.3.0 | - | |
| 可用区 | 可用区三 | 可选择1个或者3个及以上可用区。实际业务场景推荐选择创建在不同的可用区,提升业务可靠性。 | |
| 规格 | c6.2u4g.cluster | - | |
| 代理个数 | 3 | - | |
| 存储空间 | 高I/O,200GB | 存储空间主要用于存储消息(包含副本,Kafka默认使用3副本),除了存储消息外还需要预留部分空间用于存储日志和元数据。 | |
| DRS同步任务 | 同步任务名 | DRS-GaussDBToKafka | 自定义。 | 
| 源数据库引擎 | GaussDB分布式版 | - | |
| 目标数据库引擎 | Kafka | 本示例中目标数据库为Kafka | |
| 网络类型 | VPC网络 | 创建任务的时候选择“VPN、专线网络”。 | 
操作流程
创建DRS任务,并且将GaussDB分布式版数据增量同步到Kafka的主要任务流程如图所示。

创建VPC
创建VPC,为创建GaussDB实例准备网络资源。
- 登录华为云控制台。
- 单击管理控制台左上角的 ,选择区域。 ,选择区域。
- 单击左侧的服务列表图标,选择。
    
    进入虚拟私有云信息页面。 
- 单击“创建虚拟私有云”购买VPC。
    
     
- 单击“立即创建”。
- 返回VPC列表,查看创建VPC是否创建完成。
    
    当VPC列表的VPC状态为“可用”时,表示VPC创建完成。 
创建安全组
创安全组,为创建GaussDB实例准备安全组。
- 登录华为云控制台。
- 单击管理控制台左上角的 ,选择区域。 ,选择区域。
- 单击左侧的服务列表图标,选择。
    
    进入虚拟私有云信息页面。 
- 选择“访问控制 > 安全组”。
- 单击“创建安全组”。
- 填写安全组名称等信息。
    
     
- 单击“确定”。
GaussDB实例构造数据
- 登录华为云控制台。
- 单击管理控制台左上角的 ,选择区域。 ,选择区域。
- 单击左侧的服务列表图标,选择。
- 选择GaussDB实例,单击实例后的“更多 > 登录”。
- 在弹出的对话框中输入密码后,单击“测试连接”检查。
- 连接成功后单击“登录”,登录GaussDB实例。
- 单击“新建数据库”,创建db_test测试库。
    
     
- 在db_test库中执行如下语句,创建对应的测试表schema_test.table1。
    
    create table schema_test.table1(c1 int primary key,c2 varchar(10),c3 TIMESTAMP(6)); 
创建Kafka实例
本章节介绍创建Kafka实例。
创建Kafka的Topic
- 在“Kafka专享版”页面,单击Kafka实例的名称。
- 选择“Topic管理”页签,单击“创建Topic”。
- 在弹出的“创建Topic”的对话框中,填写Topic名称和配置信息,单击“确定”,完成创建Topic。
    
     
创建DRS同步任务
- 登录华为云控制台。
- 单击管理控制台左上角的 ,选择区域。 ,选择区域。
- 单击左侧的服务列表图标,选择。
- 选择左侧“实时同步管理”,单击“创建同步任务”。
- 填写同步任务参数:
    
    - 配置同步任务名称。
- 选择需要同步任务的源库、目标数据库以及网络信息。
      这里的源库选择创建GaussDB分布式实例中创建的GaussDB实例。 图3 同步实例信息  
- 选择规格类型和可用区。
 
- 单击“开始创建”。
    
    同步实例创建中,大约需要5-10分钟。 
- 配置源库信息和目标库数据库密码。
- 单击“下一步”。
- 选择同步信息、策略、消息格式和对象等,投递到Kafka的消息格式。
    
    本次选择如下。  表2 同步设置 类别 设置 同步Topic策略 集中投递到一个Topic,Topic名称“testTopic”。 不同Topica策略,对应选择的Kafka partition策略也不同,详细的说明可参考同步Topic和Partition策略说明。 同步到Kafka partition策略 全部投递到同Partition 0。 不同Topica策略,对应选择的Kafka partition策略也不同,详细的说明可参考同步Topic和Partition策略说明。 投递到Kafka的数据格式 可选择JSON格式,可参考Kafka消息格式。 同步对象 同步对象选择db_test库下的schema_test.table1。 
- 单击“下一步”,等待预检查结果。
- 当所有检查都是“通过”时,单击"下一步”。
- 确认同步任务信息正确后,单击“启动任务”。
    
    返回DRS实时同步管理,查看同步任务状态。 启动中状态一般需要几分钟,请耐心等待。 当状态变更为“增量同步”,表示同步任务已启动。   - 当前示例中GaussDB分布式到Kafka选择单增量同步,任务启动后为增量同步状态。
- 如果创建的任务为全量+增量同步,任务启动后先进入全量同步,全量数据同步完成后进入增量同步状态。
- 增量同步会持续性同步增量数据,不会自动结束。
 
确认同步任务执行结果
由于本次实践为增量同步模式,DRS任务会将源库的产生的增量数据持续同步至目标库中,直到手动任务结束。以下步骤通过在源库GaussDB中插入数据,查看Kafka的接收到的数据来验证同步结果。
- 登录华为云控制台。
- 单击管理控制台左上角的 ,选择区域。 ,选择区域。
- 单击左侧的服务列表图标,选择”。
- 单击GaussDB实例后的“更多 > 登录”。
- 在弹出的对话框中,输入实例密码,单击“测试连接”检查。
- 连接成功后单击“登录”,登录GaussDB实例。
- 在DRS同步对象的db_test.schema_test.table1表中,执行如下语句,插入数据。
    
    insert into schema_test.table1 values(1,'testKafka',current_timestamp(6)); update schema_test.table1 set c2 ='G2K' where c1 =1; delete schema_test.table1 where c1 =1; 
- 通过Kafka客户端查看接收到JSON格式数据。
    
    ./kafka-console-consumer.sh --bootstrap-server ip:port --topic testTopic --from-beginning  
- 结束同步任务。
    
    根据业务情况,确认数据已全部同步至目标库,可以结束当前任务。- 单击“操作”列的“结束”。
- 仔细阅读提示后,单击“是”,结束任务。
 
同步Topic和Partition策略说明
| Topic策略 | 对应可选的Partition策略 | 说明 | 
|---|---|---|
| 集中投递到一个Topic: 对于源库业务量不大的场景,建议选择集中投递到一个Topic。 | 按库名.schema.表名的hash值投递到不同Partition | 适用于单表的查询场景,可以提高单表读写性能,推荐使用此选项。 | 
| 全部投递到Partition 0 | 适用于有事务要求的场景,写入性能比较差,如果没有强事务要求,不推荐使用此选项。 | |
| 按表的主键值hash值投递到不同的Partition | 适用于一个表一个Topic的场景,避免该表都写到同一个分区,消费者可以并行从各分区获取数据。 | |
| 按库名.schema的hash值投递到不同Partition | 适用于一个database一个topic的场景,避免多个schema下的数据都写到一个分区,消费者可以并行从各分区获取数据。 | |
| 按库名.dn序号的hash值投递到不同Partition | 适用于多个database对应一个topic,避免多个datanode下的数据都写到一个分区,消费者可以并行从各分区获取数据。 | |
| 按表的非主键列值的hash值投递到不同的Partition | 适用于一个表一个Topic的场景,避免该表都写到同一个分区,用户可以按照非主键列值的hash值自定义message key,消费者可以并行从各分区获取数据。 | |
| 按库名-schema-表名自动生成Topic名字: 如果每张表数据量都非常大,建议选择自动生成Topic名字,按库名-schema-表名确定一个Topic。 | 全部投递到Partition 0 | 适用于有事务要求的场景,写入性能比较差,如果没有强事务要求,不推荐使用此选项。 | 
| 按表的主键值hash值投递到不同的Partition | 适用于一个表一个Topic的场景,避免该表都写到同一个分区,消费者可以并行从各分区获取数据。 | |
| 按表的非主键列值的hash值投递到不同的Partition | 适用于一个表一个Topic的场景,避免该表都写到同一个分区,用户可以按照非主键列值的hash值自定义message key,消费者可以并行从各分区获取数据。 | |
| 按库名自动生成Topic名字: 对于源库数据量量不大的场景,可以选择一个database自动生成Topic名字,按库名确定一个Topic。 | 按库名.schema.表名的hash值投递到不同Partition | 适用于单表的查询场景,可以提高单表读写性能,推荐使用此选项。 | 
| 全部投递到Partition 0 | 适用于有事务要求的场景,写入性能比较差,如果没有强事务要求,不推荐使用此选项。 | |
| 按库名.schema的hash值投递到不同Partition | 适用于一个database一个topic的场景,避免多个schema下的数据写到一个分区,消费者可以并行从各分区获取数据。 | |
| 按库名.dn序号的hash值投递到不同Partition | 适用于一个database对应一个topic,避免多个dn上的数据写到一个分区,消费者可以并行从各分区获取数据。 | |
| 按库名-schema自动生成Topic名字: 如果每个schema数据量都非常大,建议选择按库名-schema自动生成Topic名字,按库名-schema确定一个Topic。 | 按库名.schema.表名的hash值投递到不同Partition | 适用于单表的查询场景,可以提高单表读写性能,推荐使用此选项。 | 
| 全部投递到Partition 0 | 适用于有事务要求的场景,写入性能比较差,如果没有强事务要求,不推荐使用此选项。 | |
| 按库名-dn序号自动生成Topic名字: 如果每个dn上数据量很大,建议选择按库名-dn序号自动生成Topic名字,按库名-dn序号确定一个Topic。 | 按库名.schema.表名的hash值投递到不同Partition | 适用于单表的查询场景,可以提高单表读写性能,推荐使用此选项。 | 
| 按库名.schema的hash值投递到不同Partition | 适用于一个database一个topic的场景,避免多个schema下的数据都写到一个分区,消费者可以并行从各分区获取数据。 | |
| 全部投递到Partition 0 | 适用于有事务要求的场景,写入性能比较差,如果没有强事务要求,不推荐使用此选项。 | 
 
     
      













