收发顺序消息
顺序消息是分布式消息服务RocketMQ版提供的一种严格按照顺序来发布和消费的消息类型。
顺序消息分为全局顺序消息和分区顺序消息:
- 全局顺序消息:对于指定的一个Topic,将队列数量设置为1,这个队列内所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和订阅。
- 分区顺序消息:对于指定的一个Topic,同一个队列内的消息按照严格的FIFO顺序进行发布和订阅。生产者指定分区选择算法,保证需要按顺序消费的消息被分配到同一个队列。
全局顺序消息和分区顺序消息的区别仅为队列数量不同,代码没有区别。
收发顺序消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
客户端连接RocketMQ实例5.x版本收发顺序消息前,需要确保Topic的消息类型为“顺序”。
准备环境
开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为4.9.8。
- 使用Maven方式引入依赖。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.8</version> </dependency>
- 下载依赖JAR包。
发送顺序消息
参考如下示例代码,或者通过Producer.java获取更多示例代码。
import java.nio.charset.StandardCharsets; import java.util.List; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; public class Producer { public static void main(String[] args) { try { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); //填入连接地址 producer.setNamesrvAddr("192.168.0.1:8100"); //producer.setUseTLS(true); //创建实例时,如果开启了SSL,请增加此行代码。 producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 100; i++) { String orderId = "order" + (i % 10); Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { String orderId = (String) arg; int index = Math.abs(orderId.hashCode() % mqs.size()); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } producer.shutdown(); } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { e.printStackTrace(); } }}
上述代码中,相同orderId的消息需要保证顺序,不同orderId的消息不需要保证顺序,所以在分区选择算法中以“orderId/队列个数的余数”作为消息发送的队列。
订阅顺序消息
参考如下示例代码,或者通过Consumer.java获取更多示例代码。
import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); //填入连接地址 consumer.setNamesrvAddr("192.168.0.1:8100"); //consumer.setUseTLS(true); //创建实例时,如果开启了SSL,请增加此行代码。 consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); this.consumeTimes.incrementAndGet(); if ((this.consumeTimes.get() % 3) == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }