更新时间:2024-05-15 GMT+08:00

收发普通消息

本章节介绍普通消息的收发方法和示例代码。其中,普通消息发送方式分为同步发送和异步发送。

  • 同步发送:消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息。
  • 异步发送:消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息。

以下示例仅介绍同步发送的示例代码。

收发普通消息前,请参考收集连接信息收集RocketMQ所需的连接信息。

客户端连接RocketMQ实例5.x版本收发普通消息前,需要确保Topic的消息类型为“普通”。

准备环境

  1. 在命令行输入python,检查是否已安装Python。得到如下回显,说明Python已安装。
    Python 3.7.1 (default, Jul  5 2020, 14:37:24) 
    [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    >>>

    如果未安装Python,请使用以下命令安装:

    yum install python
  2. 安装librocketmq库和rocketmq-client-python,具体操作请参考rocketmq-client-python

    建议下载rocketmq-client-cpp-2.2.0,获取librocketmq库。

  3. 将librocketmq.so添加到系统动态库搜索路径。
    1. 查找librocketmq.so的路径。
      find / -name librocketmq.so
    2. 将librocketmq.so添加到系统动态库搜索路径。
      ln -s /查找到的librocketmq.so路径/librocketmq.so /usr/lib
      sudo ldconfig

同步发送

同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。

参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。

from rocketmq.client import Producer, Message

topic = 'TopicTest'
gid = 'test'
name_srv = '192.168.0.1:8100'


def create_message():
    msg = Message(topic)
    msg.set_keys('XXX')
    msg.set_tags('XXX')
    msg.set_property('property', 'test')
    msg.set_body('message body')
    return msg


def send_message_sync():
    producer = Producer(gid)
    producer.set_name_server_address(name_srv)
    producer.start()
    msg = create_message()
    ret = producer.send_sync(msg)
    print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
    producer.shutdown()


if __name__ == '__main__':
    send_message_sync()

示例代码中的参数说明如下,请参考收集连接信息获取参数值。

  • topic:表示Topic名称。
  • gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。
  • name_srv:表示实例连接地址和端口。

订阅普通消息

参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。

import time

from rocketmq.client import PushConsumer, ConsumeStatus


def callback(msg):
    print(msg.id, msg.body, msg.get_property('property'))
    return ConsumeStatus.CONSUME_SUCCESS


def start_consume_message():
    consumer = PushConsumer('consumer_group')
    consumer.set_name_server_address('192.168.0.1:8100')
    consumer.subscribe('TopicTest', callback)
    print('start consume message')
    consumer.start()

    while True:
        time.sleep(3600)


if __name__ == '__main__':
    start_consume_message()

示例代码中的参数说明如下,请参考收集连接信息获取参数值。

  • consumer_group:表示消费组名称。
  • 192.168.0.1:8100:表示实例连接地址和端口。
  • TopicTest:表示Topic名称。