使用Kafka生产消费数据
操作场景
用户可以通过MRS集群Kafka客户端完成Kafka Topic的创建、查询、删除等操作,也可以通过登录KafkaUI查看当前集群的消费信息。
前提条件
- 使用Kafka客户端时:已安装客户端,例如安装目录为“/opt/client”,以下操作的客户端目录只是举例,请根据实际安装目录修改。
- 使用KafkaUI操作时:已创建具有KafkaUI页面访问权限的用户,如需在页面上进行相关操作,例如创建Topic,需同时授予用户相关权限,请参考Kafka用户权限说明。
第一次访问Manager和KafkaUI,需要在浏览器中添加站点信任以继续访问KafkaUI。
使用Kafka客户端生产消费数据
- 安装客户端。
下载并安装集群客户端的具体操作,请参考安装MRS集群客户端。
- 登录MRS集群Manager。
登录集群Manager具体操作,请参考访问MRS集群Manager。
- 获取ZooKeeper节点业务IP及端口。
- 以客户端安装用户,登录安装客户端的节点。
- 执行以下命令,切换到客户端安装目录,例如安装目录为“/opt/client”,具体以实际替换。
cd /opt/client - 执行以下命令配置环境变量。
source bigdata_env
- 执行以下命令,进行用户认证。(集群未启用Kerberos认证(普通模式)时跳过此步骤)
kinit 组件业务用户 - 执行以下命令进入Kafka客户端“bin”目录。
cd Kafka/kafka/bin
- 执行以下命令,创建一个Topic:
sh kafka-topics.sh --create --topic 主题名称 --partitions 主题占用的分区数 --replication-factor 主题的备份个数 --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka
例如执行以下命令:
sh kafka-topics.sh --create --topic TopicTest --partitions 3 --replication-factor 3 --zookeeper 10.10.10.100:2181/kafka
- 执行以下命令,查询集群中的Topic信息:
sh kafka-topics.sh --list --bootstrap-server Kafka集群IP:21007 --command-config ../config/client.properties例如执行以下命令:
sh kafka-topics.sh --list --bootstrap-server 10.10.10.100:21007 --command-config ../config/client.properties
- 执行以下命令,删除9中创建的Topic:
sh kafka-topics.sh --delete --topic 主题名称 --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka
例如执行以下命令:
sh kafka-topics.sh --delete --topic TopicTest --zookeeper 10.10.10.100:2181/kafka
使用KafkaUI查看消费信息
- 进入KafkaUI界面。
- 使用具有KafkaUI页面访问权限的用户登录FusionInsight Manager,选择“集群 > 服务 > Kafka”。
如需在页面上进行相关操作,例如创建Topic,需同时授予用户相关权限,请参考Kafka用户权限说明。
- 在“KafkaManager WebUI”右侧,单击URL链接,访问KafkaUI的页面。
- 使用具有KafkaUI页面访问权限的用户登录FusionInsight Manager,选择“集群 > 服务 > Kafka”。
- 在“Cluster Summary”栏,可查看当前集群已有的Topic、Broker和Consumer Group数量。

- 单击“Brokers”、“Topics”、“Consumer Group”下方的数字,可自动跳转至对应页面,查看并操作对应信息。
- 在“Cluster Action”栏,可创建Topic与分区迁移,具体操作请分别参考使用KafkaUI创建Kafka Topic和使用KafkaUI迁移分区(MRS 3.5.0之前版本)章节。
- 在“Topic Rank”栏,可查看当前集群Topic日志条数、数据体积大小、数据流入量、数据流出量前十名的Topic。

- 单击“TopicName”可进入到该Topic的详情页面中,在该页面的具体操作请参考查看Kafka数据生产消费详情。