更新时间:2024-03-05 GMT+08:00
分享

收发事务消息

分布式消息服务RocketMQ版的事务消息支持在业务逻辑与发送消息之间提供事务保证,通过两阶段的方式提供对事务消息的支持,事务消息交互流程如图1所示。

图1 事务消息交互流程

事务消息生产者首先发送半消息,然后执行本地事务。如果执行成功,则发送事务提交,否则发送事务回滚。服务端在一段时间后如果一直收不到提交或回滚,则发起回查,生产者在收到回查后重新发送事务提交或回滚。消息只有在提交之后才投递给消费者,消费者对回滚的消息不可见。

收发事务消息前,请参考收集连接信息收集RocketMQ所需的连接信息。

客户端连接RocketMQ实例5.x版本收发事务消息前,需要确保Topic的消息类型为“事务”。

准备环境

开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为5.0.5

使用Maven方式引入依赖。
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.5</version>
</dependency>

发送事务消息

参考如下示例代码,或者通过ProducerTransactionMessageExample.java获取更多示例代码。

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.apis.producer.TransactionChecker;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ProducerTransactionMessageExample {
    private static final Logger log = LoggerFactory.getLogger(ProducerTransactionMessageExample.class);

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String topic = "yourTransactionTopic";
        // 填入grpc连接地址/grpc公网连接地址
        String endpoints = "yourEndpoints";
        // 填入用户名/密码,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。
        String accessKey = System.getenv("ROCKETMQ_AK");
        String secretKey = System.getenv("ROCKETMQ_SK");
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // .enableSsl(false)  // 创建实例时,如果开启了SSL,请增加此行代码。
                // .setCredentialProvider(sessionCredentialsProvider)  // 创建实例时,如果开启了ACL,请添加此行代码。
                .build();

        TransactionChecker checker = messageView -> {
            log.info("Receive transactional message check, message={}", messageView);
            // Return the transaction resolution according to your business logic.
            // 检查本地事务并返回本地事务状态
            return TransactionResolution.COMMIT;
        };
        final Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topic)
                // 事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
                .setTransactionChecker(checker)
                .build();
        byte[] body = "This is a transaction message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setTag(tag)
                .setKeys("yourMessageKey")
                .setBody(body)
                .build();
        // 开启事务分支
        final Transaction transaction;
        try {
            transaction = producer.beginTransaction();
        } catch (ClientException e) {
            log.error("Failed to begin transaction", e);
            //事务分支开启失败,直接退出。
            return;
        }
        try {
            final SendReceipt sendReceipt = producer.send(message, transaction);
            log.info("Send transaction message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (Throwable t) {
            log.error("Failed to send message", t);
            return;
        }
        /**
         * 执行本地事务,并确定本地事务结果。
         * 1. 如果本地事务提交成功,则提交消息事务。
         * 2. 如果本地事务提交失败,则回滚消息事务。
         * 3. 如果本地事务未知异常,则不处理,等待事务消息回查。
         *
         */
        transaction.commit();
        // transaction.rollback();

        // 不再使用后,手动关闭producer。
        producer.close();
    }
}

事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态,可以返回3种事务状态:

  • TransactionResolution.COMMIT:提交事务,允许消费者消费该消息。
  • TransactionResolution.ROLLBACK:回滚事务,消息将被丢弃不允许消费。
  • TransactionResolution.UNKNOW:无法判断状态,期待服务端向生产者再次回查该消息的状态。

订阅事务消息

订阅事务消息的代码与订阅普通消息相同。

分享:

    相关文档

    相关产品