Updated on 2024-10-24 GMT+08:00

Using the Kafka Client

Scenario

This section guides users to use a Kafka client in an O&M or service scenario.

This section applies to MRS 3.x or later.

Prerequisites

  • The cluster client has been installed in a directory, for example, /opt/client.
  • Service users of each component are created by the MRS cluster administrator based on service requirements. A machine-machine user needs to download the keytab file and a human-machine user needs to change the password upon the first login. (Not involved in normal mode)
  • After changing the domain name of a cluster, redownload the client to ensure that the kerberos.domain.name value in the configuration file of the client is set to the correct server domain name.

Using a Kafka Client

  1. Log in to the node where the client is installed as the client installation user.
  2. Run the following command to go to the client installation directory:

    cd /opt/client

  3. Run the following command to configure environment variables:

    source bigdata_env

  4. Run the following command to perform user authentication (skip this step in normal mode):

    kinit Component service user

  5. Run the following command to switch to the Kafka client installation directory:

    cd Kafka/kafka/bin

  6. Run the following command to use the client tool to view and use the help information:

    • ./kafka-console-consumer.sh: Kafka message reading tool
    • ./kafka-console-producer.sh: Kafka message publishing tool
    • ./kafka-topics.sh: Kafka topic management tool

  7. To perform various operations on Kafka topics for versions prior to MRS 3.x, execute the appropriate commands.

    • Command for creating a topic:

      sh kafka-topics.sh --create --topic Topic name --partitions Number of partitions occupied by the topic --replication-factor Number of replicas of the topic --zookeeper IP address of the node where the ZooKeeper instance resides:clientPort/kafka

    • Command for deleting a topic:

      sh kafka-topics.sh --delete --topic Topic name --zookeeper IP address of the node where the ZooKeeper instance resides:clientPort/kafka

    • The number of topic partitions or topic backup copies cannot exceed the number of Kafka instances.
    • By default, the ZooKeeper's clientPort value is 2181.
    • There are three ZooKeeper instances. Use the IP address of any one.

  8. MRS 3.x and later versions: Use kafka-topics.sh to manage Kafka topics.

    • Creating a topic:

      By default, partitions of a topic are distributed based on the number of partitions on the node and disk. To distribute partitions based on the disk capacity, set log.partition.strategy to capacity for the Kafka service.

      When a topic is created in Kafka, partitions and copies can be generated based on the combination of rack awareness and cross-AZ feature. The --zookeeper and --bootstrap-server modes are supported.

      • Disable the rack policy and cross-AZ feature (default policy).

        Copies of topics created based on this policy are randomly allocated to any node in the cluster.

        ./kafka-topics.sh --create --topic Topic name --partitions Number of partitions occupied by the topic--replication-factor Number of backups of the topic --zookeeper Service IP address of any ZooKeeper node:clientPort/kafka

        ./kafka-topics.sh --create --topic Topic name --partitions Number of partitions occupied by the topic --replication-factor Number of backups of the topic --bootstrap-server Kafka Cluster IP address:21007 --command-config ../config/client.properties

        If you use --bootstrap-server to create a topic, set rack.aware.enable and az.aware.enable to false.

      • Enable the rack policy and disable the cross-AZ feature.

        To balance the load distribution, this policy randomly assigns the leader of each partition for a topic to a cluster node, while assigning different replicas of the same partition to different racks. It is important to ensure an equal number of nodes in each rack to avoid overloading the rack with fewer nodes.

        ./kafka-topics.sh --create --topic Topic name --partitions Number of partitions occupied by the topic --replication-factor Number of backups of the topic --zookeeper Service IP address of any ZooKeeper node:clientPort/kafka --enable-rack-aware

        ./kafka-topics.sh --create --topic Topic name --partitions Number of partitions occupied by the topic --replication-factor Number of backups of the topic --bootstrap-server Kafka Cluster IP address:21007 --command-config ../config/client.properties

        If you use --bootstrap-server to create a topic, set rack.aware.enable to true and az.aware.enable to false.

      • Disable the rack policy and enable the cross-AZ feature.

        To balance the load distribution, this policy randomly assigns the leader of each partition for a topic to a cluster node, while assigning different replicas of the same partition to different AZs. It is important to ensure an equal number of nodes in each AZ to avoid overloading the AZ with fewer nodes.

        ./kafka-topics.sh --create --topic Topic name --partitions Number of partitions occupied by the topic --replication-factor Number of backups of the topic --zookeeper Service IP address of any ZooKeeper node:clientPort/kafka --enable-az-aware

        ./kafka-topics.sh --create --topic Topic name --partitions Number of partitions occupied by the topic --replication-factor Number of backups of the topic --bootstrap-server Kafka Cluster IP address:21007 --command-config ../config/client.properties

        If you use --bootstrap-server to create a topic, set rack.aware.enable to false and az.aware.enable to true.

      • Enable the rack policy and cross-AZ feature.

        This policy randomly assigns the leader of each partition for a topic to a cluster node, while assigning different replicas of the same partition to different racks in different AZs. It is important to ensure each rack in each AZ has an equal number of nodes to prevent load imbalance in the cluster.

        ./kafka-topics.sh --create --topic Topic name --partitions Number of partitions occupied by the topic --replication-factor Number of backups of the topic --zookeeper Service IP address of any ZooKeeper node:clientPort/kafka --enable-rack-aware --enable-az-aware

        ./kafka-topics.sh --create --topic Topic name --partitions Number of partitions occupied by the topic --replication-factor Number of backups of the topic --bootstrap-server Kafka Cluster IP address:21007 --command-config ../config/client.properties

        If you use --bootstrap-server to create a topic, set rack.aware.enable and az.aware.enable to true.

      • Kafka supports topic creation in either of the following modes:
        • In --zookeeper mode, the client creates a copy allocation solution. To reduce dependence on the ZooKeeper component, the community will no longer support this mode in future versions, despite having supported it from the beginning. When creating a topic in this mode, you can select a copy allocation policy by combining the --enable-rack-aware and --enable-az-aware options. Note that you can only use the --enable-az-aware option if the cross-AZ feature is enabled on the server, which requires az.aware.enable to be set to true. If this feature is not enabled, the execution will fail.
        • In --bootstrap-server mode, the server creates a copy allocation solution. Going forward, the community will only support this mode for managing topics. When a topic is created in this mode, the --enable-rack-aware and --enable-az-aware options cannot be used to control the copy allocation policy. The rack.aware.enable and az.aware.enable parameters can be used together to control the copy allocation policy. Note that the az.aware.enable parameter is unmodifiable. If the cross-AZ feature is enabled when creating a cluster, this parameter will automatically be set to true. However, the rack.aware.enable parameter can be customized.
    • Listing topics:
      • ./kafka-topics.sh --list --zookeeper Service IP address of any ZooKeeper node:clientPort/kafka
      • ./kafka-topics.sh --list --bootstrap-server IP address of the Kafka cluster:21007 --command-config ../config/client.properties
    • Viewing a topic:
      • ./kafka-topics.sh --describe --zookeeper Service IP address of any ZooKeeper node:clientPort/kafka --topic Topic name
      • ./kafka-topics.sh --describe --bootstrap-server IP address of the Kafka cluster:21007 --command-config ../config/client.properties --topic Topic name
    • Modifying a topic:
      • ./kafka-topics.sh --alter --topic Topic name --config Configuration item=Value--zookeeper Service IP address of any ZooKeeper node:clientPort/kafka
    • Expanding partitions:
      • ./kafka-topics.sh --alter --topic Topic name --zookeeper Service IP address of any ZooKeeper node:clientPort/kafka --command-config Kafka/kafka/config/client.properties --partitions Number of partitions after the expansion
      • ./kafka-topics.sh --alter --topic Topic name --bootstrap-server IP address of the Kafka cluster:21007 --command-config Kafka/kafka/config/client.properties --partitions Number of partitions after the expansion
    • Deleting a topic:
      • ./kafka-topics.sh --delete --topic Topic name --zookeeper Service IP address of any ZooKeeper node:clientPort/kafka
      • ./kafka-topics.sh --delete --topic Topic name --bootstrap-server IP address of the Kafka cluster:21007 --command-config ../config/client.properties