更新时间:2025-03-07 GMT+08:00
收发普通消息
本章节介绍普通消息的收发方法和示例代码。普通消息发送方式分为同步发送和异步发送。
- 同步发送:消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息。
- 异步发送:消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息。
收发消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
约束与限制
- 仅RocketMQ实例5.x版本支持gRPC协议,4.8.0版本不支持。
- 客户端连接RocketMQ实例5.x版本收发普通消息前,需要确保Topic的消息类型为“普通”。
准备环境
- 执行以下命令,检查是否已安装Go。
go version
返回如下回显时,说明Go已经安装。
go version go1.16.5 linux/amd64
如果未安装Go,请下载并安装。
- 在“go.mod”中增加以下代码,添加依赖。
module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-clients/golang/v5 )
同步发送
同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。
参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
package main
import (
"context"
"fmt"
"log"
"os"
"strconv"
"time"
"github.com/apache/rocketmq-clients/golang"
"github.com/apache/rocketmq-clients/golang/credentials"
)
const (
Topic = "topic01"
Endpoint = "192.168.xx.xx:8080"
AccessKey = os.Getenv("ACL_User_Name")
SecretKey = os.Getenv("ACL_Secret_Key")
)
//ACL_User_Name为用户名,ACL_Secret_Key为用户的密钥。创建用户的步骤,请参见创建用户。用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
func main() {
os.Setenv("mq.consoleAppender.enabled", "true")
golang.ResetLogger()
producer, err := golang.NewProducer(&golang.Config{
Endpoint: Endpoint,
Credentials: &credentials.SessionCredentials{
AccessKey: AccessKey,
AccessSecret: SecretKey,
},
},
golang.WithTopics(Topic),
)
if err != nil {
log.Fatal(err)
}
err = producer.Start()
if err != nil {
log.Fatal(err)
}
defer producer.GracefulStop()
for i := 0; i < 10; i++ {
msg := &golang.Message{
Topic: Topic,
Body: []byte("this is a message : " + strconv.Itoa(i)),
}
// 设置消息的Key和Tag
msg.SetKeys("a", "b")
msg.SetTag("ab")
resp, err := producer.Send(context.TODO(), msg)
if err != nil {
log.Fatal(err)
}
for i := 0; i < len(resp); i++ {
fmt.Printf("%#v\n", resp[i])
}
time.Sleep(time.Second * 1)
}
}
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- Topic:输入Topic名称。
- Endpoint:输入grpc连接地址/grpc公网连接地址。
- AccessKey:创建实例时,如果开启了ACL,需要输入用户名。
- SecretKey:创建实例时,如果开启了ACL,需要输入用户密钥。
- SetKeys:输入消息的Key。
- SetTag:输入消息的Tag。
异步发送
异步发送是指消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
使用异步发送需要客户端实现异步发送回调接口(SendCallback)。即消息发送方在发送了一条消息后,不需要等待服务端响应接着发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
package main
import (
"context"
"fmt"
"log"
"os"
"strconv"
"time"
"github.com/apache/rocketmq-clients/golang"
"github.com/apache/rocketmq-clients/golang/credentials"
)
const (
Topic = "topic01"
Endpoint = "192.168.xx.xx:8080"
AccessKey = os.Getenv("ACL_User_Name")
SecretKey = os.Getenv("ACL_Secret_Key")
)
//ACL_User_Name为用户名,ACL_Secret_Key为用户的密钥。创建用户的步骤,请参见创建用户。用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
func main() {
os.Setenv("mq.consoleAppender.enabled", "true")
golang.ResetLogger()
producer, err := golang.NewProducer(&golang.Config{
Endpoint: Endpoint,
Credentials: &credentials.SessionCredentials{
AccessKey: AccessKey,
AccessSecret: SecretKey,
},
},
golang.WithTopics(Topic),
)
if err != nil {
log.Fatal(err)
}
err = producer.Start()
if err != nil {
log.Fatal(err)
}
defer producer.GracefulStop()
for i := 0; i < 10; i++ {
msg := &golang.Message{
Topic: Topic,
Body: []byte("this is a message : " + strconv.Itoa(i)),
}
// 设置消息的Key和Tag
msg.SetKeys("a", "b")
msg.SetTag("ab")
producer.SendAsync(context.TODO(), msg, func(ctx context.Context, resp []*golang.SendReceipt, err error) {
if err != nil {
log.Fatal(err)
}
for i := 0; i < len(resp); i++ {
fmt.Printf("%#v\n", resp[i])
}
})
time.Sleep(time.Second * 1)
}
}
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- Topic:输入Topic名称。
- Endpoint:输入grpc连接地址/grpc公网连接地址。
- AccessKey:创建实例时,如果开启了ACL,需要输入用户名。
- SecretKey:创建实例时,如果开启了ACL,需要输入用户密钥。
- SetKeys:输入消息的Key。
- SetTag:输入消息的Tag。
订阅普通消息
参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
package main
import (
"context"
"fmt"
"log"
"os"
"time"
"github.com/apache/rocketmq-clients/golang"
"github.com/apache/rocketmq-clients/golang/credentials"
)
const (
Topic = "topic01"
GroupName = "groupname"
Endpoint = "192.168.xx.xx:8080"
AccessKey = os.Getenv("ACL_User_Name")
SecretKey = os.Getenv("ACL_Secret_Key")
)
//ACL_User_Name为用户名,ACL_Secret_Key为用户的密钥。创建用户的步骤,请参见创建用户。用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
var (
// 接收消息请求的最大等待时间
awaitDuration = time.Second * 5
// 每次能接收的最大消息数
maxMessageNum int32 = 16
// 消息不可见时间,在消息被接收后对其他消费者不可见,直到超时。
invisibleDuration = time.Second * 20
)
func main() {
os.Setenv("mq.consoleAppender.enabled", "true")
golang.ResetLogger()
simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{
Endpoint: Endpoint,
Group: GroupName,
Credentials: &credentials.SessionCredentials{
AccessKey: AccessKey,
AccessSecret: SecretKey,
},
},
golang.WithAwaitDuration(awaitDuration),
golang.WithSubscriptionExpressions(map[string]*golang.FilterExpression{
Topic: golang.SUB_ALL,
}),
)
if err != nil {
log.Fatal(err)
}
err = simpleConsumer.Start()
if err != nil {
log.Fatal(err)
}
defer simpleConsumer.GracefulStop()
go func() {
for {
fmt.Println("start recevie message")
mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
if err != nil {
fmt.Println(err)
}
for _, mv := range mvs {
simpleConsumer.Ack(context.TODO(), mv)
fmt.Println(mv)
}
fmt.Println("wait a moment")
fmt.Println()
time.Sleep(time.Second * 3)
}
}()
time.Sleep(time.Minute)
}
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- Topic:输入Topic名称。
- GroupName:输入消费组名称。
- Endpoint:输入grpc连接地址/grpc公网连接地址。
- AccessKey:创建实例时,如果开启了ACL,需要输入用户名。
- SecretKey:创建实例时,如果开启了ACL,需要输入用户密钥。
父主题: Go(gRPC协议)