更新时间:2024-07-31 GMT+08:00

删除Kafka消费组

Kafka实例支持通过以下两种方式删除消费组,您可以根据实际情况选择任意一种方式。

  • 方法一:在管理控制台删除消费组
  • 方法二:在Kafka客户端使用命令行工具删除消费组(确保Kafka实例版本与命令行工具版本相同)

约束与限制

  • 若“auto.create.groups.enable”设置为“true”,当消费组的状态为“EMPTY”且从未提交过offset,系统将在十分钟后自动删除该消费组。
  • 若“auto.create.groups.enable”设置为“false”,系统不会自动删除消费组。如果需要删除消费组,需要您手动删除。
  • 若消费组从未提交过offset,当Kafka实例重启后,该消费组会被删除。

前提条件

待删除消费组的状态为“EMPTY”。

方法一:在管理控制台删除消费组

  1. 登录管理控制台。
  2. 在管理控制台左上角单击,选择区域。

    请选择Kafka实例所在的区域。

  3. 在管理控制台左上角单击,选择“应用中间件 > 分布式消息服务Kafka版”,进入分布式消息服务Kafka专享版页面。
  4. 单击Kafka实例的名称,进入实例详情页面。
  5. 在左侧导航栏选择“消费组管理”,进入消费组列表页面。
  6. 通过以下任意一种方法,删除消费组。

    • 勾选消费组名称左侧的方框,可选一个或多个,单击信息栏左上侧的“删除消费组”。
    • 在待删除消费组所在行,单击“删除”。

    仅在“消费组状态”为“EMPTY”时,支持删除。

    消费组包含以下状态:

    • DEAD:消费组内没有任何成员,且没有任何元数据。
    • EMPTY:消费组内没有任何成员,存在元数据。
    • PREPARING_REBALANCE:准备开启Rebalance。
    • COMPLETING_REBALANCE:所有成员加入消费组。
    • STABLE:消费组内成员可以正常消费。

  7. 弹出“删除消费组”对话框,单击“确定”,完成消费组的删除。

方法二:使用命令行工具删除消费组

以下操作命令以Linux系统为例进行说明:

  • 未开启密文接入的Kafka实例,在Kafka客户端的“/bin”目录下,通过以下命令删除消费组。
    ./kafka-consumer-groups.sh --bootstrap-server ${connection-address} --delete --group ${consumer-group-name}

    参数说明如下:

    • connection-address:在Kafka控制台的“基本信息 > 连接信息”中,获取Kafka实例的连接地址。
    • consumer-group-name:消费组名称。

    示例如下:

    [root@ecs-kafka bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.xx.xx:9092,192.168.xx.xx:9092,192.168.xx.xx:9092 --delete --group group-01
    Deletion of requested consumer groups ('group-01') was successful.
    [root@ecs-kafka bin]#
  • 已开启密文接入的Kafka实例,通过以下步骤删除消费组。
    1. (可选)修改客户端配置文件。
      在Kafka控制台的“基本信息 > 连接信息”中查看Kafka安全协议,两种安全协议对应的配置文件设置有所不同,请根据实际情况配置。
      • SASL_PLAINTEXT:如果已经设置了用户名和密码,请跳过此步骤。否则在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,在文件中增加如下内容。
        security.protocol=SASL_PLAINTEXT
        #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。
        sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="**********" \
        password="**********";        
        sasl.mechanism=SCRAM-SHA-512
        #SASL认证机制为“PLAIN”时,配置信息如下。
        sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="**********" \
        password="**********";        
        sasl.mechanism=PLAIN

        参数说明如下:username和password为首次开启密文接入时填入的用户名和密码,或者创建用户时设置的用户名和密码。

      • SASL_SSL:如果已经设置了用户名和密码,以及SSL证书配置,请跳过此步骤。否则在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,在文件中增加如下内容。
        security.protocol=SASL_SSL
        ssl.truststore.location={ssl_truststore_path}
        ssl.truststore.password=dms@kafka
        ssl.endpoint.identification.algorithm=
        #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。
        sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="**********" \
        password="**********";        
        sasl.mechanism=SCRAM-SHA-512
        #SASL认证机制为“PLAIN”时,配置信息如下。
        sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="**********" \
        password="**********";        
        sasl.mechanism=PLAIN

        参数说明如下:

        • ssl.truststore.location配置为client.jks证书的存放路径。注意,Windows系统下证书路径中也必须使用“/”,不能使用Windows系统中复制路径时的“\”,否则客户端获取证书失败。
        • ssl.truststore.password为服务器证书密码,不可更改需要保持为dms@kafka
        • ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空
        • username和password为首次开启密文接入时填入的用户名和密码,或者创建用户时设置的用户名和密码。
    2. 在Kafka客户端的“/bin”目录下,通过以下命令删除消费组。
      ./kafka-consumer-groups.sh --bootstrap-server ${connection-address} --delete --group ${consumer-group-name} --command-config ../config/ssl-user-config.properties

      参数说明如下:

      • connection-address:在Kafka控制台的“基本信息 > 连接信息”中,获取Kafka实例的连接地址。
      • consumer-group-name:消费组名称。

      示例如下:

      [root@ecs-kafka bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.xx.xx:9093,192.168.xx.xx:9093,192.168.xx.xx:9093 --delete --group group-02 --command-config ../config/ssl-user-config.properties
      Deletion of requested consumer groups ('group-02') was successful.
      [root@ecs-kafka bin]#