更新时间:2024-05-15 GMT+08:00

发送定时消息

分布式消息服务RocketMQ版支持任意时间的定时消息,最大推迟时间可达到1年。

定时消息即生产者生产消息到分布式消息服务RocketMQ版后,消息不会立即被消费,而是延迟到设定的时间点后才会发送给消费者进行消费。

发送定时消息前,请参考收集连接信息收集RocketMQ所需的连接信息。

客户端连接RocketMQ实例5.x版本收发定时消息前,需要确保Topic的消息类型为“定时”。

适用场景

定时消息适用于以下场景:

  • 消息对应的业务逻辑有时间窗口要求,如电商交易中超时未支付关闭订单的场景。在订单创建时发送一条定时消息,5分钟以后投递给消费者,消费者收到此消息后需要判断对应订单是否完成支付,如果未完成支付,则关闭订单。如果已完成,则忽略。
  • 通过消息触发定时任务的场景,如在某些固定时间点向用户发送提醒消息。

注意事项

  • 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。
  • 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
  • 在理想情况下,定时消息设定的时间与实际发送时间的误差在0.1s以内。但在定时消息投递压力过大时,会触发定时消息投递流控机制,精度会变差。
  • 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。
  • 无法确保定时消息仅投递一次,定时消息可能会重复投递。
  • 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。
  • 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。
  • 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。
  • 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。

准备环境

开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为5.0.5

使用Maven方式引入依赖。
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.5</version>
</dependency>

发送定时消息

发送定时消息的示例代码如下(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "strconv"
    "time"

    "github.com/apache/rocketmq-clients/golang"
    "github.com/apache/rocketmq-clients/golang/credentials"
)

const (
    Topic     = "topic01"
    Endpoint  = "192.168.xx.xx:8080"
    AccessKey = os.Getenv("ROCKETMQ_AK")  //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
    SecretKey = os.Getenv("ROCKETMQ_SK")
)

func main() {
    os.Setenv("mq.consoleAppender.enabled", "true")
    golang.ResetLogger()
    producer, err := golang.NewProducer(&golang.Config{
        Endpoint: Endpoint,
        Credentials: &credentials.SessionCredentials{
            AccessKey:    AccessKey,
            AccessSecret: SecretKey,
        },
    },
        golang.WithTopics(Topic),
    )
    if err != nil {
        log.Fatal(err)
    }
    err = producer.Start()
    if err != nil {
        log.Fatal(err)
    }
    defer producer.GracefulStop()
    for i := 0; i < 10; i++ {
        msg := &golang.Message{
            Topic: Topic,
            Body:  []byte("this is a message : " + strconv.Itoa(i)),
        }
        // 消息的Key和Tag
        msg.SetKeys("a", "b")
        msg.SetTag("ab")
        // 设置定时消息投递时间戳
        msg.SetDelayTimestamp(time.Now().Add(time.Second * 10))
        // send message in sync
        resp, err := producer.Send(context.TODO(), msg)
        if err != nil {
            log.Fatal(err)
        }
        for i := 0; i < len(resp); i++ {
            fmt.Printf("%#v\n", resp[i])
        }

        time.Sleep(time.Second * 1)
    }
}

示例代码中的参数说明如下,请参考收集连接信息获取参数值。

  • Topic:输入Topic名称。
  • Endpoint:输入grpc连接地址/grpc公网连接地址。
  • AccessKey:创建实例时,如果开启了ACL,需要输入用户名。
  • SecretKey:创建实例时,如果开启了ACL,需要输入用户密钥。
  • SetKeys:输入消息的Key。
  • SetTag:输入消息的Tag。