更新时间:2024-05-15 GMT+08:00
收发事务消息
分布式消息服务RocketMQ版的事务消息支持在业务逻辑与发送消息之间提供事务保证,通过两阶段的方式提供对事务消息的支持,事务消息交互流程如图1所示。
事务消息生产者首先发送半消息,然后执行本地事务。如果执行成功,则发送事务提交,否则发送事务回滚。服务端在一段时间后如果一直收不到提交或回滚,则发起回查,生产者在收到回查后重新发送事务提交或回滚。消息只有在提交之后才投递给消费者,消费者对回滚的消息不可见。
收发事务消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
客户端连接RocketMQ实例5.x版本收发事务消息前,需要确保Topic的消息类型为“事务”。
准备环境
- 执行以下命令,检查是否已安装Go。
go version
返回如下回显时,说明Go已经安装。
go version go1.16.5 linux/amd64
如果未安装Go,请下载并安装。
- 在“go.mod”中增加以下代码,添加依赖。
module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-clients/golang/v5 )
发送事务消息
参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
package main import ( "context" "fmt" "log" "os" "strconv" "time" "github.com/apache/rocketmq-clients/golang" "github.com/apache/rocketmq-clients/golang/credentials" ) const ( Topic = "topic01" Endpoint = "192.168.xx.xx:8080" AccessKey = os.Getenv("ROCKETMQ_AK") //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。 SecretKey = os.Getenv("ROCKETMQ_SK") ) func main() { os.Setenv("mq.consoleAppender.enabled", "true") golang.ResetLogger() producer, err := golang.NewProducer(&golang.Config{ Endpoint: Endpoint, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, }, }, golang.WithTransactionChecker(&golang.TransactionChecker{ Check: func(msg *golang.MessageView) golang.TransactionResolution { log.Printf("check transaction message: %v", msg) // 检查本地事务并返回本地事务状态 return golang.COMMIT }, }), golang.WithTopics(Topic), ) if err != nil { log.Fatal(err) } err = producer.Start() if err != nil { log.Fatal(err) } defer producer.GracefulStop() for i := 0; i < 10; i++ { msg := &golang.Message{ Topic: Topic, Body: []byte("this is a message : " + strconv.Itoa(i)), } // 设置消息的Key和Tag msg.SetKeys("a", "b") msg.SetTag("ab") // 开启事务分支 transaction := producer.BeginTransaction() resp, err := producer.SendWithTransaction(context.TODO(), msg, transaction) if err != nil { log.Fatal(err) } for i := 0; i < len(resp); i++ { fmt.Printf("%#v\n", resp[i]) } /** * 执行本地事务,并确定本地事务结果。 * 1. 如果本地事务提交成功,则提交消息事务。 * 2. 如果本地事务提交失败,则回滚消息事务。 * 3. 如果本地事务未知异常,则不处理,等待事务消息回查。 * */ err = transaction.Commit() if err != nil { log.Fatal(err) } time.Sleep(time.Second * 1) } }
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- Topic:输入Topic名称。
- Endpoint:输入grpc连接地址/grpc公网连接地址。
- AccessKey:创建实例时,如果开启了ACL,需要输入用户名。
- SecretKey:创建实例时,如果开启了ACL,需要输入用户密钥。
- SetKeys:输入消息的Key。
- SetTag:输入消息的Tag。
事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态,可以返回3种事务状态:
- TransactionResolution.COMMIT:提交事务,允许消费者消费该消息。
- TransactionResolution.ROLLBACK:回滚事务,消息将被丢弃不允许消费。
- TransactionResolution.UNKNOW:无法判断状态,期待服务端向生产者再次回查该消息的状态。
订阅事务消息
订阅事务消息的代码与订阅普通消息相同。
父主题: Go(gRPC协议)