Updated on 2024-10-08 GMT+08:00

Migrating Data on a Kafka Node

Scenario

You can run Kafka client commands to migrate data between partitions on a node without stopping services. You can also use the Kafka UI to migrate partitions.

Prerequisites

  • The MRS cluster administrator has understood service requirements and prepared a Kafka user (belonging to the kafkaadmin group. It is not required for the normal mode.).
  • The Kafka client has been installed.
  • The Kafka instance status and disk status are normal.
  • Based on the current disk space usage of the partition to be migrated, ensure that the disk space will be sufficient after the migration.

Migrating Data Using the Kafka Client

  1. Log in as a client installation user to the node on which the Kafka client is installed.
  2. Run the following command to switch to the Kafka client installation directory, for example, /opt/kafkaclient:

    cd /opt/kafkaclient

  3. Run the following command to set environment variables:

    source bigdata_env

  4. Run the following command to authenticate the user (skip this step in normal mode):

    kinit Component service user

  5. Run the following command to switch to the Kafka client directory:

    cd Kafka/kafka/bin

  6. Run the following command to view the topic details of the partition to be migrated:

    Security mode:

    ./kafka-topics.sh --describe --bootstrap-server IP address of the Kafkacluster:21007 --command-config ../config/client.properties --topic topic name

    Normal mode:

    ./kafka-topics.sh --describe --bootstrap-server IP address of the Kafka cluster:21005 --command-config ../config/client.properties --topic Topic name

  7. Run the following command to query the mapping between Broker_ID and the IP address:

    ./kafka-broker-info.sh --zookeeper IP address of the ZooKeeper quorumpeer instance:ZooKeeper port number/kafka

    Broker_ID     IP_Address
    --------------------------
    4           192.168.0.100
    5           192.168.0.101
    6           192.168.0.102
    • IP address of the ZooKeeper quorumpeer instance

      To obtain IP addresses of all ZooKeeper quorumpeer instances, log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Instance and view the IP addresses of all the hosts where the quorumpeer instances locate.

    • Port number of the ZooKeeper client

      Log in to FusionInsight Manager and choose Cluster > Service > ZooKeeper. On the displayed page, click Configurations and check the value of clientPort. The default value is 24002.

  8. Obtain the partition distribution and node information from the command output in 6 and 7, and create the JSON file for reallocation in the current directory.

    To migrate data in the partition whose Broker_ID is 6 to the /srv/BigData/hadoop/data1/kafka-logs directory, the required JSON configuration file is as follows:
    {"partitions":[{"topic": "testws","partition": 2,"replicas": [6,5],"log_dirs": ["/srv/BigData/hadoop/data1/kafka-logs","any"]}],"version":1}
    • topic indicates the topic name, for example, testws.
    • partition indicates the topic partition.
    • The number in replicas corresponds to Broker_ID. The value of replicas must be the same as the number of replicas of the partition. Otherwise, replicas may be missing. In this case, replicas where partitions are located correspond to 6 and 5. If only data in the partition of the node whose Broker_ID is 6 is migrated, the data in the partition of the node whose Broker_ID is 5 must also be migrated.
    • log_dirs indicates the path of the disk to be migrated. In this example, log_dirs of the node whose Broker_ID is 5 is set to any, and that of the node whose Broker_ID is 6 is set to /srv/BigData/hadoop/data1/kafka-logs. Note that the path must correspond to the node.

  9. Run the following command to perform reallocation:

    Security mode:

    ./kafka-reassign-partitions.sh --bootstrap-server Service IP address of Broker:21007 --command-config ../config/client.properties --zookeeper {zk_host}:{port}/kafka --reassignment-json-file Path of the JSON file compiled in 8 --execute

    Normal mode:

    ./kafka-reassign-partitions.sh --bootstrap-server Service IP address of Broker:21005 --command-config ../config/client.properties --zookeeper {zk_host}:{port}/kafka --reassignment-json-file Path of the JSON file compiled in 8 --execute

    If message "Successfully started reassignment of partitions" is displayed, the execution is successful.

Migrating Partitions Using the Kafka UI (MRS 3.1.2 or Later)

  1. Access the Kafka UI.

    1. Log in to FusionInsight Manager as a user who has the permission to access the Kafka UI and choose Cluster > Services > Kafka.

      If you need to perform operations on the page, for example, creating a topic, you need to grant related permissions to users. For details, see Kafka User Permissions.

    2. On the right of KafkaManager WebUI, click the URL to access the Kafka UI.

  2. Click Generate assignment. The Generate Partition Assignments page is displayed.
  3. In the Brokers area, select brokers to which the topic is to be re-assigned.
  4. Click Generate Partition Assignments to generate a partition migration solution.

  5. Click Run assignment to migrate a partition.