使用Kafka流控工具限制生产消费速度
操作场景
在Kafka集群实际运行中,不同业务场景(如突发流量高峰、资源隔离需求等)可能需要对消息生产和消费速度进行差异化控制。该任务指导用户根据业务需求,在客户端使用命令行工具kafka-configs.sh来管理Kafka集群的配置,按照不同的级别(topic级别、用户级别、客户端级别等)来限制Kafka消息生产和消费速度。
约束与限制
该功能仅适用于MRS 3.3.1及之后版本。
前提条件
- MRS集群管理员已明确业务需求。并准备一个Kafka组件业务用户,该用户属于kafkaadmin用户组。(集群未启用Kerberos认证(普通模式)不需要)
创建用户相关操作请参考创建Kafka用户并绑定角色。
- 已安装客户端,例如安装目录为“/opt/client”,以下操作的客户端目录只是举例,请根据实际安装目录修改。
下载并安装集群客户端的具体操作,请参考安装MRS集群客户端。
操作步骤
- 登录MRS集群Manager。
登录集群Manager具体操作,请参考访问MRS集群Manager。
- 获取Kafka节点业务IP及端口。
- 获取ZooKeeper节点业务IP及端口。
- 以客户端安装用户,登录安装客户端的节点。
- 执行以下命令,切换到客户端安装目录,例如安装目录为“/opt/client”,具体以实际替换。
cd /opt/client - 执行以下命令配置环境变量。
source bigdata_env
- 执行以下命令,进行用户认证。(集群未启用Kerberos认证(普通模式)时跳过此步骤)
kinit 组件业务用户 - 执行以下命令,切换到客户端安装“kafka”目录。
cd Kafka/kafka
- 使用“kafka-configs.sh”进行Kafka流量控制,常用命令如下。
- 基于Topic级别的生产限流
bin/kafka-configs.sh --zookeeper ZooKeeper的任意一个节点的业务IP:clientPort/kafka --alter --add-config 'producer_byte_rate=生产限流的速度' --entity-type topics_limit --entity-name Topic的名称
例如,执行以下命令,限制名称为testTopic-01的主题的生产者字节速率为每秒10MB(10485760字节),用于控制该主题的生产者流量:
bin/kafka-configs.sh --zookeeper 192.168.20.36:24002/kafka --alter --add-config 'producer_byte_rate=10485760' --entity-type topics_limit --entity-name testTopic-01
- 基于Topic级别的消费限流
bin/kafka-configs.sh --zookeeper ZooKeeper的任意一个节点的业务IP:clientPort/kafka --alter --add-config 'consumer_byte_rate=消费限流的速度' --entity-type topics_limit --entity-name Topic的名称
例如,执行以下命令,限制名称为testTopic-01的主题的消费者字节速率为每秒1MB(1048576字节),用于控制该主题的消费者流量:
bin/kafka-configs.sh --zookeeper 192.168.20.36:24002/kafka --alter --add-config 'consumer_byte_rate=1048576' --entity-type topics_limit --entity-name testTopic-01
- 基于用户级别的生产限流
bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'producer_byte_rate=生产限流的速度' --entity-type users --entity-name 组件业务用户 --command-config config/client.properties
例如,执行以下命令,限制名称为kafkauser的用户的生产者字节速率为每秒10MB(10485760字节),用于控制该用户下所有生产者的流量:
bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'producer_byte_rate=10485760' --entity-type users --entity-name kafkauser --command-config config/client.properties
- 基于用户级别的消费限流
bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'consumer_byte_rate=消费限流的速度' --entity-type users --entity-name 组件业务用户 --command-config config/client.properties
例如,执行以下命令,限制名称为kafkauser的用户的消费者字节速率为每秒1MB(1048576字节),用于控制该主题的消费者流量:
bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'consumer_byte_rate=1048576' --entity-type users --entity-name kafkauser --command-config config/client.properties
- 基于客户端级别的生产限流
bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'producer_byte_rate=生产限流的速度' --entity-type clients --entity-name 客户端ID --command-config config/client.properties
其中,客户端ID可以在登录Kafka客户端后执行以下命令,查看返回结果中“CLIENT-ID”参数值进行获取。例如获取到的客户端ID为“clientA”。
bin/kafka-consumer-groups.sh --describe --bootstrap-server Kafka集群IP:端口号 --all-groups --command-config config/consumer.properties例如,限制ID名称为clientA的Kafka客户端的生产者字节速率为每秒10MB(10485760字节),用于控制该客户端ID下所有生产者的流量:
bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'producer_byte_rate=10485760' --entity-type clients --entity-name clientA --command-config config/client.properties
- 基于客户端级别的消费限流
bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'consumer_byte_rate=消费限流的速度' --entity-type clients --entity-name 客户端ID --command-config config/client.properties
例如,执行以下命令,限制ID名称为clientA的Kafka客户端的消费者字节速率为每秒1MB(1048576字节),用于控制该客户端ID的消费者流量:
bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'consumer_byte_rate=1048576' --entity-type clients --entity-name clientA --command-config config/client.properties
- 基于用户+客户端级别的生产和消费限流
bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --add-config 'producer_byte_rate=生产限流的速度,consumer_byte_rate=消费限流的速度' --entity-type users --entity-name 组件业务用户 --entity-type clients --entity-name 客户端ID --command-config config/client.properties
例如:
bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --add-config 'producer_byte_rate=1048576,consumer_byte_rate=1048576' --entity-type users --entity-name kafkauser --entity-type clients --entity-name clientD --command-config config/client.properties
- 查看限流信息
bin/kafka-configs.sh --describe --bootstrap-server Kafka集群IP:端口号 --entity-type myType --entity-name myName --command-config config/client.properties
其中,
- “myType”的取值范围为{topics_limit, users, clients},对应的“myName”的取值范围为{topic的名称,组件业务用户,客户端ID}。
- 如果“myType”选择为“topics_limit”,则命令行需修改为:
bin/kafka-configs.sh --describe --zookeeper Zookeeper的任意一个节点的业务IP:clientPort/kafka --entity-type topics_limit --entity-name Topic的名称
- “producer_byte_rate”和“consumer_byte_rate”分别为生产限流的速度和消费限流的速度,其单位均为字节/秒。
例如:
bin/kafka-configs.sh --describe --bootstrap-server 192.168.20.36:21007 --entity-type users --entity-name kafkauser --entity-type clients --entity-name clientD --command-config config/client.properties
- 取消限流
bin/kafka-configs.sh --bootstrap-server Kafka集群IP:端口号 --alter --delete-config 'producer_byte_rate,consumer_byte_rate' --entity-type myType --entity-name myName --command-config config/client.properties
其中,
- “myType”的取值范围为{topics_limit, users, clients},对应的“myName”的取值范围为{topic的名称,组件业务用户,客户端ID}。
- 若“myType”选择为“topics_limit”,则命令行需修改为:
bin/kafka-configs.sh --zookeeper Zookeeper的任意一个节点的业务IP:clientPort/kafka --alter --delete-config 'producer_byte_rate,consumer_byte_rate' --entity-type topics_limit --entity-name Topic的名称
例如:
bin/kafka-configs.sh --bootstrap-server 192.168.20.36:21007 --alter --delete-config 'producer_byte_rate,consumer_byte_rate' --entity-type users --entity-name kafkauser --entity-type clients --entity-name client --command-config config/client.properties
- 基于Topic级别的生产限流