更新时间:2023-12-22 GMT+08:00

收发顺序消息

顺序消息是分布式消息服务RocketMQ版提供的一种严格按照顺序来发布和消费的消息类型。

顺序消息分为全局顺序消息和分区顺序消息:

  • 全局顺序消息:对于指定的一个Topic,将队列数量设置为1,这个队列内所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和订阅。
  • 分区顺序消息:对于指定的一个Topic,同一个队列内的消息按照严格的FIFO顺序进行发布和订阅。生产者指定分区选择算法,保证需要按顺序消费的消息被分配到同一个队列。

全局顺序消息和分区顺序消息的区别仅为队列数量不同,代码没有区别。

收发顺序消息前,请参考收集连接信息收集RocketMQ所需的连接信息。

发送顺序消息

参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。

from rocketmq.client import Producer, Message

topic = 'TopicTest'
gid = 'test'
name_srv = '192.168.0.1:8100'


def create_message():
    msg = Message(topic)
    msg.set_keys('XXX')
    msg.set_tags('XXX')
    msg.set_property('property', 'test')
    msg.set_body('message body')
    return msg


def send_orderly_with_sharding_key():
    producer = Producer(gid, True)
    producer.set_name_server_address(name_srv)
    producer.start()
    msg = create_message()
    ret = producer.send_orderly_with_sharding_key(msg, 'orderId')
    print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
    producer.shutdown()


if __name__ == '__main__':
    send_orderly_with_sharding_key()

示例代码中的参数说明如下,请参考收集连接信息获取参数值。

  • topic:表示Topic名称。
  • gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。
  • name_srv:表示实例连接地址和端口。

上述代码中,相同orderId的消息需要保证顺序,不同orderId的消息不需要保证顺序,所以将orderId作为选择队列的sharding key。

订阅顺序消息

只需要在订阅普通消息的代码基础上增加orderly=True,参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。

import time

from rocketmq.client import PushConsumer, ConsumeStatus


def callback(msg):
    print(msg.id, msg.body, msg.get_property('property'))
    return ConsumeStatus.CONSUME_SUCCESS


def start_consume_message():
    consumer = PushConsumer('consumer_group', orderly=True)
    consumer.set_name_server_address('192.168.0.1:8100')
    consumer.subscribe('TopicTest', callback)
    print('start consume message')
    consumer.start()

    while True:
        time.sleep(3600)


if __name__ == '__main__':
    start_consume_message()

示例代码中的参数说明如下,请参考收集连接信息获取参数值。

  • consumer_group:表示消费组名称。
  • 192.168.0.1:8100:表示实例连接地址和端口。
  • TopicTest:表示Topic名称。