管理Kafka Topic中的消息
操作场景
用户可以根据业务需要,使用MRS集群客户端,在Kafka主题中产生消息,或消费消息。
前提条件
- 已安装客户端,例如安装目录为“/opt/client”,以下操作的客户端目录只是举例,请根据实际安装目录修改。
下载并安装集群客户端的具体操作,请参考安装MRS集群客户端。
- 启用Kerberos认证的集群,需要提前在Manager中创建业务用户,用户拥有在Kafka主题中执行相应操作的权限。
创建用户相关操作请参考创建Kafka用户并绑定角色。
- 已存在待操作的Topic,相关操作可参考创建Kafka Topic。
管理消息
- 登录MRS集群Manager。
登录集群Manager具体操作,请参考访问MRS集群Manager。
- 获取Kafka节点业务IP及端口。
- 以客户端安装用户,登录安装客户端的节点。
- 执行以下命令,切换到客户端安装目录,例如安装目录为“/opt/client”,具体以实际替换。
cd /opt/client - 执行以下命令配置环境变量。
source bigdata_env
- 执行以下命令,进行用户认证。(集群未启用Kerberos认证(普通模式)时跳过此步骤)
kinit 组件业务用户 - 执行以下命令进入Kafka客户端“bin”目录。
cd Kafka/kafka/bin
- 根据业务需要,管理Kafka主题中的消息。
- 在主题中产生消息:
sh kafka-console-producer.sh --broker-list Broker角色实例所在节点的IP地址:21007 --topic Topic名称 --producer.config /opt/client/Kafka/kafka/config/producer.properties
Topic需提前创建,用户可以输入指定的内容作为生产者产生的消息,输入完成后按回车发送消息。如果需要结束产生消息,使用“Ctrl + C”退出任务。
- 消费主题中的消息
cd /opt/client/Kafka/kafka/binsource /opt/client/bigdata_envsh kafka-console-consumer.sh --topic Topic名称 --bootstrap-server Broker角色实例所在节点的IP地址:21007 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties
- 配置文件中“group.id”指定的消费者组默认为“example-group1”。用户可根据业务需要,自定义其他消费者组。每次消费时生效。
- 执行命令时默认会读取当前消费者组中未被处理的消息。如果在配置文件指定了新的消费者组且命令中增加参数“--from-beginning”,则会读取所有Kafka中未被自动删除的消息。
- 在主题中产生消息: