更新时间:2024-03-05 GMT+08:00
收发顺序消息
顺序消息是分布式消息服务RocketMQ版提供的一种严格按照顺序来发布和消费的消息类型。
顺序消息分为全局顺序消息和分区顺序消息:
- 全局顺序消息:对于指定的一个Topic,将队列数量设置为1,这个队列内所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和订阅。
- 分区顺序消息:对于指定的一个Topic,同一个队列内的消息按照严格的FIFO顺序进行发布和订阅。生产者指定分区选择算法,保证需要按顺序消费的消息被分配到同一个队列。
全局顺序消息和分区顺序消息的区别仅为队列数量不同,代码没有区别。
收发顺序消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
发送顺序消息
参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
from rocketmq.client import Producer, Message topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def send_orderly_with_sharding_key(): producer = Producer(gid, True) producer.set_name_server_address(name_srv) producer.start() msg = create_message() ret = producer.send_orderly_with_sharding_key(msg, 'orderId') print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) producer.shutdown() if __name__ == '__main__': send_orderly_with_sharding_key()
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- topic:表示Topic名称。
- gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。
- name_srv:表示实例连接地址和端口。
上述代码中,相同orderId的消息需要保证顺序,不同orderId的消息不需要保证顺序,所以将orderId作为选择队列的sharding key。
订阅顺序消息
只需要在订阅普通消息的代码基础上增加orderly=True,参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
import time from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(msg.id, msg.body, msg.get_property('property')) return ConsumeStatus.CONSUME_SUCCESS def start_consume_message(): consumer = PushConsumer('consumer_group', orderly=True) consumer.set_name_server_address('192.168.0.1:8100') consumer.subscribe('TopicTest', callback) print('start consume message') consumer.start() while True: time.sleep(3600) if __name__ == '__main__': start_consume_message()
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- consumer_group:表示消费组名称。
- 192.168.0.1:8100:表示实例连接地址和端口。
- TopicTest:表示Topic名称。
父主题: Python