使用MirrorMaker Flow同步Kafka集群间数据
操作场景
MirrorMaker常用于数据迁移、跨地域同步等场景, 是Kafka跨集群复制的核心组件。
MRS集群安装Kafka组件的MirrorMaker实例,并创建Kafka MirrorMaker Flow,MirrorMaker Flow通过消费一个集群中的消息并将这些消息生产到另一个集群中,从而实现集群之间的数据同步。
约束与限制
- 本章节内容仅适用于MRS 3.6.0及之后版本。
- 创建MirrorMaker Flow任务对同步的Topic不存在校验操作,即当Topic不存在或者对端Topic已存在的场景则不校验。此特性可能产生如下影响:
- 如果源端的Topic不存在,那么MirrorMaker将无法从该Topic读取数据,导致数据同步失败。
- 如果对端(目标端)已经存在同名的Topic,而MirrorMaker没有进行校验,可能会导致重复同步、浪费资源、甚至可能覆盖目标端已有的数据。
- 在创建MirrorMaker Flow任务之前,可以手动检查源端和目标端的Topic是否存在。确保源端的Topic存在,目标端的Topic不存在或可以接受重复同步。也可编写脚本用于自动化检查源端和目标端的Topic状态。确保在创建MirrorMaker Flow任务之前,Topic的状态符合预期。
- 删除MirrorMaker Flow任务时,内置topic(mm2-offsets.c2.internal,mm2-offset-syncs.c2.internal,mm2-configs.c1.internal,mm2-offset-syncs.c1.internal,mm2-offsets.c1.internal,mm2-status.c1.internal)不会清除。如果数据累计,则内置Topic会占用磁盘空间。如果集群重启或有新的MM2实例启动,并且它们尝试连接到这些残留的、但不再活跃的内置Topic,可能会导致连接异常或状态不一致。建议在确认某个内置Topic已经不再被任何 MirrorMaker Flow任务使用后,可以手动删除内置Topic。
- 只有活跃的消费者组才能被同步。
前提条件
- 主集群与备集群认证模式保持一致。可以分别登录主集群、备集群Manager页面,选择“集群 > 集群属性”,查看“认证模式”。
- 配置数据同步的两个主备MRS集群的系统时间必须保持一致。可以执行date命令查看系统时间,两个集群之间时差需在10秒以内。
- 主集群和备集群不能存在主机名或者IP相同的节点。分别登录主集群、备集群Manager页面,选择“集群 > 主机”,查看所有IP信息,比较主集群和备集群是否存在相同的IP。分别登录主集群和备集群主OMS节点,查看“/etc/hosts”内容,比较是否存在hostname相同的信息。
- 主集群与备集群Kafka版本需要保持一致,且已安装MirrorMaker实例并正常运行。可以分别登录主集群、备集群的Manager,选择“集群 > 服务 > Kafka > 实例”,查看是否存在MirrorMaker实例并查看实例状态。
- 不同MRS Kafka集群之间需要配置Manager互信,详细步骤可参考配置MRS集群间互信章节。
- 已创建具有KafkaUI页面访问权限的用户,如需在页面上进行相关操作,例如创建Topic,需同时授予用户相关权限,请参考Kafka用户权限说明。
- 主集群和备集群已开启REST服务:在Manager页面,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索并将参数“mm2.external.rest.server.enabled”选择为“true”后,重启MirrorMaker实例或Kafka服务使配置生效。
操作步骤
- 执行以下操作,在主集群创建Topic和数据。
- 以客户端安装用户,登录主集群安装客户端的节点。
- 执行以下命令,切换到客户端安装目录。
- 执行以下命令配置环境变量。
- 执行以下命令,进行用户认证。(普通模式跳过此步骤)
- 执行以下命令切换到Kafka客户端安装目录。
- 执行以下命令在主集群创建Topic和数据。
- Kafka集群IP:登录Manager页面,选择“集群 > 服务 > Kafka > 实例”。查看任意一个Broker角色实例的业务IP地址。例如获取到的IP为“192.168.20.36”。
- Kafka集群端口号安全模式下是21007,普通模式下是9092。
创建topic,例如名称为“topic_test”:
sh ./kafka-topics.sh --create --topic topic_test --partitions 2 --replication-factor 2 --bootstrap-server Kafka集群IP:端口号 --command-config ../config/client.properties
产生数据:
sh ./kafka-producer-perf-test.sh --topic topic_test --num-records 100 --print-metrics --throughput 1 --record-size 100 --producer.config ../config/producer.properties

- 进入KafkaUI界面。
- 使用具有KafkaUI页面访问权限的用户登录FusionInsight Manager,选择“集群 > 服务 > Kafka”。
如需在页面上进行相关操作,例如创建Topic,需同时授予用户相关权限,请参考Kafka用户权限说明。
- 在“KafkaManager WebUI”右侧,单击URL链接,访问KafkaUI的页面。
- 使用具有KafkaUI页面访问权限的用户登录FusionInsight Manager,选择“集群 > 服务 > Kafka”。
- 在“Topics”页面查看到topic创建成功,单击topic名称,查看到数据存在。

- 登录主集群节点,在主集群中进行数据迁移。例如执行以下“创建新MM2流连接器”请求示例。
“创建新MM2流连接器”请求示例如下:
POST https://Kafka集群IP:21017/mirror-flows -H "accept: application/json" -H "Content-Type: application/json" -d ' { "source": "c1", "target": "c2", "config": { "c1->c2.enabled": "true", "topics": "topic_test", "c1.bootstrap.servers": "192.168.xx.xx:21007,192.168.xx.xx:21007,192.168.xx.xx:21007", "c2.bootstrap.servers": "192.168.xx.xx:21007,192.168.xx.xx:21007,192.168.xx.xx:21007", "c1.security.protocol": "SASL_PLAINTEXT", "c2.security.protocol": "SASL_PLAINTEXT", "c1.kerberos.domain.name": "hadoop.e4f64a1b_7bf7_29aa_9821_c28fbd8142d5.com", "c2.kerberos.domain.name": "hadoop.ba0d7d39_d6bd_e0a2_68d7_ca23b4cbc68f.com", "c1.kafka.client.zookeeper.principal": "zookeeper/hadoop.e4f64a1b_7bf7_29aa_9821_c28fbd8142d5.com", "c2.kafka.client.zookeeper.principal": "zookeeper/hadoop.ba0d7d39_d6bd_e0a2_68d7_ca23b4cbc68f.com", "c1->c2.emit.heartbeats.enabled": "false", "c2->c1.emit.heartbeats.enabled": "false", "c1->c2.collect.offsets.enabled": "true", "c1->c2.collect.offsets.interval.seconds": "10", "c1->c2.sync.group.offsets.target.enabled": "true", "c1->c2.sync.group.offsets.enabled": "true", "c1->c2.sync.group.offsets.interval.seconds": "5", "emit.checkpoints.enabled": "true", "emit.checkpoints.interval.seconds": "10", "refresh.topics.enabled": "true", "refresh.groups.enabled": "true", "sync.topic.acls.enabled": "true", "sync.topic.configs.enabled": "true", "c1.zookeeper.ssl.enable": "true", "c2.zookeeper.ssl.enable": "true", "c1.sasl.kerberos.service.name": "kafka", "c2.sasl.kerberos.service.name": "kafka", "replication.factor": "2" } }'相关参数含义如下:
表1 创建新MM2流连接器参数解释 参数
参数类型
是否必选
描述
source
String
是
源集群名称,可自定义,如:c1。
target
String
是
目标集群名称,可自定义,如:c2。
c1->c2.enabled
boolean
是
是否启用从c1到c2的同步(true表示启用)。默认为true。
c2.bootstrap.servers
String
是
c2的Kafka broker实例地址。
c1.bootstrap.servers
String
是
c1的Kafka broker实例地址。
c1.security.protocol
String
是
c1的安全协议。
c2.security.protocol
String
是
c2的安全协议。
c1.kerberos.domain.name
String
是
c1的Kerberos域名。
c2.kerberos.domain.name
String
是
c2的Kerberos域名。
topics
String
是
待同步的topic名称。
c1.kafka.client.zookeeper.principal
String
否
c1的ZooKeeper客户端Kerberos主体,如果ZooKeeper集群启用了SASL认证,并且选择的认证机制依赖于Kerberos,则需配置此参数,默认值为:zookeeper/hadoop.{c1集群域名}.com。
c2.kafka.client.zookeeper.principal
String
否
c2的ZooKeeper客户端Kerberos主体,如果ZooKeeper集群启用了SASL认证,并且选择的认证机制依赖于Kerberos,则需配置此参数,默认值为:zookeeper/hadoop.{c2集群域名}.com。
c1->c2.emit.heartbeats.enabled
boolean
否
是否发送心跳信号(false表示禁用),控制是否从MirrorMaker实例(c1->c2流的消费者/生产者)向目标集群(c2)发送心跳,以表明连接是否活跃。默认为false。
c2->c1.emit.heartbeats.enabled
boolean
否
反向心跳信号(false表示禁用),控制是否从目标集群(c2)向MirrorMaker实例发送心跳,以表明它是否可以接收数据。默认为false。
c1->c2.collect.offsets.enabled
boolean
否
是否收集消费者组的偏移量,控制是否收集从c1到c2的数据复制偏移量。偏移量用于跟踪复制进度。默认为false。
c1->c2.collect.offsets.interval.seconds
int
否
收集偏移量的间隔,默认值为120。
c1->c2.collect.offsets.use.offset.syncs.topic
boolean
否
是否指定从mm2-offset-syncs*主题中获取业务Topic分区的同步进度,默认从mm2- offset*主题中获取业务Topic分区的同步进度。
c1->c2.collect.offsets.fetch.latest.offset.info
boolean
否
是否指定从mm2-offset-syncs*或mm2-offsets*主题获取最新的消息,默认从最旧的位置消费消息。
c1->c2.sync.group.offsets.target.enabled
boolean
否
是否允许目标集群接收偏移量,控制是否将从c1收集到的消费者组偏移量信息发送给目标集群(c2)的MirrorMaker实例。通常与c1->c2.sync.group.offsets.enabled 配合使用,默认为false。
c1->c2.sync.group.offsets.enabled
boolean
否
是否同步消费者组偏移量到目标集群,控制是否从c1同步消费者组的偏移量到c2。启用此功能可以让c2上的消费者组状态与c1保持一致,默认为false。
c1->c2.sync.group.offsets.interval.seconds
int
否
同步偏移量的间隔,默认值为60。
emit.checkpoints.enabled
boolean
否
是否发送检查点(true,用于故障恢复),默认为true。
emit.checkpoints.interval.seconds
int
否
发送检查点的间隔,默认值为60。单位:秒。
refresh.topics.enabled
boolean
否
是否定期刷新主题列表,默认值为true。
refresh.groups.enabled
boolean
否
是否定期刷新消费者组列表,默认值为true。
sync.topic.acls.enabled
boolean
否
是否同步主题的ACL(访问控制列表),默认值为true。
sync.topic.configs.enabled
boolean
否
是否同步主题配置(如保留时间、分区数等),默认值为true。
c1.zookeeper.ssl.enable
boolean
否
c1的ZooKeeper是否启用SSL,默认值为false。
c2.zookeeper.ssl.enable
boolean
否
c2的ZooKeeper是否启用SSL,默认值为false。
c1.sasl.kerberos.service.name
String
否
c1的Kerberos服务名,默认为“kafka”。
c2.sasl.kerberos.service.name
String
否
c2的Kerberos服务名,默认为“kafka”。
replication.factor
int
否
主题(Topic)的副本数量,默认为2。
- Topic配置格式:支持单topic(如: topic_test )、多topic(如: topic01,topic02 )、正则表达式( 如: .* )。
- 如修改Topic配置,不需要重启flow即可生效。例如:Topic配置由"topics": "test01"修改为"topics": "test01,test02"后,不用重启flow,test01、test02均会自动同步数据。
- 分别登录主集群(c1)和备集群(c2)的Kafka UI,在“Topics”查看到topic同步到备集群成功。

数据同步成功:
