收发事务消息
分布式消息服务RocketMQ版的事务消息支持在业务逻辑与发送消息之间提供事务保证,通过两阶段的方式提供对事务消息的支持,事务消息交互流程如图1所示。
事务消息生产者首先发送半消息,然后执行本地事务。如果执行成功,则发送事务提交,否则发送事务回滚。服务端在一段时间后如果一直收不到提交或回滚,则发起回查,生产者在收到回查后重新发送事务提交或回滚。消息只有在提交之后才投递给消费者,消费者对回滚的消息不可见。
收发事务消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
客户端连接RocketMQ实例5.x版本收发事务消息前,需要确保Topic的消息类型为“事务”。
准备环境
开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为4.9.8。
- 使用Maven方式引入依赖。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.8</version> </dependency>
- 下载依赖JAR包。
发送事务消息
参考如下示例代码,或者通过TransactionProducer.java获取更多示例代码。
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; public class Main { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { TransactionListener transactionListener = new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("开始执行本地事务: " + message); return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { System.out.println("收到回查,重新查询事务状态: " + messageExt); return LocalTransactionState.COMMIT_MESSAGE; } }; TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); //填入连接地址 producer.setNamesrvAddr("192.168.0.1:8100"); //producer.setUseTLS(true); //创建实例时,如果开启了SSL,请增加此行代码。 producer.setTransactionListener(transactionListener); producer.start(); Message msg = new Message("TopicTest", "TagA", "KEY", "Hello RocketMQ ".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); producer.shutdown(); }}
事务消息生产者需要实现两个回调函数,其中executeLocalTransaction回调函数在发送完半事务消息后被调用,即上图中的第3阶段,checkLocalTransaction回调函数在收到回查时调用,即上图中的第6阶段。两个回调函数均可返回3种事务状态:
- LocalTransactionState.COMMIT_MESSAGE:提交事务,允许消费者消费该消息。
- LocalTransactionState.ROLLBACK_MESSAGE:回滚事务,消息将被丢弃不允许消费。
- LocalTransactionState.UNKNOW:无法判断状态,期待服务端向生产者再次回查该消息的状态。
订阅事务消息
订阅事务消息的代码与订阅普通消息的代码相同。