更新时间:2022-12-14 GMT+08:00

管理Kafka主题中的消息

操作场景

用户可以根据业务需要,使用MRS集群客户端,在Kafka主题中产生消息,或消费消息。启用Kerberos认证的集群,需要用户拥有在Kafka主题中执行相应操作的权限。

前提条件

已安装客户端。

操作步骤

  1. 单击“实例”,查看Kafka角色实例的IP地址。

    记录Kafka角色实例其中任意一个的IP地址即可。

  2. 根据业务情况,准备好客户端,登录安装客户端的节点。

    请根据客户端所在位置,参考章节,登录安装客户端的节点。

  3. 执行以下命令,切换到客户端目录,例如“/opt/client/Kafka/kafka/bin”。

    cd /opt/client/Kafka/kafka/bin

  4. 执行以下命令,配置环境变量。

    source /opt/client/bigdata_env

  5. 启用Kerberos认证的集群,执行以下命令认证用户身份。未启用Kerberos认证的集群无需执行。

    kinit Kafka用户

    例如:

    kinit admin

  6. 根据业务需要,管理Kafka主题中的消息。

    • 在主题中产生消息

      sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:9092 --topic 主题名称 --producer.config /opt/client/Kafka/kafka/config/producer.properties

      用户可以输入指定的内容作为生产者产生的消息,输入完成后按回车发送消息。如果需要结束产生消息,使用“Ctrl + C”退出任务。

    • 消费主题中的消息

      sh kafka-console-consumer.sh --topic 主题名称 --bootstrap-server Kafka角色实例所在节点的IP地址:9092 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties

      配置文件中“group.id”指定的消费者组默认为“example-group1”。用户可根据业务需要,自定义其他消费者组。每次消费时生效。

      执行命令时默认会读取当前消费者组中未被处理的消息。如果在配置文件指定了新的消费者组且命令中增加参数“--from-beginning”,则会读取所有Kafka中未被自动删除的消息。