更新时间:2025-08-13 GMT+08:00
分享

管理RocketMQ死信消息

对于消费失败且重试后依然失败的消息,RocketMQ不会立即丢弃,而是将消息转发至指定的队列中,即死信队列,这些消息即为死信消息。当消费失败的原因排查并解决后,您可以重新投递这些死信消息,让消费者重新消费;若您暂时无法处理这些死信消息,为避免到期后死信消息被删除,您也可以先将死信消息导出进行保存。

特性说明

死信消息具有如下特性:

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,默认保留48小时。超过默认的48小时后,会被自动删除。如果想要修改死信消息的保留时间,请参考修改RocketMQ消息保留时间

死信队列具有如下特性:

  • 一个死信队列对应一个消费组,而不是对应单个消费者实例。
  • 如果一个消费组未产生死信消息,RocketMQ不会为其创建相应的死信队列。
  • 一个死信队列包含了对应消费组产生的所有死信消息,不论该消息属于哪个Topic。

前提条件

  • 已创建RocketMQ实例和消费组。
  • 如果通过按Message ID查询,需要提前获取消息所在的消费组名称和消息的Message ID。

    Message ID为生产消息后返回的MsgId,如6中返回的内容,也可先通过Topic查询消息,记录Message ID。

  • 如果通过按Message Key查询,需要提前获取消息所在的消费组名称和消息的Message Key。

    Message Key为5中配置的消息Key,也可先通过Topic查询消息,记录Message Key。

查询死信消息的方式

RocketMQ提供的查询死信消息的方式对比如表1所示。

表1 死信消息查询方式说明

查询方式

查询条件

说明

按Group查询

Group+时间段

根据消费组和时间范围,批量获取符合条件的所有死信消息,查询量大,不易匹配。

按Message ID查询

Group+Message ID

根据消费组和Message ID可以精确定位任意一条死信消息,获取死信消息的属性。

按Message Key查询

Group+Message Key

根据消费组和Message Key可以匹配到包含指定Key的死信消息。

查询死信消息

  1. 登录RocketMQ实例控制台
  2. 单击RocketMQ实例的名称,进入实例详情页面。
  3. 在左侧导航栏,单击“实例管理 > 死信队列”,进入“死信队列”页面。
  4. 选择以下任意一种方法,查询死信消息。

    • 按Group查询:“消费组”选择待查询消费组的名称,“存储时间”选择待查询死信消息的时间段,单击“查询”。
      图1 按Group查询死信消息

    • 按Message ID查询:“消费组”选择待查询消费组的名称,“Message ID”输入待查询死信消息的Message ID,单击“查询”。
      图2 按Message ID查询死信消息

    • 按Message Key查询:“消费组”选择待查询消费组的名称,“Message Key”输入待查询死信消息的Message Key,单击“查询”。
      图3 按Message Key查询死信消息

重新投递死信消息

消费者无法正常处理(消费异常或返回失败)的消息会进入死信队列,您可以在控制台的死信队列中重新投递死信消息给消费者消费。

  1. 登录RocketMQ实例控制台
  2. 单击RocketMQ实例的名称,进入实例详情页面。
  3. 在左侧导航栏,单击“实例管理 > 死信队列”,进入“死信队列”页面。
  4. 选择以下任意一种方法重新投递死信消息。

    死信消息重新投递成功后,此死信消息依然存在死信队列中,不会被删除。请避免多次重复投递,造成重复消费。

    • 在待重新投递的死信消息所在行,单击“重投”。
    • 如需批量重新投递死信消息,勾选待重新投递的死信消息,单击“批量重投”。

导出死信消息

  1. 登录RocketMQ实例控制台
  2. 单击RocketMQ实例的名称,进入实例详情页面。
  3. 在左侧导航栏,单击“实例管理 > 死信队列”,进入“死信队列”页面。
  4. 在待导出的死信消息所在行,单击“导出消息”。

    导出JSON格式的文件。

    如果需要批量导出死信消息,勾选待导出的多条死信消息,单击“批量导出消息”。

    导出的消息字段说明如表2所示。

    表2 消息字段说明

    消息字段

    字段说明

    msg_id

    消息ID。

    instance_id

    实例ID。

    topic

    Topic名称。

    store_timestamp

    存储消息的时间。

    born_timestamp

    产生消息的时间。

    reconsume_times

    重试次数。

    body

    消息体。

    body_crc

    消息体校验和,用来检查消息体数据是否正确。

    store_size

    存储大小。

    property_list

    消息属性列表。

    • name:属性名称。
    • value:属性值。

    born_host

    产生消息的主机IP。

    store_host

    存储消息的主机IP。

    queue_id

    队列ID。

    queue_offset

    在队列中的偏移量。

相关文档

重新投递死信消息和导出死信消息也可以通过调用API完成,具体请参见重发死信消息导出死信消息

相关文档