收发顺序消息
顺序消息是分布式消息服务RocketMQ版提供的一种严格按照顺序来发布和消费的消息类型。
顺序消息分为全局顺序消息和分区顺序消息:
- 全局顺序消息:对于指定的一个Topic,将队列数量设置为1,这个队列内所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和订阅。
- 分区顺序消息:对于指定的一个Topic,同一个队列内的消息按照严格的FIFO顺序进行发布和订阅。生产者为每一条消息指定消息组,相同消息组的消息会被分配到同一个队列。
全局顺序消息和分区顺序消息的区别仅为队列数量不同,代码没有区别。
收发顺序消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
约束与限制
- 仅RocketMQ实例5.x版本支持gRPC协议,4.8.0版本不支持。
- 客户端连接RocketMQ实例5.x版本收发顺序消息前,需要确保Topic的消息类型为“顺序”。
- 使用gRPC协议连接RocketMQ实例时,消费者是否顺序消费消息,取决于消费组中是否开启顺序消费,并非在消费代码中设置,顺序消费消息的代码与普通消费的代码相同。
准备环境
开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为5.0.5。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.5</version>
</dependency>
发送顺序消息
参考如下示例代码,或者通过ProducerFifoMessageExample.java获取更多示例代码。
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class ProducerFifoMessageExample {
private static final Logger log = LoggerFactory.getLogger(ProducerFifoMessageExample.class);
public static void main(String[] args) throws ClientException, IOException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "yourNormalTopics";
// 填入grpc连接地址/grpc公网连接地址
String endpoints = "yourEndpoints";
// 创建实例时,如果开启了ACL才需要添加以下代码。
String accessKey = System.getenv("ACL_User_Name");
String secretKey = System.getenv("ACL_Secret_Key");
//ACL_User_Name为用户名,ACL_Secret_Key为用户的密钥。创建用户的步骤,请参见创建用户。用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
// .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。
// .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。
.build();
final Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(topic)
.build();
byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
String tag = "yourMessageTagA";
final Message message = provider.newMessageBuilder()
.setTopic(topic)
.setTag(tag)
.setKeys("yourMessageKey")
// 指定消息组,相同消息组的消息会被分配到同一个队列。
.setMessageGroup("yourMessageGroup0")
.setBody(body)
.build();
try {
final SendReceipt sendReceipt = producer.send(message);
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (Throwable t) {
log.error("Failed to send message", t);
}
// 不再使用后,手动关闭producer。
producer.close();
}
}
订阅顺序消息
订阅顺序消息的代码与订阅普通消息相同。