更新时间:2024-11-26 GMT+08:00
分享

迁移Kafka节点内数据

操作场景

用户可以根据业务需求,通过Kafka客户端命令,在不停止服务的情况下,进行节点内磁盘间的分区数据迁移。也可以通过KafkaUI进行分区迁移。

前提条件

  • MRS集群管理员已明确业务需求,并准备一个Kafka用户(属于kafkaadmin组,普通模式不需要)。
  • 已安装Kafka客户端。
  • Kafka实例状态和磁盘状态均正常。
  • 根据待迁移分区当前的磁盘空间占用情况,评估迁移后,不会导致新迁移后的磁盘空间不足。

使用Kafka客户端迁移数据

  1. 以客户端安装用户,登录已安装Kafka客户端的节点。
  2. 执行以下命令,切换到Kafka客户端安装目录,例如“/opt/kafkaclient”。

    cd /opt/kafkaclient

  3. 执行以下命令,配置环境变量。

    source bigdata_env

  4. 执行以下命令,进行用户认证(普通模式跳过此步骤)。

    kinit 组件业务用户

  5. 执行以下命令,切换到Kafka客户端目录。

    cd Kafka/kafka/bin

  6. 执行以下命令,查看待迁移的Partition对应的Topic的详细信息。

    安全模式:

    ./kafka-topics.sh --describe --bootstrap-server Kafka集群IP:21007 --command-config ../config/client.properties --topic 主题名称

    普通模式:

    ./kafka-topics.sh --describe --bootstrap-server Kafka集群IP:21005 --command-config ../config/client.properties --topic 主题名称

  7. 执行以下命令,查询Broker_ID和IP对应关系。

    ./kafka-broker-info.sh --zookeeper ZooKeeper的quorumpeer实例业务IP:ZooKeeper客户端端口号/kafka

    Broker_ID     IP_Address
    --------------------------
    4           192.168.0.100
    5           192.168.0.101
    6           192.168.0.102
    • ZooKeeper的quorumpeer实例业务IP:

      ZooKeeper服务所有quorumpeer实例业务IP。登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper > 实例”,可查看所有quorumpeer实例所在主机业务IP地址。

    • ZooKeeper客户端端口号:

      登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper”,在“配置”页签查看“clientPort”的值。默认为24002。

  8. 67回显中获取分区的分布信息和节点信息,在当前目录下创建执行重新分配的json文件。

    以迁移的是Broker_ID为6的节点的分区为例,迁移到"/srv/BigData/hadoop/data1/kafka-logs”,完成迁移所需的json配置文件,内容如下。
    {"partitions":[{"topic": "testws","partition": 2,"replicas": [6,5],"log_dirs": ["/srv/BigData/hadoop/data1/kafka-logs","any"]}],"version":1}
    • topic为Topic名称,此处以testws为例,具体以实际为准。
    • partition为Topic分区。
    • replicas中的数字对应Broker_ID。replicas必须与分区的副本数相对应,不然会造成副本缺少的情况。在本案例中分区所在的replicas对应6和5,只迁移Broker_ID为6的节点的分区中的数据时,也必须把Broker_ID为5的节点的分区带上。
    • log_dirs为需要迁移的磁盘路径。此样例迁移的是Broker_ID为6的节点,Broker_ID为5的节点对应的log_dirs可设置为“any”,Broker_ID为6的节点对应的log_dirs设置为“/srv/BigData/hadoop/data1/kafka-logs”。注意路径需与节点对应

  9. 使用如下命令,执行重分配操作。

    安全模式:

    ./kafka-reassign-partitions.sh --bootstrap-server Broker业务IP:21007 --command-config ../config/client.properties --reassignment-json-file 8中编写的json文件路径 --execute

    普通模式:

    ./kafka-reassign-partitions.sh --bootstrap-server Broker业务IP:21005 --command-config ../config/client.properties --reassignment-json-file 8中编写的json文件路径 --execute

    提示“Successfully started reassignment of partitions”表示执行成功。

使用KafkaUI迁移分区(MRS 3.5.0之前版本)

  1. 进入KafkaUI界面。

    1. 使用具有KafkaUI页面访问权限的用户登录FusionInsight Manager,选择“集群 > 服务 > Kafka”。

      如需在页面上进行相关操作,例如创建Topic,需同时授予用户相关权限,请参考Kafka用户权限说明

    2. 在“KafkaManager WebUI”右侧,单击URL链接,访问KafkaUI的页面。

  2. 单击“Generate assignment”进入分区迁移页面。
  3. 在“Brokers”处选择要将主题重新分配的Broker ID
  4. 单击“Generate Partition Assignments”生成分区迁移方案。

  5. 继续单击“Run assignment”执行分区迁移方案,完成分区迁移。

使用KafkaUI迁移分区(MRS 3.5.0及之后版本)

  1. 进入KafkaUI界面。

    1. 使用具有KafkaUI页面访问权限的用户登录FusionInsight Manager,选择“集群 > 服务 > Kafka”。

      如需在页面上进行相关操作,例如创建Topic,需同时授予用户相关权限,请参考Kafka用户权限说明

    2. 在“KafkaManager WebUI”右侧,单击URL链接,访问KafkaUI的页面。

  2. 单击“Generate assignment”进入分区迁移页面。
  3. 在“Brokers”处选择要将主题重新分配的Broker ID。
  4. 在搜索栏中搜索需要进行分区迁移的Topic以及分区。

    • Partition展示待迁移的Topic,其命名规则为Topic名称-分区索引
    • Replicas展示当前该Topic分区所在Broker的ID。

  5. 单击“Generate Partition Assignments”按钮,生成分区迁移计划。

    • 可以在“Throttle”配置分区迁移的限流参数,不配置则使用默认值,默认值为“0”,表示不限速。
    • Target Replicas:为自动生成的分区迁移计划,支持手动修改,其规则为“BrokerId1,BrokerId2”。
    • Operation:可以对不需要进行分区迁移的Topic进行删除。

  6. 单击“Run assignment”执行分区迁移计划。
  7. 单击“Current Reassign Status”可以查看当前分区迁移计划的执行情况。

    • 单击“Modify Reassignment Throttle”可以修改分区迁移的限流参数。
    • 单击“Cancel”可以取消当前所有正在执行的分区迁移计划。

相关文档