快速使用Kafka生产消费数据
操作场景
用户可以在集群客户端完成Topic的创建、查询、删除等基本操作。可参考Kafka用户权限说明设置用户权限,然后参考使用Kafka客户端生产消费数据(MRS 3.x之前版本)进行操作。
MRS 3.1.2及之后版本集群也可以通过登录KafkaUI查看当前集群的消费信息。详细操作请参考使用KafkaUI查看消费信息(MRS 3.1.2及之后版本)。
前提条件
- 使用Kafka客户端时:已安装客户端,例如安装目录为“/opt/client”,以下操作的客户端目录只是举例,请根据实际安装目录修改。
 - 使用KafkaUI时:已创建具有KafkaUI页面访问权限的用户,如需在页面上进行相关操作,例如创建Topic,需同时授予用户相关权限,请参考Kafka用户权限说明。
    
第一次访问Manager和KafkaUI,需要在浏览器中添加站点信任以继续访问KafkaUI。
 
使用Kafka客户端生产消费数据(MRS 3.x之前版本)
- 安装客户端,具体请参考安装客户端章节。
 - 进入ZooKeeper实例页面:
    
    单击集群名称,登录集群详情页面,选择“组件管理 > ZooKeeper > 实例”。
 
      若集群详情页面没有“组件管理”页签,请先完成IAM用户同步(在集群详情页的“概览”页签,单击“IAM用户同步”右侧的“同步”进行IAM用户同步)。
 - 查看ZooKeeper角色实例的IP地址。
    
    
记录ZooKeeper角色实例其中任意一个的IP地址即可。
 - 登录安装客户端的节点。
 - 执行以下命令,切换到客户端目录,例如“/opt/client/Kafka/kafka/bin”。
    
    
cd /opt/client/Kafka/kafka/bin
 - 执行以下命令,配置环境变量。
    
    
source /opt/client/bigdata_env
 - 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户。如果当前集群未启用Kerberos认证,则无需执行此命令。
    
    
kinitKafka用户
 - 创建一个Topic:
    
    
sh kafka-topics.sh --create --topic 主题名称 --partitions 主题占用的分区数 --replication-factor 主题的备份个数 --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka
例如:sh kafka-topics.sh --create --topic TopicTest --partitions 3 --replication-factor 3 --zookeeper 10.10.10.100:2181/kafka
 - 执行以下命令,查询集群中的Topic信息:
    
    
sh kafka-topics.sh --list --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka
例如:sh kafka-topics.sh --list --zookeeper 10.10.10.100:2181/kafka
 - 删除8中创建的Topic:
    
    
sh kafka-topics.sh --delete --topic 主题名称 --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka
例如:sh kafka-topics.sh --delete --topic TopicTest --zookeeper 10.10.10.100:2181/kafka
输入 "y",回车。
 
使用Kafka客户端生产消费数据(MRS 3.x及之后版本)
- 安装客户端,具体请参考安装客户端章节。
 - 进入ZooKeeper实例页面:
    
    
登录FusionInsight Manager,具体请参见访问FusionInsight Manager(MRS 3.x及之后版本)。然后选择“集群 > 服务 > ZooKeeper > 实例”。
 - 查看ZooKeeper角色实例的IP地址。
    
    
记录ZooKeeper角色实例其中任意一个的IP地址即可。
 - 登录安装客户端的节点。
 - 执行以下命令,切换到客户端目录,例如“/opt/client/Kafka/kafka/bin”。
    
    
cd /opt/client/Kafka/kafka/bin
 - 执行以下命令,配置环境变量。
    
    
source /opt/client/bigdata_env
 - 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户。如果当前集群未启用Kerberos认证,则无需执行此命令。
    
    
kinitKafka用户
 - 登录FusionInsight Manager,选择“集群 > 待操作的集群名称 > 服务 > ZooKeeper > 配置 > 全部配置”,搜索参数“clientPort”,记录“clientPort”的参数值。
 - 创建一个Topic:
    
    
sh kafka-topics.sh --create --topic 主题名称 --partitions 主题占用的分区数 --replication-factor 主题的备份个数 --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka
例如:sh kafka-topics.sh --create --topic TopicTest --partitions 3 --replication-factor 3 --zookeeper 10.10.10.100:2181/kafka
 - 执行以下命令,查询集群中的Topic信息:
    
    
sh kafka-topics.sh --list --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka
例如:sh kafka-topics.sh --list --zookeeper 10.10.10.100:2181/kafka
 - 删除9中创建的Topic:
    
    
sh kafka-topics.sh --delete --topic 主题名称 --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka
例如:sh kafka-topics.sh --delete --topic TopicTest --zookeeper 10.10.10.100:2181/kafka
 
使用KafkaUI查看消费信息(MRS 3.1.2及之后版本)
- 进入KafkaUI界面。
    
    
- 使用具有KafkaUI页面访问权限的用户登录FusionInsight Manager,选择“集群 > 服务 > Kafka”。
      
如需在页面上进行相关操作,例如创建Topic,需同时授予用户相关权限,请参考Kafka用户权限说明。
 - 在“KafkaManager WebUI”右侧,单击URL链接,访问KafkaUI的页面。
 
 - 使用具有KafkaUI页面访问权限的用户登录FusionInsight Manager,选择“集群 > 服务 > Kafka”。
      
 - 在“Cluster Summary”栏,可查看当前集群已有的Topic、Broker和Consumer Group数量。
    
    

 - 单击“Brokers”、“Topics”、“Consumer Group”下方的数字,可自动跳转至对应页面,查看并操作对应信息。
 - 在“Cluster Action”栏,可创建Topic与分区迁移,具体操作请参考增加Kafka Topic分区。
 - 在“Topic Rank”栏,可查看当前集群Topic日志条数、数据体积大小、数据流入量、数据流出量前十名的Topic。
    
    

 - 单击“TopicName”可进入到该Topic的详情页面中,在该页面的具体操作请参考查看Kafka数据生产消费详情。