Updated on 2024-05-15 GMT+08:00

Delivering Scheduled Messages

In DMS for RocketMQ, you can schedule messages to be delivered at any time, with a maximum delay of one year. You can also cancel scheduled messages.

After being sent from producers to DMS for RocketMQ, scheduled messages are delivered to consumers only after a specified point in time.

Before delivering scheduled messages, collect RocketMQ connection information by referring to Collecting Connection Information.

To send and receive scheduled messages, ensure that the topic's message type is Scheduled before connecting a client to a RocketMQ instance of version 5.x.

Application Scenarios

Scheduled messages can be used in the following scenarios:

  • The service logic requires a time window. For example, an e-commerce order is closed if it is not paid within a period of time. When an order is created, a scheduled message is sent and will be delivered to the consumer five minutes later. After receiving the message, the consumer checks whether the order is paid. If the order is not paid, it is closed. If the order is paid, the message is ignored.
  • A scheduled task is triggered by a message. For example, a reminder is sent to a user at a specific time.
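
The order-timeout scenario above can be sketched on the consumer side as follows. This is a minimal illustration only: the class OrderTimeoutHandler, its methods, and the in-memory order store are hypothetical stand-ins for your own order service and are not part of the RocketMQ client API. The handler closes an order only if it is still unpaid, so a paid order (or a repeated delivery of the same scheduled message) is simply ignored.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical order-timeout handler; not part of the RocketMQ client API. */
final class OrderTimeoutHandler {
    enum OrderStatus { UNPAID, PAID, CLOSED }

    // In-memory stand-in for an order database (assumption for illustration).
    private final Map<String, OrderStatus> orders = new ConcurrentHashMap<>();

    void createOrder(String orderId) {
        orders.put(orderId, OrderStatus.UNPAID);
    }

    void markPaid(String orderId) {
        // Only an unpaid order can become paid.
        orders.replace(orderId, OrderStatus.UNPAID, OrderStatus.PAID);
    }

    OrderStatus statusOf(String orderId) {
        return orders.get(orderId);
    }

    /**
     * Called when the scheduled message for an order is consumed.
     * Returns true if this call closed the order, false if the message was ignored.
     */
    boolean onTimeoutMessage(String orderId) {
        // Atomically close the order only if it is still unpaid.
        // Paid, already closed, and unknown orders are left untouched.
        return orders.replace(orderId, OrderStatus.UNPAID, OrderStatus.CLOSED);
    }

    public static void main(String[] args) {
        OrderTimeoutHandler handler = new OrderTimeoutHandler();
        handler.createOrder("order-1");
        handler.createOrder("order-2");
        handler.markPaid("order-1");
        System.out.println("order-1 closed: " + handler.onTimeoutMessage("order-1"));
        System.out.println("order-2 closed: " + handler.onTimeoutMessage("order-2"));
    }
}
```

In a real consumer, onTimeoutMessage would be called from the message listener with the order ID taken from the message body or key.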

Note

  • The delivery time can be scheduled to up to one year later. If the delay time exceeds one year, the message cannot be delivered.
  • If the delivery time is scheduled to a time point earlier than the current timestamp, the message is immediately sent to the consumer.
  • Ideally, the scheduled delivery time and the actual delivery time differ by less than 0.1s. However, under heavy scheduled-message load, flow control is triggered and the precision deteriorates.
  • Delivery order is not guaranteed within the 0.1s precision. That is, if the scheduled delivery times of two messages differ by less than 0.1s, the messages may not be delivered in the order in which they were sent.
  • Exactly-once delivery is not guaranteed. A scheduled message may be delivered repeatedly.
  • The scheduled time is the time when the server starts to deliver a message to a consumer. If messages have accumulated on the consumer side, the scheduled message is delivered after them and therefore later than the configured time.
  • Due to a potential time difference between the client and server, the actual delivery time may be different from the delivery time set by the client. The server time is used.
  • Messages are retained for a period (two days by default) after the scheduled delivery time. For example, if a message is scheduled for delivery in five days and is not consumed, it is deleted on the seventh day.
  • Scheduled messages occupy about three times the storage space of normal messages. If you use a large number of scheduled messages, pay attention to the storage space usage.
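
Some of the rules above can be checked on the client before a message is sent. The following sketch is illustrative only: the class DeliveryTimeCheck and its members are hypothetical, "one year" is approximated as 365 days (the exact server-side limit may differ), and the server clock, not the client clock, decides the real outcome.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical client-side pre-check; the server remains the source of truth. */
final class DeliveryTimeCheck {
    // Assumption: "one year" is treated as 365 days for this sketch.
    static final Duration MAX_DELAY = Duration.ofDays(365);
    // Default retention after the scheduled delivery time, as stated in the notes.
    static final Duration DEFAULT_RETENTION = Duration.ofDays(2);

    enum Outcome { IMMEDIATE, SCHEDULED, REJECTED }

    /** Classifies a requested delivery time relative to the given current time. */
    static Outcome classify(Instant now, Instant deliveryTime) {
        if (!deliveryTime.isAfter(now)) {
            // A delivery time that is not in the future results in immediate delivery.
            return Outcome.IMMEDIATE;
        }
        if (Duration.between(now, deliveryTime).compareTo(MAX_DELAY) > 0) {
            // A delay longer than the maximum cannot be delivered.
            return Outcome.REJECTED;
        }
        return Outcome.SCHEDULED;
    }

    /** Time after which an unconsumed message is deleted, using the default retention. */
    static Instant expiresAt(Instant deliveryTime) {
        return deliveryTime.plus(DEFAULT_RETENTION);
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        Instant inFiveDays = now.plus(Duration.ofDays(5));
        System.out.println(classify(now, inFiveDays));
        System.out.println("Deleted after: " + expiresAt(inFiveDays));
    }
}
```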

Preparing the Environment

You can connect open-source Java clients to DMS for RocketMQ. The recommended Java client version is 5.0.5.

Using Maven
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.5</version>
</dependency>

Delivering Scheduled Messages

Refer to the following sample code or obtain more sample code from ProducerDelayMessageExample.java.

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

public class ProducerDelayMessageExample {
    private static final Logger log = LoggerFactory.getLogger(ProducerDelayMessageExample.class);

    private ProducerDelayMessageExample() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String topic = "yourDelayTopic";
        // Specify the gRPC address or gRPC public address
        String endpoints = "yourEndpoints";
        // Enter your username and key. Hard-coding them or storing them in plaintext is risky;
        // store them encrypted in a configuration file or an environment variable instead.
        // The credentials below are required only if ACL was enabled during instance creation.
        String accessKey = System.getenv("ROCKETMQ_AK");
        String secretKey = System.getenv("ROCKETMQ_SK");
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // .enableSsl(false)  // Disable SSL if it has been enabled.
                // .setCredentialProvider(sessionCredentialsProvider)  // Set the credential provider if ACL has been enabled.
                .build();
        final Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topic)
                .build();

        byte[] body = "This is a delay message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        Duration messageDelayTime = Duration.ofSeconds(10);
        final Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setTag(tag)
                .setKeys("yourMessageKey")
                // Set the delivery timestamp.
                .setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis())
                .setBody(body)
                .build();
        try {
            final SendReceipt sendReceipt = producer.send(message);
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (Throwable t) {
            log.error("Failed to send message", t);
        }

        // Close the producer when it is no longer needed.
        producer.close();
    }
}
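
The sample above schedules delivery relative to the current time. For the reminder scenario, where delivery should happen at a specific wall-clock time, the timestamp passed to setDeliveryTimestamp can be derived with java.time. The following is a minimal sketch; the class DeliveryTimestamps and its method are hypothetical helpers, and the date and time zone are placeholder values.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

/** Hypothetical helper that converts a wall-clock time to epoch milliseconds. */
final class DeliveryTimestamps {
    private DeliveryTimestamps() {
    }

    /** Returns the epoch-millisecond timestamp of the given local time in the given zone. */
    static long atLocalTime(LocalDateTime localTime, ZoneId zone) {
        return ZonedDateTime.of(localTime, zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // Placeholder values: 09:00 on 2024-05-15 in the Asia/Shanghai time zone.
        long deliveryTimestamp = atLocalTime(
                LocalDateTime.of(2024, 5, 15, 9, 0), ZoneId.of("Asia/Shanghai"));
        // Pass this value to setDeliveryTimestamp(...) when building the message.
        System.out.println("deliveryTimestamp=" + deliveryTimestamp);
    }
}
```

Because the server time is authoritative, an explicit time zone avoids surprises when the client runs in a different zone from its users.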

Canceling a Scheduled Message

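To cancel a scheduled message, send a cancellation message to the same topic before the scheduled delivery time. The cancellation message carries the same delivery timestamp as the original message and a __CANCEL_SCHEDULED_MSG property set to the ID of the original message. The code is as follows: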

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

public class ProducerDelayMessageExample {
    private static final Logger log = LoggerFactory.getLogger(ProducerDelayMessageExample.class);

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String topic = "yourDelayTopic";
        // Specify the gRPC address or gRPC public address
        String endpoints = "yourEndpoints";
        // Enter your username and key. Hard-coding them or storing them in plaintext is risky;
        // store them encrypted in a configuration file or an environment variable instead.
        // The credentials below are required only if ACL was enabled during instance creation.
        String accessKey = System.getenv("ROCKETMQ_AK");
        String secretKey = System.getenv("ROCKETMQ_SK");
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // .enableSsl(false)  // Disable SSL if it has been enabled.
                // .setCredentialProvider(sessionCredentialsProvider)  // Set the credential provider if ACL has been enabled.
                .build();
        final Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topic)
                .build();

        try {
            // ====== Logic of scheduled delivery ======
            byte[] body = "This is a delay message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
            String tag = "yourMessageTagA";
            Duration messageDelayTime = Duration.ofSeconds(10);
            final Message message = provider.newMessageBuilder()
                    .setTopic(topic)
                    .setTag(tag)
                    .setKeys("yourMessageKey")
                    // Set the delivery timestamp
                    .setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis())
                    .setBody(body)
                    .build();
            final SendReceipt sendReceipt = producer.send(message);
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());

            // ====== Logic of cancellation ======
            // Create the cancellation message.
            Message cancel = provider.newMessageBuilder()
                    .setTopic(topic)
                    .setBody("cancel".getBytes(StandardCharsets.UTF_8))
                    // Set the timestamp of the message to be canceled. It must be the same as the scheduled delivery timestamp.
                    .setDeliveryTimestamp(message.getDeliveryTimestamp().get())
                    // Set the unique ID (UNIQUE_KEY) of the message to be canceled. The ID can be obtained from the message sending result.
                    .addProperty("__CANCEL_SCHEDULED_MSG", sendReceipt.getMessageId().toString())
                    .build();
            // Send the cancellation message before the scheduled delivery time.
            final SendReceipt cancelSendReceipt = producer.send(cancel);
            log.info("Send cancel message successfully, messageId={}", cancelSendReceipt.getMessageId());
        } catch (Throwable t) {
            log.error("Failed to send message", t);
        }

        // Close the producer when it is no longer needed.
        producer.close();
    }
}
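
In the sample above, the cancellation is sent right after the scheduled message, while the message ID and delivery timestamp are still in scope. If the cancellation happens later (for example, when an order is paid), both values need to be stored. The following sketch shows one way to do this; the class ScheduledMessageRegistry, its in-memory map, and the business key are hypothetical and not part of the RocketMQ client API.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory record of scheduled messages that may be canceled later. */
final class ScheduledMessageRegistry {
    /** The two values a cancellation message needs. */
    static final class Entry {
        final String messageId;
        final long deliveryTimestamp;

        Entry(String messageId, long deliveryTimestamp) {
            this.messageId = messageId;
            this.deliveryTimestamp = deliveryTimestamp;
        }
    }

    private final Map<String, Entry> pending = new ConcurrentHashMap<>();

    /** Remembers a scheduled message under a business key such as an order ID. */
    void record(String businessKey, String messageId, long deliveryTimestamp) {
        pending.put(businessKey, new Entry(messageId, deliveryTimestamp));
    }

    /**
     * Removes the entry for the key and returns it only if the scheduled
     * delivery time is still in the future, that is, cancellation is still possible.
     */
    Optional<Entry> takeIfCancellable(String businessKey, long nowMillis) {
        Entry entry = pending.remove(businessKey);
        if (entry == null || entry.deliveryTimestamp <= nowMillis) {
            return Optional.empty();
        }
        return Optional.of(entry);
    }
}
```

A returned entry supplies the value for setDeliveryTimestamp and for the __CANCEL_SCHEDULED_MSG property of the cancellation message. Because client and server clocks may differ, a cancellation sent very close to the delivery time may still arrive too late.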