Updated on 2023-05-06 GMT+08:00

Migrating Data Between Kafka Nodes

Scenario

This section describes how to use Kafka client commands to migrate partition data between disks on a node without stopping the Kafka service.

Prerequisites

  • The MRS cluster administrator understands the service requirements and has prepared a Kafka user that belongs to the kafkaadmin group. (This user is not required in normal mode.)
  • The Kafka client has been installed.
  • The Kafka instance status and disk status are normal.
  • The disk space used by the partition to be migrated has been checked, and the target disk will still have sufficient free space after the migration.
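The disk space prerequisite can be checked with standard tools before any data is moved. The following is a minimal sketch, not part of the product tooling: the helper name has_room and the source directory in the example call are hypothetical, and the check is assumed to run on the broker node that holds the partition, where Kafka stores each partition in a directory named topic-partition under the log directory.

```shell
# has_room SRC_DIR TARGET_DIR
# Succeeds if the file system that holds TARGET_DIR has more free space
# than SRC_DIR currently occupies. Sizes are compared in KB.
has_room() {
  need_kb=$(du -sk "$1" | awk '{ print $1 }')
  free_kb=$(df -Pk "$2" | awk 'NR == 2 { print $4 }')
  echo "partition uses ${need_kb} KB, target has ${free_kb} KB free"
  [ "$free_kb" -gt "$need_kb" ]
}

# Example call (the data2 source path is a hypothetical location of
# partition 2 of topic testws; the target path is the one used in this section):
# has_room /srv/BigData/hadoop/data2/kafka-logs/testws-2 /srv/BigData/hadoop/data1/kafka-logs
```

A successful return only means that the partition fits at its current size; leave extra headroom for data that is written while the migration is in progress.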

Procedure

  1. Log in to the node where the Kafka client is installed as the client installation user.
  2. Run the following command to switch to the Kafka client installation directory, for example, /opt/kafkaclient:

    cd /opt/kafkaclient

  3. Run the following command to set environment variables:

    source bigdata_env

  4. Run the following command to authenticate the user (skip this step in normal mode):

    kinit Component service user

  5. Run the following command to switch to the Kafka client directory:

    cd Kafka/kafka/bin

  6. Run the following command to view the topic details of the partition to be migrated:

    Security mode:

    ./kafka-topics.sh --describe --bootstrap-server IP address of the Kafka cluster:21007 --command-config ../config/client.properties --topic Topic name

    Normal mode:

    ./kafka-topics.sh --describe --bootstrap-server IP address of the Kafka cluster:21005 --command-config ../config/client.properties --topic Topic name

  7. Run the following command to query the mapping between Broker_ID and the IP address:

    ./kafka-broker-info.sh --zookeeper IP address of the ZooKeeper quorumpeer instance:ZooKeeper port number/kafka

    Broker_ID     IP_Address
    --------------------------
    4           192.168.0.100
    5           192.168.0.101
    6           192.168.0.102
    • IP address of the ZooKeeper quorumpeer instance

      To obtain the IP addresses of all ZooKeeper quorumpeer instances, log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Instance and view the IP addresses of the hosts where the quorumpeer instances are located.

    • ZooKeeper port number

      Log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Configurations and check the value of clientPort. The default value is 24002.
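When many brokers are listed, the Broker_ID to IP address mapping can be looked up with awk instead of by eye. This is a minimal sketch that assumes the output has exactly the two-column layout shown in this step; the variable broker_info and the helper lookup_broker_ip are hypothetical names, and the sample text stands in for the real command output.

```shell
# Sample output copied from this step; on a real client, capture the output of
# ./kafka-broker-info.sh into this variable instead.
broker_info='Broker_ID     IP_Address
--------------------------
4           192.168.0.100
5           192.168.0.101
6           192.168.0.102'

# lookup_broker_ip BROKER_ID
# Skips the two header lines and prints the IP address of the matching broker.
lookup_broker_ip() {
  printf '%s\n' "$broker_info" | awk -v id="$1" 'NR > 2 && $1 == id { print $2 }'
}

lookup_broker_ip 6
```

With the sample data, the last line prints 192.168.0.102, which is the node whose disk is the migration target in the example that follows.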

  8. Obtain the partition distribution and node information from the command output in steps 6 and 7, and create the JSON file for reallocation in the current directory.

    For example, to migrate the partition replica on the node whose Broker_ID is 6 to the /srv/BigData/hadoop/data1/kafka-logs directory, use the following JSON configuration file:
    {"partitions":[{"topic": "testws","partition": 2,"replicas": [6,5],"log_dirs": ["/srv/BigData/hadoop/data1/kafka-logs","any"]}],"version":1}
    • topic indicates the topic name, for example, testws.
    • partition indicates the ID of the topic partition to be migrated, for example, 2.
    • The numbers in replicas are Broker_ID values. The number of entries in replicas must be the same as the number of replicas of the partition. Otherwise, replicas may be lost. In this example, the replicas of the partition are located on the nodes whose Broker_ID is 6 and 5. Even if only the replica on the node whose Broker_ID is 6 is migrated, the node whose Broker_ID is 5 must also be listed.
    • log_dirs indicates the target disk path for each replica. In this example, log_dirs of the node whose Broker_ID is 5 is set to any, which leaves the choice of directory to that broker, and that of the node whose Broker_ID is 6 is set to /srv/BigData/hadoop/data1/kafka-logs. The entries in log_dirs must be in the same order as the entries in replicas so that each path corresponds to the correct node.
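The JSON file described above can be written with a here-document and given a quick sanity check before it is passed to the reassignment tool. This is a sketch under stated assumptions: the file name reassign.json and the helper count_entries are hypothetical, the values are those of the example, and the check only confirms that replicas and log_dirs have the same number of entries. It relies on each array sitting on a single line and on paths that contain no commas.

```shell
# Hypothetical file name; the content is the example from this step,
# spread over several lines for readability.
REASSIGN_FILE="reassign.json"

cat > "$REASSIGN_FILE" <<'EOF'
{
  "version": 1,
  "partitions": [
    {
      "topic": "testws",
      "partition": 2,
      "replicas": [6, 5],
      "log_dirs": ["/srv/BigData/hadoop/data1/kafka-logs", "any"]
    }
  ]
}
EOF

# count_entries FIELD FILE
# Counts the comma-separated entries of a JSON array written on one line.
count_entries() {
  grep "\"$1\"" "$2" | sed 's/.*\[\(.*\)\].*/\1/' | tr ',' '\n' | grep -c .
}

replica_count=$(count_entries replicas "$REASSIGN_FILE")
logdir_count=$(count_entries log_dirs "$REASSIGN_FILE")

if [ "$replica_count" -eq "$logdir_count" ]; then
  echo "OK: $replica_count replicas, $logdir_count log_dirs"
else
  echo "Mismatch: $replica_count replicas, $logdir_count log_dirs" >&2
fi
```

The check does not replace a careful comparison with the output of step 6. It only catches the common mistake of listing a path for one replica and forgetting the entry for the other.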

  9. Run the following command to perform reallocation:

    Security mode:

    ./kafka-reassign-partitions.sh --bootstrap-server Service IP address of Broker:21007 --command-config ../config/client.properties --zookeeper {zk_host}:{port}/kafka --reassignment-json-file Path of the JSON file created in step 8 --execute

    Normal mode:

    ./kafka-reassign-partitions.sh --bootstrap-server Service IP address of Broker:21005 --command-config ../config/client.properties --zookeeper {zk_host}:{port}/kafka --reassignment-json-file Path of the JSON file created in step 8 --execute

    If the message "Successfully started reassignment of partitions" is displayed, the reassignment has started successfully.
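The success message only confirms that the reassignment has started. The same tool also accepts --verify in place of --execute to report whether a reassignment has completed. The sketch below assembles the command line for either mode and action so that the port and options stay consistent. The helper reassign_cmd, the file name reassign.json, and the ZooKeeper address 192.168.0.10 are hypothetical placeholders, and mirroring the --execute options for --verify is an assumption about this client version.

```shell
# Hypothetical placeholder values; replace them with the values of your cluster.
BROKER_IP="192.168.0.102"
ZK_CONNECT="192.168.0.10:24002/kafka"
REASSIGN_FILE="reassign.json"

# reassign_cmd MODE ACTION
# MODE is security or normal; ACTION is --execute or --verify.
# Prints the command line; it does not run it.
reassign_cmd() {
  case "$1" in
    security) port=21007 ;;
    normal)   port=21005 ;;
    *) echo "unknown mode: $1" >&2; return 1 ;;
  esac
  echo "./kafka-reassign-partitions.sh --bootstrap-server $BROKER_IP:$port" \
       "--command-config ../config/client.properties --zookeeper $ZK_CONNECT" \
       "--reassignment-json-file $REASSIGN_FILE $2"
}

reassign_cmd security --verify
```

Printing the command instead of running it makes it possible to review the line before executing it in the Kafka client bin directory.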