文档首页/ MapReduce服务 MRS/ 组件操作指南(LTS版)/ 使用Kafka/ Kafka运维管理/ 使用MirrorMaker Flow同步Kafka集群间数据
更新时间:2025-12-10 GMT+08:00
分享

使用MirrorMaker Flow同步Kafka集群间数据

操作场景

MirrorMaker常用于数据迁移、跨地域同步等场景, 是Kafka跨集群复制的核心组件。

MRS集群安装Kafka组件的MirrorMaker实例,并创建Kafka MirrorMaker Flow,MirrorMaker Flow通过消费一个集群中的消息并将这些消息生产到另一个集群中,从而实现集群之间的数据同步。

约束与限制

  • 本章节内容仅适用于MRS 3.6.0及之后版本。
  • 创建MirrorMaker Flow任务对同步的Topic不存在校验操作,即当Topic不存在或者对端Topic已存在的场景则不校验。此特性可能产生如下影响:
    • 如果源端的Topic不存在,那么MirrorMaker将无法从该Topic读取数据,导致数据同步失败。
    • 如果对端(目标端)已经存在同名的Topic,而MirrorMaker没有进行校验,可能会导致重复同步、浪费资源、甚至可能覆盖目标端已有的数据。
    • 在创建MirrorMaker Flow任务之前,可以手动检查源端和目标端的Topic是否存在。确保源端的Topic存在,目标端的Topic不存在或可以接受重复同步。也可编写脚本用于自动化检查源端和目标端的Topic状态。确保在创建MirrorMaker Flow任务之前,Topic的状态符合预期。
  • 删除MirrorMaker Flow任务时,内置topic(mm2-offsets.c2.internal,mm2-offset-syncs.c2.internal,mm2-configs.c1.internal,mm2-offset-syncs.c1.internal,mm2-offsets.c1.internal,mm2-status.c1.internal)不会清除。如果数据累计,则内置Topic会占用磁盘空间。如果集群重启或有新的MM2实例启动,并且它们尝试连接到这些残留的、但不再活跃的内置Topic,可能会导致连接异常或状态不一致。建议在确认某个内置Topic已经不再被任何 MirrorMaker Flow任务使用后,可以手动删除内置Topic。
  • 只有活跃的消费者组才能被同步。

前提条件

  • 主集群与备集群认证模式保持一致。可以分别登录主集群、备集群Manager页面,选择“集群 > 集群属性”,查看“认证模式”。
  • 配置数据同步的两个主备MRS集群的系统时间必须保持一致。可以执行date命令查看系统时间,两个集群之间时差需在10秒以内。
  • 主集群和备集群不能存在主机名或者IP相同的节点。分别登录主集群、备集群Manager页面,选择“集群 > 主机”,查看所有IP信息,比较主集群和备集群是否存在相同的IP。分别登录主集群和备集群主OMS节点,查看“/etc/hosts”内容,比较是否存在hostname相同的信息。
  • 主集群与备集群Kafka版本需要保持一致,且已安装MirrorMaker实例并正常运行。可以分别登录主集群、备集群的Manager,选择“集群 > 服务 > Kafka > 实例”,查看是否存在MirrorMaker实例并查看实例状态。
  • 不同MRS Kafka集群之间需要配置Manager互信,详细步骤可参考配置MRS集群间互信章节。
  • 已创建具有KafkaUI页面访问权限的用户,如需在页面上进行相关操作,例如创建Topic,需同时授予用户相关权限,请参考Kafka用户权限说明
  • 主集群和备集群已开启REST服务:在Manager页面,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索并将参数“mm2.external.rest.server.enabled”选择为“true”后,重启MirrorMaker实例或Kafka服务使配置生效。

操作步骤

  1. 执行以下操作,在主集群创建Topic和数据。

    1. 以客户端安装用户,登录主集群安装客户端的节点。
    2. 执行以下命令,切换到客户端安装目录。

      cd 客户端安装路径

    3. 执行以下命令配置环境变量。

      source bigdata_env

    4. 执行以下命令,进行用户认证。(普通模式跳过此步骤)

      kinit 组件业务用户

    5. 执行以下命令切换到Kafka客户端安装目录。

      cd Kafka/kafka/bin

    6. 执行以下命令在主集群创建Topic和数据。
      • Kafka集群IP:登录Manager页面,选择“集群 > 服务 > Kafka > 实例”。查看任意一个Broker角色实例的业务IP地址。例如获取到的IP为“192.168.20.36”。
      • Kafka集群端口号安全模式下是21007,普通模式下是9092。

      创建topic,例如名称为“topic_test”:

      sh ./kafka-topics.sh --create --topic topic_test --partitions 2 --replication-factor 2 --bootstrap-server Kafka集群IP:端口号 --command-config ../config/client.properties

      产生数据:

      sh ./kafka-producer-perf-test.sh --topic topic_test --num-records 100 --print-metrics --throughput 1 --record-size 100 --producer.config ../config/producer.properties

  2. 进入KafkaUI界面。

    1. 使用具有KafkaUI页面访问权限的用户登录FusionInsight Manager,选择“集群 > 服务 > Kafka”。

      如需在页面上进行相关操作,例如创建Topic,需同时授予用户相关权限,请参考Kafka用户权限说明

    2. 在“KafkaManager WebUI”右侧,单击URL链接,访问KafkaUI的页面。

  3. 在“Topics”页面查看到topic创建成功,单击topic名称,查看到数据存在。

  4. 登录主集群节点,在主集群中进行数据迁移。例如执行以下“创建新MM2流连接器”请求示例。

    “创建新MM2流连接器”请求示例如下:
    POST https://Kafka集群IP:21017/mirror-flows -H "accept: application/json" -H "Content-Type: application/json" -d '
    {
        "source": "c1",   
        "target": "c2",   
        "config": {
    	"c1->c2.enabled": "true", 
    	"topics": "topic_test",
    	"c1.bootstrap.servers": "192.168.xx.xx:21007,192.168.xx.xx:21007,192.168.xx.xx:21007",
    	"c2.bootstrap.servers": "192.168.xx.xx:21007,192.168.xx.xx:21007,192.168.xx.xx:21007",
    	"c1.security.protocol": "SASL_PLAINTEXT",
    	"c2.security.protocol": "SASL_PLAINTEXT",
    	"c1.kerberos.domain.name": "hadoop.e4f64a1b_7bf7_29aa_9821_c28fbd8142d5.com",
    	"c2.kerberos.domain.name": "hadoop.ba0d7d39_d6bd_e0a2_68d7_ca23b4cbc68f.com",
    	"c1.kafka.client.zookeeper.principal": "zookeeper/hadoop.e4f64a1b_7bf7_29aa_9821_c28fbd8142d5.com",
    	"c2.kafka.client.zookeeper.principal": "zookeeper/hadoop.ba0d7d39_d6bd_e0a2_68d7_ca23b4cbc68f.com",
            "c1->c2.emit.heartbeats.enabled": "false",
            "c2->c1.emit.heartbeats.enabled": "false",
            "c1->c2.collect.offsets.enabled": "true",
            "c1->c2.collect.offsets.interval.seconds": "10",
            "c1->c2.sync.group.offsets.target.enabled": "true",
            "c1->c2.sync.group.offsets.enabled": "true",
            "c1->c2.sync.group.offsets.interval.seconds": "5",
            "emit.checkpoints.enabled": "true",
            "emit.checkpoints.interval.seconds": "10",
            "refresh.topics.enabled": "true",
            "refresh.groups.enabled": "true",
            "sync.topic.acls.enabled": "true",
            "sync.topic.configs.enabled": "true",
            "c1.zookeeper.ssl.enable": "true",
            "c2.zookeeper.ssl.enable": "true",
            "c1.sasl.kerberos.service.name": "kafka",
            "c2.sasl.kerberos.service.name": "kafka",
            "replication.factor": "2"
        }
    }'

    相关参数含义如下:

    表1 创建新MM2流连接器参数解释

    参数

    参数类型

    是否必选

    描述

    source

    String

    源集群名称,可自定义,如:c1。

    target

    String

    目标集群名称,可自定义,如:c2。

    c1->c2.enabled

    boolean

    是否启用从c1到c2的同步(true表示启用)。默认为true。

    c2.bootstrap.servers

    String

    c2的Kafka broker实例地址。

    c1.bootstrap.servers

    String

    c1的Kafka broker实例地址。

    c1.security.protocol

    String

    c1的安全协议。

    c2.security.protocol

    String

    c2的安全协议。

    c1.kerberos.domain.name

    String

    c1的Kerberos域名。

    c2.kerberos.domain.name

    String

    c2的Kerberos域名。

    topics

    String

    待同步的topic名称。

    c1.kafka.client.zookeeper.principal

    String

    c1的ZooKeeper客户端Kerberos主体,如果ZooKeeper集群启用了SASL认证,并且选择的认证机制依赖于Kerberos,则需配置此参数,默认值为:zookeeper/hadoop.{c1集群域名}.com。

    c2.kafka.client.zookeeper.principal

    String

    c2的ZooKeeper客户端Kerberos主体,如果ZooKeeper集群启用了SASL认证,并且选择的认证机制依赖于Kerberos,则需配置此参数,默认值为:zookeeper/hadoop.{c2集群域名}.com。

    c1->c2.emit.heartbeats.enabled

    boolean

    是否发送心跳信号(false表示禁用),控制是否从MirrorMaker实例(c1->c2流的消费者/生产者)向目标集群(c2)发送心跳,以表明连接是否活跃。默认为false。

    c2->c1.emit.heartbeats.enabled

    boolean

    反向心跳信号(false表示禁用),控制是否从目标集群(c2)向MirrorMaker实例发送心跳,以表明它是否可以接收数据。默认为false。

    c1->c2.collect.offsets.enabled

    boolean

    是否收集消费者组的偏移量,控制是否收集从c1到c2的数据复制偏移量。偏移量用于跟踪复制进度。默认为false。

    c1->c2.collect.offsets.interval.seconds

    int

    收集偏移量的间隔,默认值为120。

    c1->c2.collect.offsets.use.offset.syncs.topic

    boolean

    是否指定从mm2-offset-syncs*主题中获取业务Topic分区的同步进度,默认从mm2- offset*主题中获取业务Topic分区的同步进度。

    c1->c2.collect.offsets.fetch.latest.offset.info

    boolean

    是否指定从mm2-offset-syncs*或mm2-offsets*主题获取最新的消息,默认从最旧的位置消费消息。

    c1->c2.sync.group.offsets.target.enabled

    boolean

    是否允许目标集群接收偏移量,控制是否将从c1收集到的消费者组偏移量信息发送给目标集群(c2)的MirrorMaker实例。通常与c1->c2.sync.group.offsets.enabled 配合使用,默认为false。

    c1->c2.sync.group.offsets.enabled

    boolean

    是否同步消费者组偏移量到目标集群,控制是否从c1同步消费者组的偏移量到c2。启用此功能可以让c2上的消费者组状态与c1保持一致,默认为false。

    c1->c2.sync.group.offsets.interval.seconds

    int

    同步偏移量的间隔,默认值为60。

    emit.checkpoints.enabled

    boolean

    是否发送检查点(true,用于故障恢复),默认为true。

    emit.checkpoints.interval.seconds

    int

    发送检查点的间隔,默认值为60。单位:秒。

    refresh.topics.enabled

    boolean

    是否定期刷新主题列表,默认值为true。

    refresh.groups.enabled

    boolean

    是否定期刷新消费者组列表,默认值为true。

    sync.topic.acls.enabled

    boolean

    是否同步主题的ACL(访问控制列表),默认值为true。

    sync.topic.configs.enabled

    boolean

    是否同步主题配置(如保留时间、分区数等),默认值为true。

    c1.zookeeper.ssl.enable

    boolean

    c1的ZooKeeper是否启用SSL,默认值为false。

    c2.zookeeper.ssl.enable

    boolean

    c2的ZooKeeper是否启用SSL,默认值为false。

    c1.sasl.kerberos.service.name

    String

    c1的Kerberos服务名,默认为“kafka”。

    c2.sasl.kerberos.service.name

    String

    c2的Kerberos服务名,默认为“kafka”。

    replication.factor

    int

    主题(Topic)的副本数量,默认为2。

    • Topic配置格式:支持单topic(如: topic_test )、多topic(如: topic01,topic02 )、正则表达式( 如: .* )。
    • 如修改Topic配置,不需要重启flow即可生效。例如:Topic配置由"topics": "test01"修改为"topics": "test01,test02"后,不用重启flow,test01、test02均会自动同步数据。

  5. 分别登录主集群(c1)和备集群(c2)的Kafka UI,在“Topics”查看到topic同步到备集群成功。

    数据同步成功:

相关文档