收发顺序消息
顺序消息是分布式消息服务RocketMQ版提供的一种严格按照顺序来发布和消费的消息类型。
顺序消息分为全局顺序消息和分区顺序消息:
- 全局顺序消息:对于指定的一个Topic,将队列数量设置为1,这个队列内所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和订阅。
- 分区顺序消息:对于指定的一个Topic,同一个队列内的消息按照严格的FIFO顺序进行发布和订阅。生产者指定分区选择算法,保证需要按顺序消费的消息被分配到同一个队列。
全局顺序消息和分区顺序消息的区别仅为队列数量不同,代码没有区别。
收发顺序消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
约束与限制
客户端连接RocketMQ实例5.x版本收发顺序消息前,需要确保Topic的消息类型为“顺序”。
准备环境
开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为5.1.4。
- 使用Maven方式引入依赖。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>5.1.4</version> </dependency> - 下载依赖JAR包。
发送顺序消息
参考如下示例代码,或者通过Producer.java获取更多示例代码。
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class Producer {
public static void main(String[] args) {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
//填入连接地址
producer.setNamesrvAddr("192.168.0.1:8100");
//producer.setUseTLS(true); //创建实例时,如果开启了SSL,请增加此行代码。
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
String orderId = "order" + (i % 10);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String orderId = (String) arg;
int index = Math.abs(orderId.hashCode() % mqs.size());
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}}
上述代码中,相同orderId的消息需要保证顺序,不同orderId的消息不需要保证顺序,所以在分区选择算法中以“orderId/队列个数的余数”作为消息发送的队列。
订阅顺序消息
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
//填入连接地址
consumer.setNamesrvAddr("192.168.0.1:8100");
//consumer.setUseTLS(true); //创建实例时,如果开启了SSL,请增加此行代码。
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}