更新时间:2024-03-05 GMT+08:00

发送定时消息

分布式消息服务RocketMQ版支持任意时间的定时消息,最大推迟时间可达到1年。同时也支持定时消息的取消。

定时消息即生产者生产消息到分布式消息服务RocketMQ版后,消息不会立即被消费,而是延迟到设定的时间点后才会发送给消费者进行消费。

发送定时消息前,请参考收集连接信息收集RocketMQ所需的连接信息。

  • 2022年3月30日及以后购买的实例支持定时消息功能,在此之前购买的实例不支持此功能。
  • 客户端连接RocketMQ实例5.x版本收发定时消息前,需要确保Topic的消息类型为“定时”。

适用场景

定时消息适用于以下场景:

  • 消息对应的业务逻辑有时间窗口要求,如电商交易中超时未支付关闭订单的场景。在订单创建时发送一条定时消息,5分钟以后投递给消费者,消费者收到此消息后需要判断对应订单是否完成支付,如果未完成支付,则关闭订单。如果已完成,则忽略。
  • 通过消息触发定时任务的场景,如在某些固定时间点向用户发送提醒消息。

注意事项

  • 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。
  • 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
  • 在理想情况下,定时消息设定的时间与实际发送时间的误差在0.1s以内。但在定时消息投递压力过大时,会触发定时消息投递流控机制,精度会变差。
  • 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。
  • 无法确保定时消息仅投递一次,定时消息可能会重复投递。
  • 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。
  • 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。
  • 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。
  • 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。

准备环境

开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为4.9.8

通过以下任意一种方式引入依赖:
  • 使用Maven方式引入依赖。
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.8</version>
    </dependency>
  • 下载依赖JAR包

发送定时消息

发送定时消息的示例代码如下:

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;


public class ScheduledMessageProducer1 {
    public static final String TOPIC_NAME = "ScheduledTopic";


    public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException, RemotingException {


        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // 填入连接地址
        producer.setNamesrvAddr("192.168.0.1:8100");
        //producer.setUseTLS(true);    //创建实例时,如果开启了SSL,请增加此行代码。
        producer.start();


        // 定时消息投递时间戳,该消息10秒后投递
        final long deliverTimestamp = Instant.now().plusSeconds(10).toEpochMilli();
        // 创建消息对象
        Message msg = new Message(TOPIC_NAME,
            "TagA",
            "KEY",
            "scheduled message".getBytes(StandardCharsets.UTF_8));
        // 设置消息定时投递的时间戳属性
        msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(deliverTimestamp));
        // 发送消息,该消息将会在10秒后投递
        SendResult sendResult = producer.send(msg);
        // 打印发送结果和预计投递时间
        System.out.printf("%s %s%n", sendResult, UtilAll.timeMillisToHumanString2(deliverTimestamp));


        producer.shutdown();
    }
}

取消定时消息

取消定时消息的示例代码如下:

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class ScheduledMessageProducer1 {
    public static final String TOPIC_NAME = "ScheduledTopic";

    public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException, RemotingException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // 填入连接地址
        producer.setNamesrvAddr("192.168.0.1:8100");
        //producer.setUseTLS(true);    //创建实例时,如果开启了SSL,请增加此行代码。
        producer.start();

        // 定时消息投递时间戳,该消息10秒后投递
        final long deliverTimestamp = Instant.now().plusSeconds(10).toEpochMilli();
        // 创建消息对象
        Message msg = new Message(TOPIC_NAME,
            "TagA",
            "KEY",
            "scheduled message".getBytes(StandardCharsets.UTF_8));
        // 设置消息定时投递的时间戳属性
        msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(deliverTimestamp));
        // 发送消息,该消息将会在10秒后投递
        SendResult sendResult = producer.send(msg);
        // 打印发送结果和预计投递时间
        System.out.printf("%s %s%n", sendResult, UtilAll.timeMillisToHumanString2(deliverTimestamp));

        // ====== 发送取消消息逻辑 ======

        // 创建取消消息对象
        Message cancelMsg = new Message(TOPIC_NAME,
            "",
            "",
            "cancel".getBytes(StandardCharsets.UTF_8));
        // 设置取消消息的时间戳,该时间戳必须与要取消的定时消息的定时时间戳一致
        cancelMsg.putUserProperty("__STARTDELIVERTIME", String.valueOf(deliverTimestamp));
        // 设置要取消消息的ID,为发送消息的唯一ID(UNIQUE_KEY),可以从发送消息的结果中获取
        cancelMsg.putUserProperty("__CANCEL_SCHEDULED_MSG", sendResult.getMsgId());
        // 发送取消消息,必须在定时消息被投递之前发送才可以取消
        SendResult cancelSendResult = producer.send(cancelMsg, sendResult.getMessageQueue());
        System.out.printf("cancel %s%n", cancelSendResult);

        producer.shutdown();    }}