更新时间:2022-12-14 GMT+08:00

使用Kafka客户端

操作场景

用户可以在集群客户端完成Topic的创建、查询、删除等基本操作。

前提条件

已安装客户端,例如安装目录为“/opt/hadoopclient”,以下操作的客户端目录只是举例,请根据实际安装目录修改。

使用Kafka客户端(MRS 3.x之前版本)

  1. 进入ZooKeeper实例页面:

    • MRS 2.0.1之前版本,登录MRS Manager,选择“服务管理 > ZooKeeper > 实例”。
    • MRS 2.0.1及至3.x之前版本,在MRS控制台单击集群名称,选择“组件管理 > ZooKeeper > 实例”。

      若集群详情页面没有“组件管理”页签,请先完成IAM用户同步(在集群详情页的“概览”页签,单击“IAM用户同步”右侧的“同步”进行IAM用户同步)。

  2. 查看ZooKeeper角色实例的IP地址。

    记录ZooKeeper角色实例其中任意一个的IP地址即可。

  3. 登录安装客户端的节点。
  4. 执行以下命令,切换到客户端目录,例如“/opt/hadoopclient/Kafka/kafka/bin”。

    cd /opt/hadoopclient/Kafka/kafka/bin

  5. 执行以下命令,配置环境变量。

    source /opt/hadoopclient/bigdata_env

  6. 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户。如果当前集群未启用Kerberos认证,则无需执行此命令。

    kinit Kafka用户

  7. 创建一个Topic:

    sh kafka-topics.sh --create --topic 主题名称 --partitions 主题占用的分区数 --replication-factor 主题的备份个数 --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka

    例如:sh kafka-topics.sh --create --topic TopicTest --partitions 3 --replication-factor 3 --zookeeper 10.10.10.100:2181/kafka

  8. 执行以下命令,查询集群中的Topic信息:

    sh kafka-topics.sh --list --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka

    例如:sh kafka-topics.sh --list --zookeeper 10.10.10.100:2181/kafka

  9. 删除7中创建的Topic:

    sh kafka-topics.sh --delete --topic 主题名称 --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka

    例如:sh kafka-topics.sh --delete --topic TopicTest --zookeeper 10.10.10.100:2181/kafka

    输入 "y",回车。

使用Kafka客户端(MRS 3.x及之后版本)

  1. 进入ZooKeeper实例页面:

    登录FusionInsight Manager,具体请参见访问FusionInsight Manager(MRS 3.x及之后版本)。然后选择“集群 > 待操作的集群名称 > 服务 > ZooKeeper > 实例”。

  2. 查看ZooKeeper角色实例的IP地址。

    记录ZooKeeper角色实例其中任意一个的IP地址即可。

  3. 登录安装客户端的节点。
  4. 执行以下命令,切换到客户端目录,例如“/opt/hadoopclient/Kafka/kafka/bin”。

    cd /opt/hadoopclient/Kafka/kafka/bin

  5. 执行以下命令,配置环境变量。

    source /opt/hadoopclient/bigdata_env

  6. 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户。如果当前集群未启用Kerberos认证,则无需执行此命令。

    kinit Kafka用户

  7. 登录FusionInsight Manager,选择“集群 > 待操作的集群名称 > 服务 > ZooKeeper > 配置 > 全部配置”,搜索参数“clientPort”,记录“clientPort”的参数值。
  8. 创建一个Topic:

    sh kafka-topics.sh --create --topic 主题名称 --partitions 主题占用的分区数 --replication-factor 主题的备份个数 --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka

    例如:sh kafka-topics.sh --create --topic TopicTest --partitions 3 --replication-factor 3 --zookeeper 10.10.10.100:2181/kafka

  9. 执行以下命令,查询集群中的Topic信息:

    sh kafka-topics.sh --list --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka

    例如:sh kafka-topics.sh --list --zookeeper 10.10.10.100:2181/kafka

  10. 删除8中创建的Topic:

    sh kafka-topics.sh --delete --topic 主题名称 --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka

    例如:sh kafka-topics.sh --delete --topic TopicTest --zookeeper 10.10.10.100:2181/kafka