更新时间:2024-04-07 GMT+08:00

开启消息轨迹

操作场景

查询消息轨迹前,需要先在客户端开启消息轨迹。

本章节介绍使用Java和Go开启消息轨迹的方法。

前提条件

  • 生产者Java客户端版本在4.9.0以上才支持事务消息的轨迹,如果版本不满足要求,请先升级。
  • 开启SSL的RocketMQ实例,生产者和消费者的Java客户端版本在4.9.2以上才支持消息轨迹,如果版本不满足要求,请先升级。

操作步骤(Java)

在客户端开启消息轨迹的方法如下:

  • 生产者开启消息轨迹(除事务消息以外的消息类型

    构造函数的“enableMsgTrace”参数传入“true”,例如:

    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", true);
  • 生产者开启消息轨迹(事务消息

    构造函数的“enableMsgTrace”参数传入“true”,例如:

    TransactionMQProducer producer = new TransactionMQProducer(null, "ProducerGroupName", null, true, null);
  • 消费者开启消息轨迹

    构造函数的“enableMsgTrace”参数传入“true”,例如:

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName", true);

操作步骤(Go)

在客户端开启消息轨迹的方法如下:

  1. 执行以下命令,检查是否已安装Go。

    go version

    返回如下回显时,说明Go已经安装。

    [root@ecs-test sarama]# go version
    go version go1.16.5 linux/amd64

    如果未安装Go,请下载并安装

  2. 新建一个“go.mod”,并增加以下代码,添加依赖。

    module rocketmq-example-go
    
    go 1.13
    
    require (
    	github.com/apache/rocketmq-client-go/v2 v2.1.0
    )

  3. 生产者开启消息轨迹(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。

    package main
    
    import (
    	"context"
    	"fmt"
    	"os"
    	"time"
    
    	"github.com/apache/rocketmq-client-go/v2"
    	"github.com/apache/rocketmq-client-go/v2/primitive"
    	"github.com/apache/rocketmq-client-go/v2/producer"
    )
    
    func main() {
    	namesrvs := []string{"192.168.0.1:8100"}
    	traceCfg := &primitive.TraceConfig{
    		Access:   primitive.Local,
    		Resolver: primitive.NewPassthroughResolver(namesrvs),
    	}
    
    	p, _ := rocketmq.NewProducer(
    		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),
    		producer.WithRetry(2),
    		producer.WithTrace(traceCfg))
    	err := p.Start()
    	if err != nil {
    		fmt.Printf("start producer error: %s", err.Error())
    		os.Exit(1)
    	}
    	res, err := p.SendSync(context.Background(), primitive.NewMessage("topic1",
    		[]byte("Hello RocketMQ Go Client!")))
    
    	if err != nil {
    		fmt.Printf("send message error: %s\n", err)
    	} else {
    		fmt.Printf("send message success: result=%s\n", res.String())
    	}
    
    	time.Sleep(10 * time.Second)
    
    	err = p.Shutdown()
    	if err != nil {
    		fmt.Printf("shutdown producer error: %s", err.Error())
    	}
    }

  4. 消费者开启消息轨迹(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。

    package main
    
    import (
    	"context"
    	"fmt"
    	"os"
    	"time"
    
    	"github.com/apache/rocketmq-client-go/v2"
    	"github.com/apache/rocketmq-client-go/v2/consumer"
    	"github.com/apache/rocketmq-client-go/v2/primitive"
    )
    
    func main() {
    	namesrvs := []string{"192.168.0.1:8100"}
    	traceCfg := &primitive.TraceConfig{
    		Access:   primitive.Local,
    		Resolver: primitive.NewPassthroughResolver(namesrvs),
    	}
    
    	c, _ := rocketmq.NewPushConsumer(
    		consumer.WithGroupName("testGroup"),
    		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),
    		consumer.WithTrace(traceCfg),
    	)
    	err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
    		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
    		fmt.Printf("subscribe callback: %v \n", msgs)
    		return consumer.ConsumeSuccess, nil
    	})
    	if err != nil {
    		fmt.Println(err.Error())
    	}
    	// Note: start after subscribe
    	err = c.Start()
    	if err != nil {
    		fmt.Println(err.Error())
    		os.Exit(-1)
    
    	}
    	time.Sleep(time.Hour)
    	err = c.Shutdown()
    	if err != nil {
    		fmt.Printf("shutdown Consumer error: %s", err.Error())
    	}
    }