收发顺序消息
顺序消息是分布式消息服务RocketMQ版提供的一种严格按照顺序来发布和消费的消息类型。
顺序消息分为全局顺序消息和分区顺序消息:
- 全局顺序消息:对于指定的一个Topic,将队列数量设置为1,这个队列内所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和订阅。
- 分区顺序消息:对于指定的一个Topic,同一个队列内的消息按照严格的FIFO顺序进行发布和订阅。生产者为每一条消息指定消息组,相同消息组的消息会被分配到同一个队列。
全局顺序消息和分区顺序消息的区别仅为队列数量不同,代码没有区别。
收发顺序消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
约束与限制
- 仅RocketMQ实例5.x版本支持gRPC协议,4.8.0版本不支持。
- 客户端连接RocketMQ实例5.x版本收发顺序消息前,需要确保Topic的消息类型为“顺序”。
- 使用gRPC协议连接RocketMQ实例时,消费者是否顺序消费消息,取决于消费组中是否开启顺序消费,并非在消费代码中设置,顺序消费消息的代码与普通消费的代码相同。
准备环境
- 执行以下命令,检查是否已安装Go。
go version
返回如下回显时,说明Go已经安装。
go version go1.16.5 linux/amd64
如果未安装Go,请下载并安装。
- 在“go.mod”中增加以下代码,添加依赖。
module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-clients/golang/v5 )
发送顺序消息
参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
package main import ( "context" "fmt" "log" "os" "strconv" "time" "github.com/apache/rocketmq-clients/golang" "github.com/apache/rocketmq-clients/golang/credentials" ) const ( Topic = "topic01" Endpoint = "192.168.xx.xx:8080" AccessKey = os.Getenv("ROCKETMQ_AK") //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。 SecretKey = os.Getenv("ROCKETMQ_SK") ) func main() { os.Setenv("mq.consoleAppender.enabled", "true") golang.ResetLogger() producer, err := golang.NewProducer(&golang.Config{ Endpoint: Endpoint, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, }, }, golang.WithTopics(Topic), ) if err != nil { log.Fatal(err) } err = producer.Start() if err != nil { log.Fatal(err) } defer producer.GracefulStop() for i := 0; i < 10; i++ { msg := &golang.Message{ Topic: Topic, Body: []byte("this is a message : " + strconv.Itoa(i)), } // 设置消息的Key和Tag msg.SetKeys("a", "b") msg.SetTag("ab") msg.SetMessageGroup("yourMessageGroup0") resp, err := producer.Send(context.TODO(), msg) if err != nil { log.Fatal(err) } for i := 0; i < len(resp); i++ { fmt.Printf("%#v\n", resp[i]) } time.Sleep(time.Second * 1) } }
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- Topic:输入Topic名称。
- Endpoint:输入grpc连接地址/grpc公网连接地址。
- AccessKey:创建实例时,如果开启了ACL,需要输入用户名。
- SecretKey:创建实例时,如果开启了ACL,需要输入用户密钥。
- SetKeys:输入消息的Key。
- SetTag:输入消息的Tag。
订阅顺序消息
订阅顺序消息的代码与订阅普通消息相同。