更新时间:2025-09-03 GMT+08:00

查看和重置Kafka消费进度

消费进度表示消费者的消费位置,本章节介绍如何查看和重置消费进度,重置消费进度即修改消费者的消费位置。

约束与限制

重置消费进度可能会导致重复消费,请谨慎操作。

前提条件

Kafka实例不支持在线重置消费进度,请先将待重置消费进度的消费组停止消费,然后重置消费进度。停止待重置消费进度的消费者客户端后,需要等待一段时间(即ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG配置的时间,默认为1000毫秒),服务端才会认为此消费者已下线。

查看消费进度(控制台)

  1. 登录管理控制台。
  2. 在管理控制台左上角单击,选择Kafka实例所在的区域。
  3. 在管理控制台左上角单击,选择“应用服务 > 分布式消息服务 Kafka”,进入Kafka实例列表页面。
  4. 单击Kafka实例的名称,进入实例详情页面。
  5. 在左侧导航栏选择“实例管理 > 消费组管理”,进入消费组列表页面。
  6. 单击待查看消费进度的消费组名称,进入消费组详情页。
  7. 在“消费进度”页签,查看此消费组订阅的Topic列表、Topic的分区数、Topic中消息的总堆积数、Topic每个分区的消费进度。

    表1 消费进度参数说明

    参数名称

    说明

    Topic名称

    消费组订阅的Topic名称。

    分区数

    Topic的分区数。

    总堆积数

    Topic中未被消费组消费的消息数。

    总堆积数为采集时刻的瞬时值,而监控指标中的堆积数采集周期为1分钟,由于采集周期不同,可能会导致数值存在差异。具体现象请参见消费进度显示有堆积消息,为什么监控中显示消息堆积数为0?

    分区编号

    Topic的分区号。

    堆积数

    该分区中未被消费组消费的消息数。

    偏移量

    该分区的偏移量。

    最大消息位置

    该分区的最大消息位置。

    消费者ID

    消费该分区消息的消费者ID。

    消费者地址

    消费该分区消息的消费者地址。

    客户端ID

    客户端标识符,该客户端用于连接Kafka实例并消费该分区的消息。

  8. (可选)如果需要查询某个Topic的消费进度,在搜索框中,输入Topic名称,按“Enter”。
  9. (可选)如果需要导出消费进度到本地,参考以下任意一种方法操作。

    • 勾选待导出的Topic,单击“导出 > 导出已选中数据到XLSX”,导出指定Topic的消费进度。
    • 单击“导出 > 导出全部数据到XLSX”,导出全部Topic的消费进度。

查看消费进度(Kafka客户端)

  • 未开启密文接入的Kafka实例,在Kafka客户端的“/bin”目录下,通过以下命令查询消费进度。
    ./kafka-consumer-groups.sh --bootstrap-server ${connection-address} --offsets --describe --all-groups

    connection-address表示Kafka实例的连接地址,在Kafka控制台的“概览 > 连接信息”中获取。

    示例如下:

    [root@ecs-kafka bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.xx.xx:9092,192.168.xx.xx:9092,192.168.xx.xx:9092 --offsets --describe --all-groups
    
    Consumer group '__consumer-group-dial-test' has no active members.
    
    GROUP                      TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    __consumer-group-dial-test __dms_dial_test 0          350             350             0               -               -               -
    __consumer-group-dial-test __dms_dial_test 1          350             350             0               -               -               -
    __consumer-group-dial-test __dms_dial_test 2          350             350             0               -               -               -
    
    Consumer group 'test' has no active members.
    
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    test            topic-01        0          5               5               0               -               -               -
    test            topic-01        1          3               3               0               -               -               -
    test            topic-01        2          10              10              0               -               -               -
    [root@ecs-kafka bin]#
  • 已开启密文接入的Kafka实例,通过以下步骤查询消费进度。
    1. (可选)如果已经设置了用户名和密码,以及SSL证书配置,请跳过此步骤,执行2。否则请执行以下操作。

      在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,参考3增加用户名和密码,以及SSL证书配置。

    2. 在Kafka客户端的“/bin”目录下,通过以下命令查询消费进度。
      ./kafka-consumer-groups.sh --bootstrap-server {connection-address} --offsets --describe --all-groups --command-config ../config/{ssl-user-config.properties} 
      表2 查看消费进度参数说明

      参数名称

      说明

      connection-address

      Kafka实例的连接地址,在Kafka控制台的“概览 > 连接信息”中获取。

      ssl-user-config.properties

      配置文件的名称,此配置文件用于存放用户名和密码,以及SSL证书配置信息。

      示例如下:

      [root@ecs-kafka bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.xx.xx:9093,192.168.xx.xx:9093,192.168.xx.xx:9093 --offsets --describe --all-groups --command-config ../config/ssl-user-config.properties
      
      Consumer group '__consumer-group-dial-test' has no active members.
      
      GROUP                      TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      __consumer-group-dial-test __dms_dial_test 0          347             347             0               -               -               -
      __consumer-group-dial-test __dms_dial_test 1          347             347             0               -               -               -
      __consumer-group-dial-test __dms_dial_test 2          347             347             0               -               -               -
      
      Consumer group 'test' has no active members.
      
      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      test            topic-01        0          5               5               0               -               -               -
      test            topic-01        1          3               3               0               -               -               -
      test            topic-01        2          10              10              0               -               -               -
      [root@ecs-kafka bin]#

重置消费进度

  1. 登录管理控制台。
  2. 在管理控制台左上角单击,选择Kafka实例所在的区域。
  3. 在管理控制台左上角单击,选择“应用服务 > 分布式消息服务 Kafka”,进入Kafka实例列表页面。
  4. 单击Kafka实例的名称,进入实例详情页面。
  5. 在左侧导航栏选择“实例管理 > 消费组管理”,进入消费组列表页面。
  6. 单击待重置消费进度的消费组名称,进入消费组详情页。
  7. 在“消费进度”页签,通过以下方法,重置消费进度。

    • 重置单个Topic所有分区的消费进度:在待重置消费进度的Topic后,单击“重置消费进度”。
    • 重置单个Topic中单个分区的消费进度:在待重置消费进度的Topic分区后,单击“重置消费进度”。
    • 重置所有Topic所有分区的消费进度:单击“重置所有消费进度”。

  8. 在弹出的“重置消费进度”对话框中,参考表3,设置重置消费进度参数。

    表3 重置消费进度参数说明

    参数

    说明

    重置类型

    选择重置类型:

    • 时间:重置消费进度到指定的时间。
    • 偏移量:重置消费进度到指定的偏移量。

    “重置所有消费进度”只支持重置消费进度到指定时间。

    时间

    当“重置类型”为“时间”时,需要设置此参数。

    选择重置消费进度的时间点,重置完成后,将从此时间点开始消费。

    • 最早:最早偏移量
    • 自定义:自定义时间点
    • 最晚:最晚偏移量

    偏移量

    当“重置类型”为“偏移量”时,需要设置此参数。

    选择重置消费进度的偏移量,此偏移量不能小于0,重置完成后,将从此偏移量开始消费。

  9. 单击“确定”,弹出确认对话框。
  10. 单击“是”,完成消费进度的重置。

    在“消费进度”页签,在已重置消费进度的Topic前单击,在“偏移量”列查看重置后的值。