发送定时消息
分布式消息服务RocketMQ版支持任意时间的定时消息,最大推迟时间可达到1年。
定时消息即生产者生产消息到分布式消息服务RocketMQ版后,消息不会立即被消费,而是延迟到设定的时间点后才会发送给消费者进行消费。
发送定时消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
约束与限制
- 仅RocketMQ实例5.x版本支持gRPC协议,4.8.0版本不支持。
- 客户端连接RocketMQ实例5.x版本收发定时消息前,需要确保Topic的消息类型为“定时”。
适用场景
定时消息适用于以下场景:
- 消息对应的业务逻辑有时间窗口要求,如电商交易中超时未支付关闭订单的场景。在订单创建时发送一条定时消息,5分钟以后投递给消费者,消费者收到此消息后需要判断对应订单是否完成支付,如果未完成支付,则关闭订单。如果已完成,则忽略。
- 通过消息触发定时任务的场景,如在某些固定时间点向用户发送提醒消息。
注意事项
- 4.8.0版本定时消息的最大延迟时间为1年,5.x版本为7天,若延迟时间超过对应版本的最大限制(1年或7天),消息将发送失败。
- 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
- 无法确保定时消息仅投递一次,定时消息可能会重复投递。
- 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。
- 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。
- 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。
- 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
准备环境
开源的Java客户端支持连接分布式消息服务RocketMQ版,推荐使用的客户端版本为5.0.5。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.5</version>
</dependency>
发送定时消息
发送定时消息的示例代码如下,或者通过ProducerDelayMessageExample.java获取更多示例代码。
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
public class ProducerDelayMessageExample {
private static final Logger log = LoggerFactory.getLogger(ProducerDelayMessageExample.class);
private ProducerDelayMessageExample() {
}
public static void main(String[] args) throws ClientException, IOException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "yourDelayTopic";
// 填入grpc连接地址/grpc公网连接地址
String endpoints = "yourEndpoints";
// 创建实例时,如果开启了ACL才需要添加以下代码。
String accessKey = System.getenv("ACL_User_Name");
String secretKey = System.getenv("ACL_Secret_Key");
//ACL_User_Name为用户名,ACL_Secret_Key为用户的密钥。创建用户的步骤,请参见创建用户。用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
// .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。
// .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。
.build();
final Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(topic)
.build();
byte[] body = "This is a delay message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
String tag = "yourMessageTagA";
Duration messageDelayTime = Duration.ofSeconds(10);
final Message message = provider.newMessageBuilder()
.setTopic(topic)
.setTag(tag)
.setKeys("yourMessageKey")
// 设置定时消息投递时间戳
.setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis())
.setBody(body)
.build();
try {
final SendReceipt sendReceipt = producer.send(message);
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (Throwable t) {
log.error("Failed to send message", t);
}
// 不再使用后,手动关闭producer。
producer.close();
}
}