更新时间:2025-08-13 GMT+08:00
分享

开启和查询RocketMQ消息轨迹

消息轨迹是指一条消息从生产者发出到消费者消费处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。开启消息轨迹后,当消息收发不符合预期(如消息未发送成功或未消费成功)时,可以通过Message ID、Message Key或Topic的时间范围查询相关消息的消息轨迹,找到消息的实际收发状态,帮助您快速诊断消息问题。

消息轨迹数据

RocketMQ系统中,一条消息的完整链路包含生产者、服务端、消费者三个角色,每个角色处理消息的过程中都会在轨迹链路中增加相关的信息,将这些信息汇聚即可获取任意消息当前的状态。

图1 消息轨迹数据

前提条件

生产者Java客户端版本在4.9.0以上才支持事务消息轨迹,而开启SSL的RocketMQ实例,生产者和消费者的Java客户端都需在4.9.2以上版本才支持消息轨迹,若版本不满足要求,请先升级。客户端版本的查看方法请参考查看RocketMQ消费组详情

开启RocketMQ消息轨迹

您可以根据实际需求,选择使用Java或Go编程语言,或基于Spring框架(Java开发)开启消息轨迹功能。

在客户端开启消息轨迹的方法如下:

  • 生产者开启消息轨迹(除事务消息以外的消息类型

    构造函数的“enableMsgTrace”参数传入“true”,例如:

    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", true);
  • 生产者开启消息轨迹(事务消息

    构造函数的“enableMsgTrace”参数传入“true”,例如:

    TransactionMQProducer producer = new TransactionMQProducer(null, "ProducerGroupName", null, true, null);
  • 消费者开启消息轨迹

    构造函数的“enableMsgTrace”参数传入“true”,例如:

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName", true);

在客户端开启消息轨迹的方法如下:

  1. 执行以下命令,检查是否已安装Go。

    go version

    返回如下回显时,说明Go已经安装。如果未安装Go,请下载并安装

    [root@ecs-test sarama]# go version
    go version go1.16.5 linux/amd64

  2. 进入Go脚本所在的bin目录下。
  3. 执行“touch go.mod”命令新建一个“go.mod”,并增加以下代码,添加依赖。

    module rocketmq-example-go
    go 1.13
    require (
    	github.com/apache/rocketmq-client-go/v2 v2.1.0
    )

  4. 生产者开启消息轨迹(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。

    package main
    
    import (
    	"context"
    	"fmt"
    	"os"
    	"time"
    
    	"github.com/apache/rocketmq-client-go/v2"
    	"github.com/apache/rocketmq-client-go/v2/primitive"
    	"github.com/apache/rocketmq-client-go/v2/producer"
    )
    
    func main() {
    	namesrvs := []string{"192.168.0.1:8100"}
    	traceCfg := &primitive.TraceConfig{
    		Access:   primitive.Local,
    		Resolver: primitive.NewPassthroughResolver(namesrvs),
    	}
    
    	p, _ := rocketmq.NewProducer(
    		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),
    		producer.WithRetry(2),
    		producer.WithTrace(traceCfg))   // 增加此行代码表示开启了消息轨迹。
    	err := p.Start()
    	if err != nil {
    		fmt.Printf("start producer error: %s", err.Error())
    		os.Exit(1)
    	}
    	res, err := p.SendSync(context.Background(), primitive.NewMessage("topic1",
    		[]byte("Hello RocketMQ Go Client!")))
    
    	if err != nil {
    		fmt.Printf("send message error: %s\n", err)
    	} else {
    		fmt.Printf("send message success: result=%s\n", res.String())
    	}
    
    	time.Sleep(10 * time.Second)
    
    	err = p.Shutdown()
    	if err != nil {
    		fmt.Printf("shutdown producer error: %s", err.Error())
    	}
    }

    如上代码中,192.168.0.1:8100表示RocketMQ实例的连接地址和端口,实际取值可在RocketMQ控制台的概览 > 连接信息中获取。

  5. 消费者开启消息轨迹(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。

    package main
    
    import (
    	"context"
    	"fmt"
    	"os"
    	"time"
    
    	"github.com/apache/rocketmq-client-go/v2"
    	"github.com/apache/rocketmq-client-go/v2/consumer"
    	"github.com/apache/rocketmq-client-go/v2/primitive"
    )
    
    func main() {
    	namesrvs := []string{"192.168.0.1:8100"}
    	traceCfg := &primitive.TraceConfig{
    		Access:   primitive.Local,
    		Resolver: primitive.NewPassthroughResolver(namesrvs),
    	}
    
    	c, _ := rocketmq.NewPushConsumer(
    		consumer.WithGroupName("testGroup"),
    		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),
    		consumer.WithTrace(traceCfg),   // 增加此行代码表示开启了消息轨迹。
    	)
    	err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
    		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
    		fmt.Printf("subscribe callback: %v \n", msgs)
    		return consumer.ConsumeSuccess, nil
    	})
    	if err != nil {
    		fmt.Println(err.Error())
    	}
    	// Note: start after subscribe
    	err = c.Start()
    	if err != nil {
    		fmt.Println(err.Error())
    		os.Exit(-1)
    
    	}
    	time.Sleep(time.Hour)
    	err = c.Shutdown()
    	if err != nil {
    		fmt.Printf("shutdown Consumer error: %s", err.Error())
    	}
    }

    如上示例代码中加粗取值的说明及获取方式见表1

    表1 加粗取值的说明及获取方式

    取值

    取值说明

    取值获取方式

    192.168.0.1:8100

    表示RocketMQ实例的连接地址和端口。

    在RocketMQ控制台的“概览 > 连接信息”中获取。

    testGroup

    表示RocketMQ实例下创建的消费组名称。

    在RocketMQ控制台的“实例管理 > 消费组管理”中获取。

    TopicTest

    表示RocketMQ实例下创建的Topic名称。

    在RocketMQ控制台的“实例管理 > Topic管理”中获取。

在客户端开启消息轨迹的方法如下:

  • 生产者开启消息轨迹

    在“application.properties”配置文件中增加如下内容:

    rocketmq.producer.enable-msg-trace=true
  • 消费者开启消息轨迹

    将“enableMsgTrace”参数值配置为“true”,例如:

    @Service
    @RocketMQMessageListener(
        topic = "test-topic-1", 
        consumerGroup = "my-consumer_test-topic-1",
        enableMsgTrace = true
    )
    public class MyConsumer implements RocketMQListener<String> {
        ...
    }

    如上示例代码中加粗取值的说明及获取方式请参考表1

更多配置说明,请参考消息轨迹,如需配置“accessChannel”,请使用默认值LOCAL。

查询消息轨迹

  1. 登录RocketMQ实例控制台
  2. 单击RocketMQ实例的名称,进入实例详情页面。
  3. 在左侧导航栏,单击“实例管理 > 消息查询”,进入“消息查询”页面。
  4. 选择以下任意一种方法,查询消息。

    • 按Topic查询:“Topic”选择待查询消息的Topic名称,“队列”选择待查询消息的队列(仅RocketMQ实例4.8.0版本需要选择),“存储时间”选择待查询消息的时间段,单击“查询”。
      图2 按Topic查询消息

    • 按Message ID查询:“Topic”选择待查询消息所在的Topic名称,“Message ID”输入待查询消息的Message ID,单击“查询”。
      图3 按Message ID查询消息

    • 按Message Key查询:“Topic”选择待查询消息所在的Topic名称,“Message Key”输入待查询消息的Message Key,单击“查询”。
      图4 按Message Key查询消息

  5. 在待查询消息所在行,单击“消息轨迹”。
  6. 查看消息的轨迹,确定是否生产/消费成功,如图5所示。

    图5 查看消息轨迹详情

    消息轨迹的参数说明如表2所示。

    表2 消息轨迹的参数说明

    参数

    参数说明

    生产者状态

    生产者状态如下:

    • 发送成功:消息发送成功,服务端已经成功存储消息。
    • 提交成功:允许消费者消费此事务消息。
    • 回滚:事务消息将被丢弃,不允许消费者消费此事务消息。
    • 未知,待确认:事务消息状态暂时无法确定,等待固定时间后,服务端向生产者进行消息回查。

    生产耗时

    生产者发送消息的耗时。

    单位:毫秒

    生产地址

    生产者的IP地址。

    消费者状态

    消费者状态如下:

    • 消费成功
    • 消费超时
    • 消费异常
    • 消费返回NULL
    • 消费失败

    消费时间

    消费消息的时间。

    时间格式为:YYYY/MM/DD hh:mm:ss

    消费耗时

    消费者消费消息的耗时。

    单位:毫秒

    消费地址

    消费者的IP地址。

相关文档

查询消息轨迹也可以通过调用API完成,具体请参见查询消息轨迹

相关文档