
Balancing Data After Kafka Node Scale-Out

Scenario

This section describes how to use the Kafka balancing tool on the client to balance the load of the Kafka cluster after Kafka nodes are scaled out.

This section applies to versions earlier than MRS 3.x. For MRS 3.x or later, see Kafka Balancing Tool Instructions.

Prerequisites

  • The MRS cluster administrator has understood service requirements and prepared a Kafka administrator user (belonging to the kafkaadmin group; this user is not required for clusters in normal mode).
  • The Kafka client has been installed in a directory, for example, /opt/client.
  • Two topics named test_2 and test_3 have been created by referring to 7 (a sketch of creating such topics follows this list). The move-kafka-topic.json file has been created in the /opt/client/Kafka/kafka directory, with the following content:
    {
      "topics": [{"topic":"test_2"},{"topic":"test_3"}],
      "version": 1
    }
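
    If the two topics do not exist yet, they can be created with the kafka-topics.sh tool in the Kafka client's bin directory. The following is a minimal sketch; the partition count and replication factor are illustrative assumptions, and the ZooKeeper address must match your cluster:

    # Illustrative only: adjust the partition count and replication factor to your needs.
    cd /opt/client/Kafka/kafka/bin
    ./kafka-topics.sh --create --zookeeper 172.16.0.119:2181/kafka --topic test_2 --partitions 3 --replication-factor 2
    ./kafka-topics.sh --create --zookeeper 172.16.0.119:2181/kafka --topic test_3 --partitions 3 --replication-factor 2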

Procedure

  1. Log in to the node where the Kafka client is installed as the client installation user.
  2. Run the following command to switch to the client installation directory:

    cd /opt/client

  3. Run the following command to set environment variables:

    source bigdata_env

  4. Run the following command to perform user authentication (skip this step if the cluster is in normal mode):

    kinit Component service user
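
    For example, if the Kafka administrator prepared in the prerequisites is named kafkauser (a hypothetical name used here for illustration), run:

    kinit kafkauser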

  5. Run the following command to go to the bin directory of the Kafka client:

    cd Kafka/kafka/bin

  6. Run the following command to generate an execution plan:

    ./kafka-reassign-partitions.sh --zookeeper 172.16.0.119:2181/kafka --topics-to-move-json-file ../move-kafka-topic.json --broker-list "1,2,3" --generate

    • 172.16.0.119: service IP address of a ZooKeeper instance
    • --broker-list "1,2,3": list of broker instances, where 1,2,3 are the IDs of all broker instances after the scale-out. Sample output of this command is sketched below.
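
    The command prints the current assignment followed by a proposed one. The output shape is roughly as follows (illustrative; the actual partition lists depend on your cluster):

    Current partition replica assignment
    {"version":1,"partitions":[{"topic":"test_2","partition":0,"replicas":[1]},...]}

    Proposed partition reassignment configuration
    {"version":1,"partitions":[{"topic":"test_2","partition":0,"replicas":[2]},...]}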

  7. Run the vim ../reassignment.json command to create the reassignment.json file in the /opt/client/Kafka/kafka directory.

    Copy the content under Proposed partition reassignment configuration generated in 6 into the reassignment.json file, as shown in the following example:
    {"version":1,"partitions":[{"topic":"test","partition":4,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"test","partition":3,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"test","partition":0,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"test","partition":2,"replicas":[2,1],"log_dirs":["any","any"]}]}

  8. Run the following command to redistribute partitions:

    ./kafka-reassign-partitions.sh --zookeeper 172.16.0.119:2181/kafka --reassignment-json-file ../reassignment.json --execute --throttle 50000000

    --throttle 50000000: limits the data migration bandwidth to 50 MB/s (the value is in bytes per second). You can adjust the bandwidth based on the data volume and the customer's requirements on the balancing time. If the data volume is 5 TB and the bandwidth is 50 MB/s, data balancing takes about 8 hours.
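
    According to Kafka's documented behavior, the throttle can be adjusted while the reassignment is in progress by re-running the --execute command with a new --throttle value. For example, to raise the limit to 100 MB/s:

    ./kafka-reassign-partitions.sh --zookeeper 172.16.0.119:2181/kafka --reassignment-json-file ../reassignment.json --execute --throttle 100000000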

  9. Run the following command to check the data migration status:

    ./kafka-reassign-partitions.sh --zookeeper 172.16.0.119:2181/kafka --reassignment-json-file ../reassignment.json --verify
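
    Run the verify command periodically until every partition reports success. Note that --verify also removes the replication throttle once the reassignment completes, so do not skip this step. The output looks roughly as follows (illustrative):

    Status of partition reassignment:
    Reassignment of partition [test,0] completed successfully
    Reassignment of partition [test,1] is still in progress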