更新时间:2024-08-29 GMT+08:00

收发顺序消息

顺序消息是分布式消息服务RocketMQ版提供的一种严格按照顺序来发布和消费的消息类型。

顺序消息分为全局顺序消息和分区顺序消息:

  • 全局顺序消息:对于指定的一个Topic,将队列数量设置为1,这个队列内所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和订阅。
  • 分区顺序消息:对于指定的一个Topic,同一个队列内的消息按照严格的FIFO顺序进行发布和订阅。生产者为每一条消息指定消息组,相同消息组的消息会被分配到同一个队列。

全局顺序消息和分区顺序消息的区别仅为队列数量不同,代码没有区别。

收发顺序消息前,请参考收集连接信息收集RocketMQ所需的连接信息。

  • 客户端连接RocketMQ实例5.x版本收发顺序消息前,需要确保Topic的消息类型为“顺序”。
  • 使用gRPC协议连接RocketMQ实例时,消费者是否顺序消费消息,取决于消费组中是否开启顺序消费,并非在消费代码中设置,顺序消费消息的代码与普通消费的代码相同。

准备环境

开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为5.0.5

使用Maven方式引入依赖。
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.5</version>
</dependency>

发送顺序消息

参考如下示例代码,或者通过ProducerFifoMessageExample.java获取更多示例代码。

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;


public class ProducerFifoMessageExample {
    private static final Logger log = LoggerFactory.getLogger(ProducerFifoMessageExample.class);

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String topic = "yourNormalTopics";
        // 填入grpc连接地址/grpc公网连接地址
        String endpoints = "yourEndpoints";
        // 填入用户名/密钥,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。
        String accessKey = System.getenv("ROCKETMQ_AK");
        String secretKey = System.getenv("ROCKETMQ_SK");
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // .enableSsl(false)  // 创建实例时,如果开启了SSL,请增加此行代码。
                // .setCredentialProvider(sessionCredentialsProvider)  // 创建实例时,如果开启了ACL,请添加此行代码。
                .build();

        final Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topic)
                .build();
        byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setTag(tag)
                .setKeys("yourMessageKey")
                // 指定消息组,相同消息组的消息会被分配到同一个队列。
                .setMessageGroup("yourMessageGroup0")
                .setBody(body)
                .build();
        try {
            final SendReceipt sendReceipt = producer.send(message);
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (Throwable t) {
            log.error("Failed to send message", t);
        }

        // 不再使用后,手动关闭producer。
        producer.close();
    }
}

订阅顺序消息

订阅顺序消息的代码与订阅普通消息相同。