管理RocketMQ死信消息
对于消费失败且重试后依然失败的消息,RocketMQ不会立即丢弃,而是将消息转发至指定的队列中,即死信队列,这些消息即为死信消息。当消费失败的原因排查并解决后,您可以重新投递这些死信消息,让消费者重新消费;若您暂时无法处理这些死信消息,为避免到期后死信消息被删除,您也可以先将死信消息导出进行保存。
特性说明
死信消息具有如下特性:
- 不会再被消费者正常消费。
- 有效期与正常消息相同,默认保留48小时。超过默认的48小时后,会被自动删除。如果想要修改死信消息的保留时间,请参考修改RocketMQ消息保留时间。
死信队列具有如下特性:
- 一个死信队列对应一个消费组,而不是对应单个消费者实例。
- 如果一个消费组未产生死信消息,RocketMQ不会为其创建相应的死信队列。
- 一个死信队列包含了对应消费组产生的所有死信消息,不论该消息属于哪个Topic。
前提条件
查询死信消息的方式
RocketMQ提供的查询死信消息的方式对比如表1所示。
查询死信消息
- 登录RocketMQ实例控制台。
- 单击RocketMQ实例的名称,进入实例详情页面。
- 在左侧导航栏,单击“实例管理 > 死信队列”,进入“死信队列”页面。
- 选择以下任意一种方法,查询死信消息。
- 按Group查询:“消费组”选择待查询消费组的名称,“存储时间”选择待查询死信消息的时间段,单击“查询”。
图1 按Group查询死信消息
- 按Message ID查询:“消费组”选择待查询消费组的名称,“Message ID”输入待查询死信消息的Message ID,单击“查询”。
图2 按Message ID查询死信消息
- 按Message Key查询:“消费组”选择待查询消费组的名称,“Message Key”输入待查询死信消息的Message Key,单击“查询”。
图3 按Message Key查询死信消息
- 按Group查询:“消费组”选择待查询消费组的名称,“存储时间”选择待查询死信消息的时间段,单击“查询”。
重新投递死信消息
消费者无法正常处理(消费异常或返回失败)的消息会进入死信队列,您可以在控制台的死信队列中重新投递死信消息给消费者消费。
- 登录RocketMQ实例控制台。
- 单击RocketMQ实例的名称,进入实例详情页面。
- 在左侧导航栏,单击“实例管理 > 死信队列”,进入“死信队列”页面。
- 选择以下任意一种方法重新投递死信消息。
死信消息重新投递成功后,此死信消息依然存在死信队列中,不会被删除。请避免多次重复投递,造成重复消费。
- 在待重新投递的死信消息所在行,单击“重投”。
- 如需批量重新投递死信消息,勾选待重新投递的死信消息,单击“批量重投”。
导出死信消息
- 登录RocketMQ实例控制台。
- 单击RocketMQ实例的名称,进入实例详情页面。
- 在左侧导航栏,单击“实例管理 > 死信队列”,进入“死信队列”页面。
- 在待导出的死信消息所在行,单击“导出消息”。
导出JSON格式的文件。
如果需要批量导出死信消息,勾选待导出的多条死信消息,单击“批量导出消息”。
导出的消息字段说明如表2所示。
表2 消息字段说明 消息字段
字段说明
msg_id
消息ID。
instance_id
实例ID。
topic
Topic名称。
store_timestamp
存储消息的时间。
born_timestamp
产生消息的时间。
reconsume_times
重试次数。
body
消息体。
body_crc
消息体校验和,用来检查消息体数据是否正确。
store_size
存储大小。
property_list
消息属性列表。
- name:属性名称。
- value:属性值。
born_host
产生消息的主机IP。
store_host
存储消息的主机IP。
queue_id
队列ID。
queue_offset
在队列中的偏移量。