Updated on 2023-05-06 GMT+08:00

Managing Kafka Topics

Scenario

You can manage Kafka topics on a cluster client based on service requirements. Management permission is required for clusters with Kerberos authentication enabled.

Prerequisites

You have installed the Kafka client.

Procedure

  1. Access the ZooKeeper instance page.

    • For versions earlier than MRS 3.x, click the cluster name to go to the cluster details page and choose Components > ZooKeeper > Instances.

      If the Components tab is unavailable, complete IAM user synchronization first. (On the Dashboard page, click Synchronize on the right side of IAM User Sync to synchronize IAM users.)

    • For MRS 3.x or later, log in to FusionInsight Manager. For details, see Accessing FusionInsight Manager (MRS 3.x or Later). Choose Cluster > Name of the desired cluster > Services > ZooKeeper > Instance.

  2. View the IP addresses of the ZooKeeper role instance.

    Record any IP address of the ZooKeeper instance.

  3. Prepare the client based on service requirements, then log in to the node where the client is installed. For details, see Using an MRS Client.

  4. Run the following command to switch to the client installation directory, for example, /opt/client/Kafka/kafka/bin.

    cd /opt/client/Kafka/kafka/bin

  5. Run the following command to configure environment variables:

    source /opt/client/bigdata_env

  6. Run the following command to perform user authentication (skip this step in normal mode):

    kinit Component service user
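    Steps 4 to 6 can be combined into a small helper, sketched below. It assumes the client is installed under /opt/client as in the examples above; the CLIENT_DIR variable and the prepare_kafka_client function are hypothetical names introduced only for this illustration.

```shell
#!/bin/sh
# Assumed default client path from the steps above; override if yours differs.
CLIENT_DIR="${CLIENT_DIR:-/opt/client}"

# Switch to the Kafka bin directory and load the client environment.
# Returns non-zero if the client does not appear to be installed at CLIENT_DIR.
prepare_kafka_client() {
    if [ ! -f "$CLIENT_DIR/bigdata_env" ]; then
        echo "bigdata_env not found under $CLIENT_DIR" >&2
        return 1
    fi
    cd "$CLIENT_DIR/Kafka/kafka/bin" || return 1
    # "." is the POSIX spelling of "source".
    . "$CLIENT_DIR/bigdata_env"
}

if prepare_kafka_client; then
    echo "client ready in $(pwd)"
    # In security mode, authenticate next, for example:
    #   kinit <component service user>
else
    echo "client not prepared"
fi
```

    The helper stops early when bigdata_env is missing, so later commands do not run against an unconfigured shell.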

  7. For versions earlier than MRS 3.x, run the following commands to manage Kafka topics:

    • Creating a topic

      sh kafka-topics.sh --create --topic Topic name --partitions Number of partitions occupied by the topic --replication-factor Number of replicas of the topic --zookeeper IP address of the node where the ZooKeeper instance resides:clientPort/kafka

    • Deleting a topic

      sh kafka-topics.sh --delete --topic Topic name --zookeeper IP address of the node where the ZooKeeper instance resides:clientPort/kafka

    • The number of topic partitions or topic backup replicas cannot exceed the number of Kafka instances.
    • By default, the value of clientPort of ZooKeeper is 2181.
    • There are three ZooKeeper instances. Use the IP address of any one.
    • For details about managing messages in Kafka topics, see Managing Messages in Kafka Topics.
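    As an illustration of the notes above, the following sketch builds the create command from concrete values and rejects partition or replica counts that exceed the number of Kafka instances. It only prints the command and does not run it. The IP address, instance count, topic name, and the build_create_cmd function are hypothetical values chosen for the example.

```shell
#!/bin/sh
# Hypothetical example values; replace them with values from your cluster.
ZK_IP="192.168.0.10"   # any ZooKeeper instance IP recorded in step 2 (assumed)
ZK_PORT="2181"         # default clientPort of ZooKeeper
KAFKA_INSTANCES=3      # number of Kafka instances in the cluster (assumed)

# Print the create command if the requested counts respect the limit
# described in the notes; otherwise print an error and return non-zero.
build_create_cmd() {
    topic="$1"; partitions="$2"; replicas="$3"
    if [ "$partitions" -gt "$KAFKA_INSTANCES" ] || [ "$replicas" -gt "$KAFKA_INSTANCES" ]; then
        echo "error: partitions and replicas must not exceed $KAFKA_INSTANCES" >&2
        return 1
    fi
    echo "sh kafka-topics.sh --create --topic $topic --partitions $partitions --replication-factor $replicas --zookeeper $ZK_IP:$ZK_PORT/kafka"
}

build_create_cmd test_topic 3 2
```

    Once the printed command looks correct, it can be run from the bin directory entered in step 4.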

  8. For MRS 3.x or later, run the following commands to manage Kafka topics:

    • Creating a topic:

      ./kafka-topics.sh --create --topic topic name --partitions number of partitions occupied by the topic --replication-factor number of replicas of the topic --zookeeper IP address of any ZooKeeper node:clientPort/kafka

      ./kafka-topics.sh --create --topic topic name --partitions number of partitions occupied by the topic --replication-factor number of replicas of the topic --bootstrap-server IP address of the Kafka cluster:21007 --command-config ../config/client.properties

    • Listing topics:
      • ./kafka-topics.sh --list --zookeeper service IP address of any ZooKeeper node:clientPort/kafka
      • ./kafka-topics.sh --list --bootstrap-server IP address of the Kafka cluster:21007 --command-config ../config/client.properties
    • Viewing a topic:
      • ./kafka-topics.sh --describe --zookeeper service IP address of any ZooKeeper node:clientPort/kafka --topic topic name
      • ./kafka-topics.sh --describe --bootstrap-server IP address of the Kafka cluster:21007 --command-config ../config/client.properties --topic topic name
    • Modifying a topic:
      • ./kafka-topics.sh --alter --topic topic name --config configuration item=configuration value --zookeeper service IP address of any ZooKeeper node:clientPort/kafka
    • Expanding partitions:
      • ./kafka-topics.sh --alter --topic topic name --zookeeper service IP address of any ZooKeeper node:clientPort/kafka --command-config ../config/client.properties --partitions number of partitions after the expansion
      • ./kafka-topics.sh --alter --topic topic name --bootstrap-server IP address of the Kafka cluster:21007 --command-config ../config/client.properties --partitions number of partitions after the expansion
    • Deleting a topic:
      • ./kafka-topics.sh --delete --topic topic name --zookeeper service IP address of any ZooKeeper node:clientPort/kafka
      • ./kafka-topics.sh --delete --topic topic name --bootstrap-server IP address of the Kafka cluster:21007 --command-config ../config/client.properties
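
The --bootstrap-server variants above share the same connection arguments, so they can be generated from one place. The following sketch prints the commands without running them. The broker IP address, the topic name, and the topic_cmd and expand_cmd functions are hypothetical and used only for illustration; port 21007 and the client.properties path are taken from the commands above. The expansion helper also refuses a target that is not larger than the current count, because Kafka can increase but not decrease the number of partitions of a topic.

```shell
#!/bin/sh
# Hypothetical values; replace them with your own.
BROKER_IP="192.168.0.20"                   # assumed Kafka broker service IP
CLIENT_CONF="../config/client.properties"  # relative to the bin directory

# Print (do not run) a kafka-topics.sh command for the given action.
# Usage: topic_cmd list   or   topic_cmd describe|delete TOPIC
topic_cmd() {
    action="$1"; topic="$2"
    base="./kafka-topics.sh --$action --bootstrap-server $BROKER_IP:21007 --command-config $CLIENT_CONF"
    case "$action" in
        list) echo "$base" ;;
        describe|delete) echo "$base --topic $topic" ;;
        *) echo "unsupported action: $action" >&2; return 1 ;;
    esac
}

# Print the expansion command only if the new partition count is larger
# than the current one.
expand_cmd() {
    topic="$1"; current="$2"; target="$3"
    if [ "$target" -le "$current" ]; then
        echo "error: target ($target) must exceed current ($current)" >&2
        return 1
    fi
    echo "./kafka-topics.sh --alter --topic $topic --bootstrap-server $BROKER_IP:21007 --command-config $CLIENT_CONF --partitions $target"
}

topic_cmd list
topic_cmd describe test_topic
expand_cmd test_topic 3 6
```

Printing first and running second makes it easier to review a delete or expansion command before it changes the cluster.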