使用Kafka流控工具限制生产消费速度
操作场景
该任务指导用户根据业务需求,在客户端使用命令行工具kafka-configs.sh来管理Kafka集群的配置,按照不同的级别(topic级别、用户级别、客户端级别等)来限制Kafka消息生产和消费速度。
该功能仅适用于MRS 3.3.1及之后版本。
前提条件
- MRS集群管理员已明确业务需求。并准备一个Kafka组件业务用户,该用户属于kafkaadmin用户组。(普通模式不需要)
- 已安装Kafka客户端,例如客户端安装目录为“/opt/client”。
操作步骤
- 以客户端安装用户,登录已安装Kafka客户端的节点。
- 切换到Kafka客户端安装目录,例如“/opt/client”。
cd /opt/client
- 执行以下命令,配置环境变量。
source bigdata_env
- 执行以下命令,进行用户认证(普通模式跳过此步骤)。
kinit 组件业务用户
- 执行以下命令,切换到Kafka客户端安装目录。
cd Kafka/kafka
- 使用“kafka-configs.sh”进行Kafka流量控制,常用命令如下。
命令中使用到的部分参数值获取方式如下,具体以实际获取信息为准。
- ZooKeeper的任意一个节点的业务IP:登录FusionInsight Manager页面,选择“集群 > 服务 > ZooKeeper > 实例”,查看任意一个ZooKeeper角色实例的业务IP地址。例如获取到的IP为“192.168.20.36”。
- clientPort:登录FusionInsight Manager页面,选择“集群 > 服务 > ZooKeeper > 配置 > 全部配置”,搜索并记录“clientPort”参数值。例如获取到的端口为“24002”。
- Kafka集群IP:登录FusionInsight Manager页面,选择“集群 > 服务 > Kafka > 实例”。查看任意一个Broker角色实例的业务IP地址。例如获取到的IP为“192.168.20.36”。
- Kafka集群端口号安全模式下是21007,普通模式下是9092。
- 客户端ID:可以在登录Kafka客户端后执行以下命令,查看返回结果中“CLIENT-ID”参数值进行获取。例如获取到的客户端ID为“clientA”。
bin/kafka-consumer-groups.sh --describe --bootstrap-server Kafka集群IP:端口号 --all-groups --command-config config/consumer.properties
- 基于topic级别的生产限流
bin/kafka-configs.sh --zookeeper ZooKeeper的任意一个节点的业务IP:clientPort/kafka --alter --add-config 'producer_byte_rate=生产限流的速度' --entity-type topics_limit --entity-name topic的名称
例如:
bin/kafka-configs.sh --zookeeper 192.168.20.36:24002/kafka --alter --add-config 'producer_byte_rate=10485760' --entity-type topics_limit --entity-name testTopic-01
- 基于topic级别的消费限流
bin/kafka-configs.sh --zookeeper ZooKeeper的任意一个节点的业务IP:clientPort/kafka --alter --add-config 'consumer_byte_rate=消费限流的速度' --entity-type topics_limit --entity-name topic的名称
例如:
bin/kafka-configs.sh --zookeeper 192.168.20.36:24002/kafka --alter --add-config 'consumer_byte_rate=1048576' --entity-type topics_limit --entity-name testTopic-01
- 基于用户级别的生产限流
bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'producer_byte_rate=生产限流的速度' --entity-type users --entity-name 组件业务用户 --command-config config/client.properties
例如:
bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'producer_byte_rate=10485760' --entity-type users --entity-name kafkauser --command-config config/client.properties
- 基于用户级别的消费限流
bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'consumer_byte_rate=消费限流的速度' --entity-type users --entity-name 组件业务用户 --command-config config/client.properties
例如:
bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'consumer_byte_rate=1048576' --entity-type users --entity-name kafkauser --command-config config/client.properties
- 基于客户端级别的生产限流
bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'producer_byte_rate=生产限流的速度' --entity-type clients --entity-name 客户端ID --command-config config/client.properties
例如:
bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'producer_byte_rate=10485760' --entity-type clients --entity-name clientA --command-config config/client.properties
- 基于客户端级别的消费限流
bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'consumer_byte_rate=消费限流的速度' --entity-type clients --entity-name 客户端ID --command-config config/client.properties
例如:
bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'consumer_byte_rate=1048576' --entity-type clients --entity-name clientA --command-config config/client.properties
- 基于用户+客户端级别的生产和消费限流
bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'producer_byte_rate=生产限流的速度,consumer_byte_rate=消费限流的速度' --entity-type users --entity-name 组件业务用户 --entity-type clients --entity-name 客户端ID --command-config config/client.properties
例如:
bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'producer_byte_rate=1048576,consumer_byte_rate=1048576' --entity-type users --entity-name kafkauser --entity-type clients --entity-name clientD --command-config config/client.properties
- 查看限流信息
bin/kafka-configs.sh --describe --bootstrap-server Kafka集群IP:端口号 --entity-type myType --entity-name myName --command-config config/client.properties
- 其中“myType”的取值范围为{topics_limit, users, clients},对应的“myName”的取值范围为{topic的名称,组件业务用户,客户端ID}。
- 其中“producer_byte_rate”和“consumer_byte_rate”分别为生产限流的速度和消费限流的速度,其单位均为字节/秒。
- 如果“myType”选择为“topics_limit”,则命令行需修改为:
bin/kafka-configs.sh --describe --zookeeper Zookeeper的任意一个节点的业务IP:clientPort/kafka --entity-type topics_limit --entity-name topic的名称
例如:
bin/kafka-configs.sh --describe --bootstrap-server 192.168.20.36:21007 --entity-type users --entity-name kafkauser --entity-type clients --entity-name clientD --command-config config/client.properties
- 取消限流
bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --delete-config 'producer_byte_rate,consumer_byte_rate' --entity-type myType --entity-name myName --command-config config/client.properties
- 其中“myType”的取值范围为{topics_limit, users, clients},对应的“myName”的取值范围为{topic的名称,组件业务用户,客户端ID}。
- 若“myType”选择为“topics_limit”,则命令行需修改为:
bin/kafka-configs.sh --zookeeper Zookeeper的任意一个节点的业务IP:clientPort/kafka --alter --delete-config 'producer_byte_rate,consumer_byte_rate' --entity-type topics_limit --entity-name topic的名称
例如:
bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --delete-config 'producer_byte_rate,consumer_byte_rate' --entity-type users --entity-name kafkauser --entity-type clients --entity-name client --command-config config/client.properties