更新时间:2025-12-26 GMT+08:00
分享

管理Kafka Topic中的消息

操作场景

用户可以根据业务需要,使用MRS集群客户端,在Kafka主题中产生消息,或消费消息。

前提条件

  • 已安装客户端,例如安装目录为“/opt/client,以下操作的客户端目录只是举例,请根据实际安装目录修改

    下载并安装集群客户端的具体操作,请参考安装MRS集群客户端

  • 启用Kerberos认证的集群,需要提前在Manager中创建业务用户,用户拥有在Kafka主题中执行相应操作的权限。

    创建用户相关操作请参考创建Kafka用户并绑定角色

  • 已存在待操作的Topic,相关操作可参考创建Kafka Topic

管理消息

  1. 登录MRS集群Manager。

    登录集群Manager具体操作,请参考访问MRS集群Manager

  2. 获取Kafka节点业务IP及端口

    1. 选择“集群 > 服务 > Kafka > 实例”,查看并记录任意一个Broker角色实例的业务IP地址。
    2. Kafka集群端口号:

      集群已启用Kerberos认证(安全模式):默认为21007

      集群未启用Kerberos认证(普通模式):默认为9092

  3. 以客户端安装用户,登录安装客户端的节点。
  4. 执行以下命令,切换到客户端安装目录,例如安装目录为“/opt/client,具体以实际替换

    cd /opt/client

  5. 执行以下命令配置环境变量。

    source bigdata_env

  6. 执行以下命令,进行用户认证。(集群未启用Kerberos认证(普通模式)跳过此步骤)

    kinit 组件业务用户

  7. 执行以下命令进入Kafka客户端“bin”目录。

    cd Kafka/kafka/bin

  8. 根据业务需要,管理Kafka主题中的消息。

    • 在主题中产生消息:
      sh kafka-console-producer.sh --broker-list Broker角色实例所在节点的IP地址:21007 --topic Topic名称 --producer.config /opt/client/Kafka/kafka/config/producer.properties

      Topic需提前创建,用户可以输入指定的内容作为生产者产生的消息,输入完成后按回车发送消息。如果需要结束产生消息,使用“Ctrl + C”退出任务。

    • 消费主题中的消息

      重新打开一个客户端连接,执行以下命令消费主题中的消息。

      cd /opt/client/Kafka/kafka/bin
      source /opt/client/bigdata_env
      sh kafka-console-consumer.sh --topic Topic名称 --bootstrap-server Broker角色实例所在节点的IP地址:21007  --consumer.config /opt/client/Kafka/kafka/config/consumer.properties
      • 配置文件中“group.id”指定的消费者组默认为“example-group1”。用户可根据业务需要,自定义其他消费者组。每次消费时生效。
      • 执行命令时默认会读取当前消费者组中未被处理的消息。如果在配置文件指定了新的消费者组且命令中增加参数“--from-beginning”,则会读取所有Kafka中未被自动删除的消息。

相关文档