发送定时消息
分布式消息服务RocketMQ版支持任意时间的定时消息,最大推迟时间可达到1年。
定时消息即生产者生产消息到分布式消息服务RocketMQ版后,消息不会立即被消费,而是延迟到设定的时间点后才会发送给消费者进行消费。
发送定时消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
约束与限制
- 2022年3月30日及以后购买的实例支持定时消息功能,在此之前购买的实例不支持此功能。
- 客户端连接RocketMQ实例5.x版本收发定时消息前,需要确保Topic的消息类型为“定时”。
适用场景
定时消息适用于以下场景:
- 消息对应的业务逻辑有时间窗口要求,如电商交易中超时未支付关闭订单的场景。在订单创建时发送一条定时消息,5分钟以后投递给消费者,消费者收到此消息后需要判断对应订单是否完成支付,如果未完成支付,则关闭订单。如果已完成,则忽略。
- 通过消息触发定时任务的场景,如在某些固定时间点向用户发送提醒消息。
注意事项
- 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。
- 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
- 在理想情况下,定时消息设定的时间与实际发送时间的误差在0.1s以内。但在定时消息投递压力过大时,会触发定时消息投递流控机制,精度会变差。
- 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。
- 无法确保定时消息仅投递一次,定时消息可能会重复投递。
- 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。
- 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。
- 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。
- 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
准备环境
- 在命令行输入python,检查是否已安装Python。得到如下回显,说明Python已安装。
Python 3.7.1 (default, Jul 5 2020, 14:37:24) [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux Type "help", "copyright", "credits" or "license" for more information. >>>
如果未安装Python,请使用以下命令安装:
yum install python
- 安装librocketmq库和rocketmq-client-python,具体操作请参考rocketmq-client-python。
建议下载rocketmq-client-cpp-2.2.0,获取librocketmq库。
- 将librocketmq.so添加到系统动态库搜索路径。
- 查找librocketmq.so的路径。
find / -name librocketmq.so
- 将librocketmq.so添加到系统动态库搜索路径。
ln -s /查找到的librocketmq.so路径/librocketmq.so /usr/lib sudo ldconfig
- 查找librocketmq.so的路径。
发送定时消息
发送定时消息的示例代码如下(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
import time from rocketmq.client import Producer, Message topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def send_delay_message(): producer = Producer(gid) producer.set_name_server_address(name_srv) producer.start() msg = create_message() msg.set_property('__STARTDELIVERTIME', str(int(round((time.time() + 3) * 1000)))) ret = producer.send_sync(msg) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) producer.shutdown() if __name__ == '__main__': send_delay_message()
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- topic:表示Topic名称。
- gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。
- name_srv:表示实例连接地址和端口。