更新时间:2026-06-11 GMT+08:00
使用Kafka生产消费数据
操作场景
用户可以通过MRS集群Kafka客户端在Kafka Topic中产生和消费消息。
前提条件
- 已安装客户端,例如安装目录为“/opt/client”,以下操作的客户端目录只是举例,请根据实际安装目录修改。
下载并安装集群客户端的具体操作,请参考安装MRS集群客户端。
- 启用Kerberos认证的集群,需要提前在Manager中创建业务用户,用户拥有在Kafka主题中执行相应操作的权限。
创建用户相关操作请参考创建Kafka用户并绑定角色。
- 已存在待操作的Topic,相关操作可参考创建Kafka Topic。
使用Kafka客户端生产消费数据
- 登录MRS集群Manager。
登录集群Manager具体操作,请参考访问MRS集群Manager。
- 获取Kafka节点业务IP及端口。
- 选择“集群 > 服务 > Kafka > 实例”,查看并记录任意一个Broker角色实例的业务IP地址。
- 选择“配置 > 全部配置”,并根据集群模式获取端口号:
- 集群已启用Kerberos认证(安全模式):搜索“sasl.port”参数,查看并记录端口号,默认为21007。
- 集群未启用Kerberos认证(普通模式):搜索“port”参数,查看并记录端口号,默认为9092。
- 以客户端安装用户,登录安装客户端的节点。
- 执行以下命令,切换到客户端安装目录,例如安装目录为“/opt/client”,具体以实际替换。
cd /opt/client - 执行以下命令配置环境变量。
source bigdata_env
- 执行以下命令,进行用户认证。(集群未启用Kerberos认证(普通模式)时跳过此步骤)
kinit 组件业务用户 - 执行以下命令进入Kafka客户端“bin”目录。
cd Kafka/kafka/bin
- 执行以下命令使用客户端工具查看帮助并使用。
- Kafka消息发布工具:
./kafka-console-producer.sh
- Kafka消息读取工具:
./kafka-console-consumer.sh
- Kafka消息发布工具:
- 根据业务需要,管理Kafka主题中的消息。
- 在主题中产生消息
sh kafka-console-producer.sh --broker-list Broker角色实例所在节点的IP地址:21007 --topic Topic名称 --producer.config /opt/client/Kafka/kafka/config/producer.properties
Topic需提前创建,用户可以输入指定的内容作为生产者产生的消息,输入完成后按回车发送消息。如果需要结束产生消息,使用“Ctrl + C”退出任务。
- 消费主题中的消息
cd /opt/client/Kafka/kafka/binsource /opt/client/bigdata_envsh kafka-console-consumer.sh --topic Topic名称 --bootstrap-server Broker角色实例所在节点的IP地址:21007 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties
- 配置文件中“group.id”指定的消费者组默认为“example-group1”。用户可根据业务需要,自定义其他消费者组。每次消费时生效。
- 执行命令时默认会读取当前消费者组中未被处理的消息。如果在配置文件指定了新的消费者组且命令中增加参数“--from-beginning”,则会读取所有Kafka中未被自动删除的消息。
- 在主题中产生消息
相关文档
- 关于Kafka UI页面详细介绍及操作,请参考登录KafkaUI界面。
- 如果需要查看生产消费详情,请参考查看Kafka数据生产消费详情。
父主题: 使用Kafka