分布式消息服务Kafka版
分布式消息服务Kafka版
- 最新动态
- 功能总览
- 服务公告
- 产品介绍
- 计费说明
- 快速入门
- 用户指南
- 最佳实践
- 开发指南
- API参考
- SDK参考
-
常见问题
-
实例问题
- 为什么可用区不能选择2个?
- 创建实例时为什么无法查看子网和安全组等信息?
- 如何选择Kafka实例的存储空间?
- Kafka实例的超高IO和高IO如何选择?
- 如何选择Kafka实例存储容量阈值策略?
- Kafka服务端支持版本是多少?
- Kafka实例的ZK地址是什么?
- 创建的Kafka实例是集群模式么?
- Kafka实例是否支持修改访问端口?
- Kafka实例的SSL证书有效期多长?
- 如何将Kafka实例中的数据同步到另一个Kafka实例中?
- Kafka实例的SASL_SSL开关如何修改?
- SASL认证机制如何修改?
- 如何修改安全协议?
- 修改企业项目,是否会导致Kafka重启?
- 100MB/s的带宽怎样开启公网访问?
- Kafka服务和ZK是部署在相同的虚拟机中,还是分开部署?
- Kafka包周期实例支持删除吗?
- Kafka支持哪些加密套件?
- 购买实例时选择的单AZ,怎样可以扩展为多AZ?
- Kafka是否支持跨AZ容灾?已经购买的实例在哪里查看是否为跨AZ?
- Kafka支持磁盘加密吗?
- Kafka实例创建后,能修改VPC和子网吗?
- 有没有Kafka Stream的案例?
- Kafka实例版本可以升级吗?
- 怎样重新绑定公网IP?
- 实例规格变更问题
-
连接问题
- 选择和配置安全组
- Kafka实例是否支持公网访问?
- Kafka实例的连接地址默认有多少个?
- 是否支持跨Region访问?
- Kafka实例是否支持跨VPC访问?
- Kafka实例是否支持不同的子网?
- Kafka是否支持Kerberos认证,如何开启认证?
- Kafka实例是否支持无密码访问?
- 开启公网访问后,在哪查看公网IP地址?
- Kafka支持服务端认证客户端吗?
- 连接开启SASL_SSL的Kafka实例时,ssl truststore文件可以用PEM格式的吗?
- 下载的证书JKS和CRT有什么区别?
- Kafka支持哪个版本的TLS?
- Kafka实例连接数有限制吗?
- 客户端单IP连接的个数为多少?
- Kafka实例的内网连接地址可以修改吗?
- 不同实例中,使用的SSL证书是否一样?
- 为什么不建议使用Sarama客户端收发消息?
- Topic和分区问题
- 消费组问题
- 消息问题
-
Kafka Manager问题
- 登录Kafka Manager的账号是否可以设置为只读账号?
- 登录到Kafka Manager页面,为什么获取不到节点信息?
- Yikes! Insufficient partition balance when creating topic : projectman_project_enterprise_project Try again.
- Kafka Manager能否查询到消息的正文?
- Kafka Manager WebUI的端口能否修改?
- 在Kafka Manager上支持修改Topic的哪些属性?
- Kafka Manager和云监控显示的信息不一致
- Kafka Manager如何修改Topic的分区Leader?
- 实例版本在控制台和Kafka Manager上显示不一致?
- 为什么实例中存在默认名为__trace和__consumer_offsets的Topic?
- 客户端删除消费组后,在Kafka Manager中仍可以看到此消费组?
- 监控告警问题
- Kafka体验版使用说明
-
实例问题
- 故障排除
- 视频帮助
- 文档下载
- 通用参考
更新时间:2024-12-30 GMT+08:00
链接复制成功!
查看和重置Kafka消费进度
消费进度表示消费者的消费位置,本章节介绍如何查看和重置消费进度,重置消费进度即修改消费者的消费位置。
操作视频
本视频演示重置Kafka消费进度的操作。
约束与限制
重置消费进度可能会导致重复消费,请谨慎操作。
前提条件
Kafka实例不支持在线重置消费进度,请先将待重置消费进度的消费组停止消费,然后重置消费进度。停止待重置消费进度的消费者客户端后,需要等待一段时间(即ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG配置的时间,默认为1000毫秒),服务端才会认为此消费者已下线。
查看消费进度(控制台)
- 登录管理控制台。
- 在管理控制台左上角单击
,选择Kafka实例所在的区域。
- 在管理控制台左上角单击
,选择“应用中间件 > 分布式消息服务Kafka版”,进入分布式消息服务Kafka专享版页面。
- 在左侧导航栏单击“Kafka实例”,进入Kafka实例列表页面。
- 单击Kafka实例的名称,进入实例详情页面。
- 在左侧导航栏选择“消费组管理”,进入消费组列表页面。
- 单击待查看消费进度的消费组名称,进入消费组详情页。
- 在“消费进度”页签,查看此消费组订阅的Topic列表、Topic中消息的总堆积数、Topic每个分区的消费进度(包括堆积数、偏移量、最大消息位置、消费者ID、消费者地址和客户端ID)。
图1 消费进度
- (可选)如果需要查询某个Topic的消费进度,在搜索框中,输入Topic名称,按“Enter”。
查看消费进度(Kafka客户端)
- 未开启密文接入的Kafka实例,在Kafka客户端的“/bin”目录下,通过以下命令查询消费进度。
./kafka-consumer-groups.sh --bootstrap-server ${connection-address} --offsets --describe --all-groups
参数说明如下:connection-address表示Kafka实例的连接地址,在Kafka控制台的“基本信息 > 连接信息”中获取。
示例如下:
[root@ecs-kafka bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.xx.xx:9092,192.168.xx.xx:9092,192.168.xx.xx:9092 --offsets --describe --all-groups Consumer group '__consumer-group-dial-test' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID __consumer-group-dial-test __dms_dial_test 0 350 350 0 - - - __consumer-group-dial-test __dms_dial_test 1 350 350 0 - - - __consumer-group-dial-test __dms_dial_test 2 350 350 0 - - - Consumer group 'test' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test topic-01 0 5 5 0 - - - test topic-01 1 3 3 0 - - - test topic-01 2 10 10 0 - - - [root@ecs-kafka bin]#
- 已开启密文接入的Kafka实例,通过以下步骤查询消费进度。
- (可选)修改客户端配置文件。
在Kafka控制台的“基本信息 > 连接信息”中查看Kafka安全协议,两种安全协议对应的配置文件设置有所不同,请根据实际情况配置。
- SASL_PLAINTEXT:如果已经设置了用户名和密码,请跳过此步骤,执行2。否则在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,在文件中增加如下内容。
security.protocol=SASL_PLAINTEXT #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=SCRAM-SHA-512 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=PLAIN
参数说明如下:username和password为首次开启密文接入时填入的用户名和密码,或者创建用户时设置的用户名和密码。
- SASL_SSL:如果已经设置了用户名和密码,以及SSL证书配置,请跳过此步骤,执行2。否则在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,在文件中增加如下内容。
security.protocol=SASL_SSL ssl.truststore.location={ssl_truststore_path} ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm= #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=SCRAM-SHA-512 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=PLAIN
参数说明如下:
- ssl.truststore.location配置为client.jks证书的存放路径。注意,Windows系统下证书路径中也必须使用“/”,不能使用Windows系统中复制路径时的“\”,否则客户端获取证书失败。
- ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka。
- ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。
- username和password为首次开启密文接入时填入的用户名和密码,或者创建用户时设置的用户名和密码。
- SASL_PLAINTEXT:如果已经设置了用户名和密码,请跳过此步骤,执行2。否则在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,在文件中增加如下内容。
- 在Kafka客户端的“/bin”目录下,通过以下命令查询消费进度。
./kafka-consumer-groups.sh --bootstrap-server ${connection-address} --offsets --describe --all-groups --command-config ../config/ssl-user-config.properties
参数说明如下:connection-address表示Kafka实例的连接地址,在Kafka控制台的“基本信息 > 连接信息”中获取。
示例如下:
[root@ecs-kafka bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.xx.xx:9093,192.168.xx.xx:9093,192.168.xx.xx:9093 --offsets --describe --all-groups --command-config ../config/ssl-user-config.properties Consumer group '__consumer-group-dial-test' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID __consumer-group-dial-test __dms_dial_test 0 347 347 0 - - - __consumer-group-dial-test __dms_dial_test 1 347 347 0 - - - __consumer-group-dial-test __dms_dial_test 2 347 347 0 - - - Consumer group 'test' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test topic-01 0 5 5 0 - - - test topic-01 1 3 3 0 - - - test topic-01 2 10 10 0 - - - [root@ecs-kafka bin]#
- (可选)修改客户端配置文件。
重置消费进度
- 登录管理控制台。
- 在管理控制台左上角单击
,选择Kafka实例所在的区域。
- 在管理控制台左上角单击
,选择“应用中间件 > 分布式消息服务Kafka版”,进入分布式消息服务Kafka专享版页面。
- 在左侧导航栏单击“Kafka实例”,进入Kafka实例列表页面。
- 单击Kafka实例的名称,进入实例详情页面。
- 在左侧导航栏选择“消费组管理”,进入消费组列表页面。
- 单击待重置消费进度的消费组名称,进入消费组详情页。
- 在“消费进度”页签,通过以下方法,重置消费进度。
- 重置单个Topic所有分区的消费进度:在待重置消费进度的Topic后,单击“重置消费进度”。
- 重置单个Topic中单个分区的消费进度:在待重置消费进度的Topic分区后,单击“重置消费进度”。
- 重置所有Topic所有分区的消费进度:单击“重置所有消费进度”。
- 在弹出的“重置消费进度”对话框中,参考表1,设置重置消费进度参数。
- 单击“确定”,弹出确认对话框。
- 单击“是”,完成消费进度的重置。
父主题: 管理消费组