更新时间:2024-07-24 GMT+08:00

均衡Kafka扩容节点后数据

操作场景

用户可以在Kafka扩容节点后,在客户端中执行Kafka均衡工具来均衡Kafka集群的负载。

本章节内容适用于MRS 3.x之前版本。3.x及之后版本请参考配置Kafka数据均衡工具

前提条件

  • MRS集群管理员已明确业务需求,并准备一个Kafka管理员用户(属于kafkaadmin组,普通模式不需要)。
  • 已安装Kafka客户端,客户端安装目录如“/opt/client”。
  • 本示例需创建两个Topic,可参考7,分别命名为“test_2”和“test_3”,并创建“move-kafka-topic.json”文件,创建路径如“/opt/client/Kafka/kafka”,Topic格式内容如下:
    {
    "topics":
    [{"topic":"test_2"},{"topic":"test_3"}],
    "version":1
    }

操作步骤

  1. 以客户端安装用户,登录安装Kafka客户端的节点。
  2. 切换到Kafka客户端安装目录。

    cd /opt/client

  3. 执行以下命令,配置环境变量。

    source bigdata_env

  4. 执行以下命令,进行用户认证。(普通模式跳过此步骤)

    kinit 组件业务用户

  5. 执行以下命令进入Kafka客户端的bin目录。

    cd Kafka/kafka/bin

  6. 执行以下命令生成执行计划。

    ./kafka-reassign-partitions.sh --zookeeper 172.16.0.119:2181/kafka --topics-to-move-json-file ../move-kafka-topic.json --broker-list "1,2,3" --generate

    • 172.16.0.119:ZooKeeper实例的业务IP。
    • --broker-list "1,2,3":参数中的“1,2,3”为扩容后的所有broker_id。

  7. 执行vim ../reassignment.json创建“reassignment.json”文件并保存,保存路径为“/opt/kafkaclient/Kafka/kafka”。

    拷贝6中生成的“Proposed partition reassignment configuration”下的内容至“reassignment.json”文件,如下所示:
    {"version":1,"partitions":[{"topic":"test","partition":4,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"test","partition":3,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"test","partition":0,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"test","partition":2,"replicas":[2,1],"log_dirs":["any","any"]}]}

  8. 执行以下命令进行分区重分布。

    ./kafka-reassign-partitions.sh --zookeeper 172.16.0.119:2181/kafka --reassignment-json-file ../reassignment.json --execute --throttle 50000000

    --throttle 50000000:限制网络带宽为50MB。带宽可根据数据量大小及客户对均衡时间的要求进行调整,5TB数据量,使用50MB带宽,均衡时长约8小时。

  9. 执行以下命令查看迁移状态。

    ./kafka-reassign-partitions.sh --zookeeper 172.16.0.119:2181/kafka --reassignment-json-file ../reassignment.json --verify