Delivering Scheduled Messages
In DMS for RocketMQ, you can schedule messages to be delivered at a specified time. The maximum delay is one year for v4.8.0 instances and seven days for v5.x instances.
After being sent from producers to DMS for RocketMQ, scheduled messages are delivered to consumers only after a specified point in time.
Before delivering scheduled messages, collect RocketMQ connection information by referring to Collecting Connection Information.
Notes and Constraints
- The gRPC protocol is supported only by RocketMQ v5.x instances, not by v4.8.0 instances.
- To send and receive scheduled messages on a v5.x instance, ensure that the topic message type is Scheduled before connecting a client to the instance.
Application Scenarios
Scheduled messages can be used in the following scenarios:
- The service logic requires a time window. For example, an e-commerce order is closed if it is not paid within a period of time. When an order is created, a scheduled message is sent and will be delivered to the consumer five minutes later. After receiving the message, the consumer checks whether the order is paid. If the order is not paid, it is closed. If the order is paid, the message is ignored.
- A scheduled task is triggered by a message. For example, a reminder is sent to a user at a specific time.
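The order-timeout scenario above can be sketched on the consumer side as follows. This is a minimal illustration only: the Order type, its states, and handleTimeout are hypothetical names that are not part of DMS for RocketMQ or any client SDK, and the order is kept in memory instead of a real order store.

```go
package main

import "fmt"

// OrderStatus is a hypothetical order state used only in this sketch.
type OrderStatus int

const (
	Unpaid OrderStatus = iota
	Paid
	Closed
)

// Order is a hypothetical in-memory order record.
type Order struct {
	ID     string
	Status OrderStatus
}

// handleTimeout is what a consumer might run when the scheduled message
// for an order arrives. It returns true only if this call closed the order.
func handleTimeout(o *Order) bool {
	if o.Status != Unpaid {
		// Paid: the message is ignored.
		// Closed: the order was already handled, so nothing is left to do.
		return false
	}
	o.Status = Closed
	return true
}

func main() {
	unpaid := &Order{ID: "order-1", Status: Unpaid}
	paid := &Order{ID: "order-2", Status: Paid}

	fmt.Println("closed unpaid order:", handleTimeout(unpaid))
	fmt.Println("closed paid order:", handleTimeout(paid))
	fmt.Println("closed again on a second call:", handleTimeout(unpaid))
}
```

Checking the current status before acting means that handling the same message twice does not change the outcome.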
Note
- The delivery time can be up to one year in the future for v4.8.0 or up to seven days in the future for v5.x. If the delay exceeds this limit, the message cannot be delivered.
- If the delivery time is scheduled to a time point earlier than the current timestamp, the message is immediately sent to the consumer.
- Exactly-once delivery is not guaranteed. A scheduled message may be delivered repeatedly.
- The scheduled time is the time when the server starts to deliver a message to a consumer. If messages have accumulated on the consumer side, the scheduled message is delivered after them, so it may arrive later than the configured time.
- The client and server clocks may differ, so the actual delivery time may differ from the delivery time set by the client. The server time is used.
- Messages are retained for a period (two days by default) after the scheduled delivery time. For example, if a message is scheduled for delivery in five days and is not consumed, it is deleted on the seventh day.
- Scheduled messages occupy about three times the storage space of normal messages. If you use a large number of scheduled messages, pay attention to the storage space usage.
Preparing the Environment
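You can connect open-source Java clients to DMS for RocketMQ. The recommended Java client version is 5.0.5. For a Maven project, add the following dependency. The sample code in this section uses the open-source Go client (github.com/apache/rocketmq-clients/golang), as its import paths show.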
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.5</version>
</dependency>
Delivering Scheduled Messages
The following code is an example. Replace the placeholder values, such as the topic name, endpoint, and credentials, with the actual values.
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	"github.com/apache/rocketmq-clients/golang"
	"github.com/apache/rocketmq-clients/golang/credentials"
)

const (
	Topic    = "topic01"
	Endpoint = "192.168.xx.xx:8080"
)

// ACL_User_Name is the username and ACL_Secret_Key is the key. For details about
// how to create a user, see Creating a User. Hard-coded or plaintext username and
// key are risky. You are advised to store them in ciphertext in a configuration
// file or an environment variable.
// os.Getenv is a function call, so these values are declared with var, not const.
var (
	AccessKey = os.Getenv("ACL_User_Name")
	SecretKey = os.Getenv("ACL_Secret_Key")
)

func main() {
	// Print client logs to the console.
	os.Setenv("mq.consoleAppender.enabled", "true")
	golang.ResetLogger()

	// Create a producer bound to the topic.
	producer, err := golang.NewProducer(&golang.Config{
		Endpoint: Endpoint,
		Credentials: &credentials.SessionCredentials{
			AccessKey:    AccessKey,
			AccessSecret: SecretKey,
		},
	},
		golang.WithTopics(Topic),
	)
	if err != nil {
		log.Fatal(err)
	}
	err = producer.Start()
	if err != nil {
		log.Fatal(err)
	}
	defer producer.GracefulStop()

	for i := 0; i < 10; i++ {
		msg := &golang.Message{
			Topic: Topic,
			Body:  []byte("this is a message : " + strconv.Itoa(i)),
		}
		// Set the message key and tag.
		msg.SetKeys("a", "b")
		msg.SetTag("ab")
		// Set the scheduled delivery time (10 seconds from now).
		msg.SetDelayTimestamp(time.Now().Add(time.Second * 10))
		// Send the message synchronously.
		resp, err := producer.Send(context.TODO(), msg)
		if err != nil {
			log.Fatal(err)
		}
		// Print each send receipt.
		for _, receipt := range resp {
			fmt.Printf("%#v\n", receipt)
		}
		time.Sleep(time.Second * 1)
	}
}
The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.
- Topic: Enter a topic name.
- Endpoint: Enter the gRPC address or gRPC public address.
- AccessKey: Enter the username if ACL was enabled during instance creation.
- SecretKey: Enter the user key if ACL was enabled during instance creation.
- SetKeys: Enter the message key.
- SetTag: Enter the message tag.