管理Kafka Topic中的消息
操作场景
用户可以根据业务需要,使用MRS集群客户端,在Kafka主题中产生消息,或消费消息。
前提条件
- 已安装集群客户端。
 - 启用Kerberos认证的集群,需要提前在Manager中创建业务用户,用户拥有在Kafka主题中执行相应操作的权限。
 
操作步骤
- 进入Kafka服务页面:
    
    
- MRS3.x之前版本,单击集群名称,登录集群详情页面,选择“组件管理 > Kafka”。
      
 
       若集群详情页面没有“组件管理”页签,请先完成IAM用户同步(在集群详情页的“概览”页签,单击“IAM用户同步”右侧的“同步”进行IAM用户同步)。
 - MRS 3.x及后续版本,登录FusionInsight Manager,然后选择“集群 > 待操作的集群名称 > 服务 > Kafka”。
 
 - MRS3.x之前版本,单击集群名称,登录集群详情页面,选择“组件管理 > Kafka”。
      
 - 单击“实例”,查看Kafka Broker角色实例的IP地址。
    
    
记录Kafka角色实例其中任意一个的IP地址即可。
 - 根据业务情况,准备好客户端,登录安装客户端的节点。
    
    
请根据客户端所在位置,参考使用MRS客户端章节,登录安装客户端的节点。
 - 执行以下命令,切换到客户端目录,例如“/opt/client/Kafka/kafka/bin”。
    
    
cd /opt/client/Kafka/kafka/bin
 - 执行以下命令,配置环境变量。
    
    
source /opt/client/bigdata_env
 - 启用Kerberos认证的集群,执行以下命令认证用户身份。未启用Kerberos认证的集群无需执行本步骤。
    
    
kinit Kafka用户
 - 根据业务需要,管理Kafka主题中的消息。
    
    
- 在主题中产生消息
      
sh kafka-console-producer.sh --broker-list Broker角色实例所在节点的IP地址:9092 --topic Topic名称 --producer.config /opt/client/Kafka/kafka/config/producer.properties
Topic需提前创建,用户可以输入指定的内容作为生产者产生的消息,输入完成后按回车发送消息。如果需要结束产生消息,使用“Ctrl + C”退出任务。
 - 消费主题中的消息
      
      
cd /opt/client/Kafka/kafka/bin
source /opt/client/bigdata_env
sh kafka-console-consumer.sh --topic Topic名称 --bootstrap-server Broker角色实例所在节点的IP地址:9092 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties
配置文件中“group.id”指定的消费者组默认为“example-group1”。用户可根据业务需要,自定义其他消费者组。每次消费时生效。
执行命令时默认会读取当前消费者组中未被处理的消息。如果在配置文件指定了新的消费者组且命令中增加参数“--from-beginning”,则会读取所有Kafka中未被自动删除的消息。
 
 
     - Kafka角色实例所在节点IP地址,填写Broker角色实例其中任意一个的IP地址即可。
 - 如果集群启用Kerberos认证,则端口需要修改为“21007”。
 - 默认情况下,ZooKeeper的“clientPort”为“2181”。
 
 - 在主题中产生消息