更新时间:2024-03-05 GMT+08:00

收发事务消息

分布式消息服务RocketMQ版的事务消息支持在业务逻辑与发送消息之间提供事务保证,通过两阶段的方式提供对事务消息的支持,事务消息交互流程如图1所示。

图1 事务消息交互流程

事务消息生产者首先发送半消息,然后执行本地事务。如果执行成功,则发送事务提交,否则发送事务回滚。服务端在一段时间后如果一直收不到提交或回滚,则发起回查,生产者在收到回查后重新发送事务提交或回滚。消息只有在提交之后才投递给消费者,消费者对回滚的消息不可见。

收发事务消息前,请参考收集连接信息收集RocketMQ所需的连接信息。

准备环境

开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为4.9.8

通过以下任意一种方式引入依赖:
  • 使用Maven方式引入依赖。
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.8</version>
    </dependency>
  • 下载依赖JAR包

发送事务消息

参考如下示例代码,或者通过TransactionProducer.java获取更多示例代码。

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;

public class Main {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
        TransactionListener transactionListener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                System.out.println("开始执行本地事务: " + message);
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("收到回查,重新查询事务状态: " + messageExt);
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        };

        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        //填入连接地址
        producer.setNamesrvAddr("192.168.0.1:8100");
        //producer.setUseTLS(true);    //创建实例时,如果开启了SSL,请增加此行代码。
        producer.setTransactionListener(transactionListener);
        producer.start();

        Message msg =
            new Message("TopicTest", "TagA", "KEY",
                "Hello RocketMQ ".getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.printf("%s%n", sendResult);

        producer.shutdown();
    }}

事务消息生产者需要实现两个回调函数,其中executeLocalTransaction回调函数在发送完半事务消息后被调用,即上图中的第3阶段,checkLocalTransaction回调函数在收到回查时调用,即上图中的第6阶段。两个回调函数均可返回3种事务状态:

  • LocalTransactionState.COMMIT_MESSAGE:提交事务,允许消费者消费该消息。
  • LocalTransactionState.ROLLBACK_MESSAGE:回滚事务,消息将被丢弃不允许消费。
  • LocalTransactionState.UNKNOW:无法判断状态,期待服务端向生产者再次回查该消息的状态。

订阅事务消息

订阅事务消息的代码与订阅普通消息的代码相同。