使用MirrorMaker跨集群同步数据
方案概述
应用场景
在以下场景,使用MirrorMaker进行不同集群间的数据同步,可以确保Kafka集群的可用性和可靠性。
- 备份和容灾:企业存在多个数据中心,为了防止其中一个数据中心出现问题,导致业务不可用,会将集群数据同步备份在多个不同的数据中心。
- 集群迁移:当今很多企业将业务迁移上云,迁移过程中需要确保线下集群和云上集群的数据同步,保证业务的连续性。
方案架构
使用MirrorMaker可以实现将源集群中的数据镜像复制到目标集群中。其原理如图1所示,MirrorMaker本质上也是生产消费消息,首先从源集群中消费数据,然后将消费的数据生产到目标集群。如果您需要了解更多关于MirrorMaker的信息,请参见Mirroring data between clusters。
约束与限制
- 源集群中节点的IP地址和端口号不能和目标集群中节点的IP地址和端口号相同,否则会导致数据在Topic内无限循环复制。
- 使用MirrorMaker同步数据,至少需要有两个或以上集群,不可在单个集群内部使用MirrorMaker,否则会导致数据在Topic内无限循环复制。
实施步骤
- 购买一台弹性云服务器,确保弹性云服务器与源集群、目标集群网络互通。具体购买操作,请参考购买弹性云服务器。
- 登录弹性云服务器,安装Java JDK,并配置JAVA_HOME与PATH环境变量,使用执行用户在用户家目录下修改“.bash_profile”,添加如下行。其中“/opt/java/jdk1.8.0_151”为JDK的安装路径,请根据实际情况修改。
export JAVA_HOME=/opt/java/jdk1.8.0_151 export PATH=$JAVA_HOME/bin:$PATH
执行source .bash_profile命令使修改生效。
弹性云服务器默认自带的JDK可能不符合要求,例如OpenJDK,需要配置为Oracle的JDK,可至Oracle官方下载页面下载Java Development Kit 1.8.111及以上版本。
- 下载Kafka 3.3.1版本的二进制软件包。
wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.12-3.3.1.tgz
- 解压二进制软件包。
tar -zxvf kafka_2.12-3.3.1.tgz
- 进入二进制软件包目录,修改“config”目录下的“connect-mirror-maker.properties”的配置文件,在配置文件中指定源集群和目标集群的IP地址和端口以及其他配置。
# 指定两个集群 clusters = A, B A.bootstrap.servers = A_host1:A_port, A_host2:A_port, A_host3:A_port B.bootstrap.servers = B_host1:B_port, B_host2:B_port, B_host3:B_port # 指定数据同步方向,可以单向同步也可互相同步 A->B.enabled = true # 指定同步的Topic,支持正则匹配,默认复制全部Topic,如:"foo-.*" A->B.topics = .* # 取消以下两个配置的注释表示A、B两个集群互相复制同步 #B->A.enabled = true #B->A.topics = .* # 设置副本个数,如果需要同步多个Topic且副本数各不相同,建议先创建同名同副本数的Topic再启动MirrorMaker replication.factor=3 # 设置消费进度同步方向,可以单向同步也可互相同步 A->B.sync.group.offsets.enabled=true ############################# Internal Topic Settings ############################# # The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and # "mm2-offset-syncs.B.internal" # 测试环境可以为1,生产环境建议以下配置大于1,比如设为3 checkpoints.topic.replication.factor=3 heartbeats.topic.replication.factor=3 offset-syncs.topic.replication.factor=3 # The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and # "mm2-status.B.internal" # 测试环境可以为1,生产环境建议以下配置大于1,比如设为3 offset.storage.replication.factor=3 status.storage.replication.factor=3 config.storage.replication.factor=3 # customize as needed # replication.policy.separator = _ # sync.topic.acls.enabled = false # emit.heartbeats.interval.seconds = 5 # 设置目标集群中的Topic名称和源集群相同 # replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy
- 在二进制软件包目录下,启动MirrorMaker,进行数据同步。
./bin/connect-mirror-maker.sh config/connect-mirror-maker.properties
- (可选)MirrorMaker开启后,如果在源集群上新建了Topic,如需对此Topic进行数据同步,则需重启MirrorMaker,重启步骤参考6。也可配置自动同步新增Topic,按需增加如表1所示配置后,无需重启MirrorMaker,即可周期性同步新增Topic。其中,“refresh.topics.interval.seconds”为必选,其他参数根据实际情况选择。
表1 MirrorMaker配置参数 参数名
默认值
说明
sync.topic.configs.enabled
true
是否监控源集群的配置更改
sync.topic.acls.enabled
true
是否监控源集群ACL的更改
emit.heartbeats.enabled
true
连接器应定期发出心跳
emit.heartbeats.interval.seconds
5秒
心跳频率
emit.checkpoints.enabled
true
连接器应定期发出消费端偏移量信息
emit.checkpoints.interval.seconds
5秒
检查点的频率
refresh.topics.enabled
true
连接器应定期检查新主题
refresh.topics.interval.seconds
5秒
检查源集群中是否有新主题的频率
refresh.groups.enabled
true
连接器应定期检查新的消费组
refresh.groups.interval.seconds
5秒
检查源集群新的消费组频率
replication.policy.class
org.apache.kafka.connect.mirror.DefaultReplicationPolicy
使用LegacyReplicationPolicy模仿旧版MirrorMaker
heartbeats.topic.retention.ms
1天
首次创建心跳主题时使用
checkpoints.topic.retention.ms
1天
首次创建检查点主题时使用
offset.syncs.topic.retention.ms
max long
首次创建偏移同步主题时使用
验证数据是否同步
- 在目标集群中查看Topic列表,确认是否有源集群Topic。
“replication.policy.class”的默认值为“org.apache.kafka.connect.mirror.DefaultReplicationPolicy”,此时目标集群中的Topic名称和源集群相比,多了前缀(如A.),这是MirrorMaker为了防止Topic循环备份进行的设置。如果想要Topic名称保持一致,请将“replication.policy.class”设置为“org.apache.kafka.connect.mirror.IdentityReplicationPolicy”。
- 在源集群生产并消费消息,在目标集群查看消费进度,确认数据是否已从源集群同步到了目标集群。
如果目标集群为华为云Kafka实例的话,在分布式消息服务Kafka版控制台的“消费组管理 > 消费进度”中,查看消费进度。