更新时间:2024-04-08 GMT+08:00
开启消息轨迹
操作场景
查询消息轨迹前,需要先在客户端开启消息轨迹。
本章节介绍使用Java和Go开启消息轨迹的方法。
前提条件
- 生产者Java客户端版本在4.9.0以上才支持事务消息的轨迹,如果版本不满足要求,请先升级。
- 开启SSL的RocketMQ实例,生产者和消费者的Java客户端版本在4.9.2以上才支持消息轨迹,如果版本不满足要求,请先升级。
操作步骤(Java)
在客户端开启消息轨迹的方法如下:
- 生产者开启消息轨迹(除事务消息以外的消息类型)
构造函数的“enableMsgTrace”参数传入“true”,例如:
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", true);
- 生产者开启消息轨迹(事务消息)
构造函数的“enableMsgTrace”参数传入“true”,例如:
TransactionMQProducer producer = new TransactionMQProducer(null, "ProducerGroupName", null, true, null);
- 消费者开启消息轨迹
构造函数的“enableMsgTrace”参数传入“true”,例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName", true);
操作步骤(Go)
在客户端开启消息轨迹的方法如下:
- 执行以下命令,检查是否已安装Go。
go version
返回如下回显时,说明Go已经安装。
[root@ecs-test sarama]# go version go version go1.16.5 linux/amd64
如果未安装Go,请下载并安装。
- 新建一个“go.mod”,并增加以下代码,添加依赖。
module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.0 )
- 生产者开启消息轨迹(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) func main() { namesrvs := []string{"192.168.0.1:8100"} traceCfg := &primitive.TraceConfig{ Access: primitive.Local, Resolver: primitive.NewPassthroughResolver(namesrvs), } p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2), producer.WithTrace(traceCfg)) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } res, err := p.SendSync(context.Background(), primitive.NewMessage("topic1", []byte("Hello RocketMQ Go Client!"))) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } time.Sleep(10 * time.Second) err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } }
- 消费者开启消息轨迹(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { namesrvs := []string{"192.168.0.1:8100"} traceCfg := &primitive.TraceConfig{ Access: primitive.Local, Resolver: primitive.NewPassthroughResolver(namesrvs), } c, _ := rocketmq.NewPushConsumer( consumer.WithGroupName("testGroup"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), consumer.WithTrace(traceCfg), ) err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { fmt.Printf("subscribe callback: %v \n", msgs) return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("shutdown Consumer error: %s", err.Error()) } }
父主题: 消息管理