更新时间:2024-11-26 GMT+08:00
分享

使用Kafka流控工具限制生产消费速度

操作场景

该任务指导用户根据业务需求,在客户端使用命令行工具kafka-configs.sh来管理Kafka集群的配置,按照不同的级别(topic级别、用户级别、客户端级别等)来限制Kafka消息生产和消费速度。

该功能仅适用于MRS 3.3.1及之后版本。

前提条件

  • MRS集群管理员已明确业务需求。并准备一个Kafka组件业务用户,该用户属于kafkaadmin用户组。(普通模式不需要)
  • 已安装Kafka客户端,例如客户端安装目录为“/opt/client”。

操作步骤

  1. 以客户端安装用户,登录已安装Kafka客户端的节点。
  2. 切换到Kafka客户端安装目录,例如“/opt/client”。

    cd /opt/client

  3. 执行以下命令,配置环境变量。

    source bigdata_env

  4. 执行以下命令,进行用户认证(普通模式跳过此步骤)。

    kinit 组件业务用户

  5. 执行以下命令,切换到Kafka客户端安装目录。

    cd Kafka/kafka

  6. 使用“kafka-configs.sh”进行Kafka流量控制,常用命令如下。

    命令中使用到的部分参数值获取方式如下,具体以实际获取信息为准。

    • ZooKeeper的任意一个节点的业务IP:登录FusionInsight Manager页面,选择“集群 > 服务 > ZooKeeper > 实例”,查看任意一个ZooKeeper角色实例的业务IP地址。例如获取到的IP为“192.168.20.36”。
    • clientPort:登录FusionInsight Manager页面,选择“集群 > 服务 > ZooKeeper > 配置 > 全部配置”,搜索并记录“clientPort”参数值。例如获取到的端口为“24002”。
    • Kafka集群IP:登录FusionInsight Manager页面,选择“集群 > 服务 > Kafka > 实例”。查看任意一个Broker角色实例的业务IP地址。例如获取到的IP为“192.168.20.36”。
    • Kafka集群端口号安全模式下是21007,普通模式下是9092。
    • 客户端ID:可以在登录Kafka客户端后执行以下命令,查看返回结果中“CLIENT-ID”参数值进行获取。例如获取到的客户端ID为“clientA”。

      bin/kafka-consumer-groups.sh --describe --bootstrap-server Kafka集群IP:端口号 --all-groups --command-config config/consumer.properties

    • 基于topic级别的生产限流

      bin/kafka-configs.sh --zookeeper ZooKeeper的任意一个节点的业务IP:clientPort/kafka --alter --add-config 'producer_byte_rate=生产限流的速度' --entity-type topics_limit --entity-name topic的名称

      例如:

      bin/kafka-configs.sh --zookeeper 192.168.20.36:24002/kafka --alter --add-config 'producer_byte_rate=10485760' --entity-type topics_limit --entity-name testTopic-01

    • 基于topic级别的消费限流

      bin/kafka-configs.sh --zookeeper ZooKeeper的任意一个节点的业务IP:clientPort/kafka --alter --add-config 'consumer_byte_rate=消费限流的速度' --entity-type topics_limit --entity-name topic的名称

      例如:

      bin/kafka-configs.sh --zookeeper 192.168.20.36:24002/kafka --alter --add-config 'consumer_byte_rate=1048576' --entity-type topics_limit --entity-name testTopic-01

    • 基于用户级别的生产限流

      bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'producer_byte_rate=生产限流的速度' --entity-type users --entity-name 组件业务用户 --command-config config/client.properties

      例如:

      bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'producer_byte_rate=10485760' --entity-type users --entity-name kafkauser --command-config config/client.properties

    • 基于用户级别的消费限流

      bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'consumer_byte_rate=消费限流的速度' --entity-type users --entity-name 组件业务用户 --command-config config/client.properties

      例如:

      bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'consumer_byte_rate=1048576' --entity-type users --entity-name kafkauser --command-config config/client.properties

    • 基于客户端级别的生产限流

      bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'producer_byte_rate=生产限流的速度' --entity-type clients --entity-name 客户端ID --command-config config/client.properties

      例如:

      bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'producer_byte_rate=10485760' --entity-type clients --entity-name clientA --command-config config/client.properties

    • 基于客户端级别的消费限流

      bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'consumer_byte_rate=消费限流的速度' --entity-type clients --entity-name 客户端ID --command-config config/client.properties

      例如:

      bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'consumer_byte_rate=1048576' --entity-type clients --entity-name clientA --command-config config/client.properties

    • 基于用户+客户端级别的生产和消费限流

      bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'producer_byte_rate=生产限流的速度,consumer_byte_rate=消费限流的速度' --entity-type users --entity-name 组件业务用户 --entity-type clients --entity-name 客户端ID --command-config config/client.properties

      例如:

      bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'producer_byte_rate=1048576,consumer_byte_rate=1048576' --entity-type users --entity-name kafkauser --entity-type clients --entity-name clientD --command-config config/client.properties

    • 查看限流信息

      bin/kafka-configs.sh --describe --bootstrap-server Kafka集群IP:端口号 --entity-type myType --entity-name myName --command-config config/client.properties

      • 其中“myType”的取值范围为{topics_limit, users, clients},对应的“myName”的取值范围为{topic的名称,组件业务用户,客户端ID}。
      • 其中“producer_byte_rate”和“consumer_byte_rate”分别为生产限流的速度和消费限流的速度,其单位均为字节/秒。
      • 如果“myType”选择为“topics_limit”,则命令行需修改为:

        bin/kafka-configs.sh --describe --zookeeper Zookeeper的任意一个节点的业务IP:clientPort/kafka --entity-type topics_limit --entity-name topic的名称

      例如:

      bin/kafka-configs.sh --describe --bootstrap-server 192.168.20.36:21007 --entity-type users --entity-name kafkauser --entity-type clients --entity-name clientD --command-config config/client.properties

    • 取消限流

      bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --delete-config 'producer_byte_rate,consumer_byte_rate' --entity-type myType --entity-name myName --command-config config/client.properties

      • 其中“myType”的取值范围为{topics_limit, users, clients},对应的“myName”的取值范围为{topic的名称,组件业务用户,客户端ID}。
      • 若“myType”选择为“topics_limit”,则命令行需修改为:

        bin/kafka-configs.sh --zookeeper Zookeeper的任意一个节点的业务IP:clientPort/kafka --alter --delete-config 'producer_byte_rate,consumer_byte_rate' --entity-type topics_limit --entity-name topic的名称

      例如:

      bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --delete-config 'producer_byte_rate,consumer_byte_rate' --entity-type users --entity-name kafkauser --entity-type clients --entity-name client --command-config config/client.properties

相关文档