更新时间:2025-12-26 GMT+08:00
分享

使用Kafka流控工具限制生产消费速度

操作场景

在Kafka集群实际运行中,不同业务场景(如突发流量高峰、资源隔离需求等)可能需要对消息生产和消费速度进行差异化控制该任务指导用户根据业务需求,在客户端使用命令行工具kafka-configs.sh来管理Kafka集群的配置,按照不同的级别(topic级别、用户级别、客户端级别等)来限制Kafka消息生产和消费速度。

约束与限制

该功能仅适用于MRS 3.3.1及之后版本。

前提条件

  • MRS集群管理员已明确业务需求。并准备一个Kafka组件业务用户,该用户属于kafkaadmin用户组。(集群未启用Kerberos认证(普通模式)不需要)

    创建用户相关操作请参考创建Kafka用户并绑定角色

  • 已安装客户端,例如安装目录为“/opt/client,以下操作的客户端目录只是举例,请根据实际安装目录修改

    下载并安装集群客户端的具体操作,请参考安装MRS集群客户端

操作步骤

  1. 登录MRS集群Manager。

    登录集群Manager具体操作,请参考访问MRS集群Manager

  2. 获取Kafka节点业务IP及端口

    1. 选择“集群 > 服务 > Kafka > 实例”,查看并记录任意一个Broker角色实例的业务IP地址。
    2. Kafka集群端口号:

      集群已启用Kerberos认证(安全模式):默认为21007

      集群未启用Kerberos认证(普通模式):默认为9092

  3. 获取ZooKeeper节点业务IP及端口。

    1. 选择“集群 > 服务 > ZooKeeper > 实例”,查看并记录任意一个ZooKeeper角色实例的业务IP地址。
    2. 选择“配置 > 全部配置”,搜索“clientPort”,查看并记录端口号。

      默认端口如下:

      • 开源端口默认值为:2181
      • 定制端口默认值为:24002

      端口定制/开源区分:创建LTS版本类型集群时,可以选择“组件端口”为“开源”或是“定制”,选择“开源”使用开源端口,选择“定制”使用定制端口。

  4. 以客户端安装用户,登录安装客户端的节点。
  5. 执行以下命令,切换到客户端安装目录,例如安装目录为“/opt/client,具体以实际替换

    cd /opt/client

  6. 执行以下命令配置环境变量。

    source bigdata_env

  7. 执行以下命令,进行用户认证。(集群未启用Kerberos认证(普通模式)跳过此步骤)

    kinit 组件业务用户

  8. 执行以下命令,切换到客户端安装“kafka”目录。

    cd Kafka/kafka

  9. 使用“kafka-configs.sh”进行Kafka流量控制,常用命令如下。

    • 基于Topic级别的生产限流
      bin/kafka-configs.sh --zookeeper ZooKeeper的任意一个节点的业务IP:clientPort/kafka --alter --add-config 'producer_byte_rate=生产限流的速度' --entity-type topics_limit --entity-name Topic的名称

      例如,执行以下命令,限制名称为testTopic-01的主题的生产者字节速率为每秒10MB(10485760字节),用于控制该主题的生产者流量

      bin/kafka-configs.sh --zookeeper 192.168.20.36:24002/kafka --alter --add-config 'producer_byte_rate=10485760' --entity-type topics_limit --entity-name testTopic-01
    • 基于Topic级别的消费限流
      bin/kafka-configs.sh --zookeeper ZooKeeper的任意一个节点的业务IP:clientPort/kafka --alter --add-config 'consumer_byte_rate=消费限流的速度' --entity-type topics_limit --entity-name Topic的名称

      例如,执行以下命令,限制名称为testTopic-01的主题的消费者字节速率为每秒1MB(1048576字节),用于控制该主题的消费者流量

      bin/kafka-configs.sh --zookeeper 192.168.20.36:24002/kafka --alter --add-config 'consumer_byte_rate=1048576' --entity-type topics_limit --entity-name testTopic-01
    • 基于用户级别的生产限流
      bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'producer_byte_rate=生产限流的速度' --entity-type users --entity-name 组件业务用户 --command-config config/client.properties

      例如,执行以下命令,限制名称为kafkauser的用户的生产者字节速率为每秒10MB(10485760字节),用于控制该用户下所有生产者的流量

      bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'producer_byte_rate=10485760' --entity-type users --entity-name kafkauser --command-config config/client.properties
    • 基于用户级别的消费限流
      bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'consumer_byte_rate=消费限流的速度' --entity-type users --entity-name 组件业务用户 --command-config config/client.properties

      例如,执行以下命令,限制名称为kafkauser的用户的消费者字节速率为每秒1MB(1048576字节),用于控制该主题的消费者流量

      bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'consumer_byte_rate=1048576' --entity-type users --entity-name kafkauser --command-config config/client.properties
    • 基于客户端级别的生产限流
      bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'producer_byte_rate=生产限流的速度' --entity-type clients --entity-name 客户端ID --command-config config/client.properties

      其中,客户端ID可以在登录Kafka客户端后执行以下命令,查看返回结果中“CLIENT-ID”参数值进行获取。例如获取到的客户端ID为“clientA”。

      bin/kafka-consumer-groups.sh --describe --bootstrap-server Kafka集群IP:端口号 --all-groups --command-config config/consumer.properties

      例如限制ID名称为clientAKafka客户端的生产者字节速率为每秒10MB(10485760字节),用于控制该客户端ID下所有生产者的流量

      bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'producer_byte_rate=10485760' --entity-type clients --entity-name clientA --command-config config/client.properties
    • 基于客户端级别的消费限流
      bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'consumer_byte_rate=消费限流的速度' --entity-type clients --entity-name 客户端ID --command-config config/client.properties

      例如,执行以下命令,限制ID名称为clientAKafka客户端的消费者字节速率为每秒1MB(1048576字节),用于控制该客户端ID的消费者流量

      bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'consumer_byte_rate=1048576' --entity-type clients --entity-name clientA --command-config config/client.properties
    • 基于用户+客户端级别的生产和消费限流
      bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'producer_byte_rate=生产限流的速度,consumer_byte_rate=消费限流的速度' --entity-type users --entity-name 组件业务用户 --entity-type clients --entity-name 客户端ID --command-config config/client.properties

      例如:

      bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'producer_byte_rate=1048576,consumer_byte_rate=1048576' --entity-type users --entity-name kafkauser --entity-type clients --entity-name clientD --command-config config/client.properties
    • 查看限流信息
      bin/kafka-configs.sh --describe --bootstrap-server Kafka集群IP:端口号 --entity-type myType --entity-name myName --command-config config/client.properties

      其中,

      • “myType”的取值范围为{topics_limit, users, clients},对应的“myName”的取值范围为{topic的名称,组件业务用户,客户端ID}。
      • 如果“myType”选择为“topics_limit”,则命令行需修改为:
        bin/kafka-configs.sh --describe --zookeeper Zookeeper的任意一个节点的业务IP:clientPort/kafka --entity-type topics_limit --entity-name Topic的名称
      • “producer_byte_rate”和“consumer_byte_rate”分别为生产限流的速度和消费限流的速度,其单位均为字节/秒。

      例如:

      bin/kafka-configs.sh --describe --bootstrap-server 192.168.20.36:21007 --entity-type users --entity-name kafkauser --entity-type clients --entity-name clientD --command-config config/client.properties
    • 取消限流
      bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --delete-config 'producer_byte_rate,consumer_byte_rate' --entity-type myType --entity-name myName --command-config config/client.properties

      其中,

      • “myType”的取值范围为{topics_limit, users, clients},对应的“myName”的取值范围为{topic的名称,组件业务用户,客户端ID}。
      • 若“myType”选择为“topics_limit”,则命令行需修改为:
        bin/kafka-configs.sh --zookeeper Zookeeper的任意一个节点的业务IP:clientPort/kafka --alter --delete-config 'producer_byte_rate,consumer_byte_rate' --entity-type topics_limit --entity-name Topic的名称

      例如:

      bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --delete-config 'producer_byte_rate,consumer_byte_rate' --entity-type users --entity-name kafkauser --entity-type clients --entity-name client --command-config config/client.properties

相关文档