Sending and Receiving Ordered Messages
In DMS for RocketMQ, ordered messages are retrieved in the exact order that they are created.
Ordered messages are ordered globally or on the partition level.
- Globally ordered messages: There is only one queue in a specific topic. All messages in the queue will be published and subscribed to in the first in, first out (FIFO) order.
- Partition-level ordered message: Messages within a queue in a specific topic are published and subscribed to in the FIFO order. The producer specifies a partition selection algorithm to ensure that the messages to be ordered are allocated to the same queue.
The only difference between globally ordered messages and partition-level ordered messages is the number of queues. The code is the same.
Before sending and receiving ordered messages, collect RocketMQ connection information by referring to Collecting Connection Information.
Preparing the Environment
You can connect open-source Java clients to DMS for RocketMQ. The recommended Java client version is 4.9.8.
- Using Maven
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.8</version> </dependency>
- Downloading the dependency.
Sending Ordered Messages
Refer to the following sample code or obtain more sample code from Producer.java.
import java.nio.charset.StandardCharsets; import java.util.List; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; public class Producer { public static void main(String[] args) { try { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Enter the metadata address. producer.setNamesrvAddr("192.168.0.1:8100"); //producer.setUseTLS(true); // Add this line if SSL has been enabled during instance creation. producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 100; i++) { String orderId = "order" + (i % 10); Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { String orderId = (String) arg; int index = Math.abs(orderId.hashCode() % mqs.size()); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } producer.shutdown(); } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { e.printStackTrace(); } }}
In the preceding code, only the sequence of messages with the same orderId must be kept unchanged. Therefore, in the partition selection algorithm, specify the remainder of the value of orderId divided by the number of queues as the queue where messages are sent.
Subscribing to Ordered Messages
Refer to the following sample code or obtain more sample code from Consumer.java.
import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); // Enter the metadata address. consumer.setNamesrvAddr("192.168.0.1:8100"); //consumer.setUseTLS(true); // Add this line if SSL has been enabled during instance creation. consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); this.consumeTimes.incrementAndGet(); if ((this.consumeTimes.get() % 3) == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.