收发普通消息
本章节介绍普通消息的收发方法和示例代码。其中,普通消息发送方式分为同步发送和异步发送。
- 同步发送:消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息。
- 异步发送:消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息。
以下示例仅介绍同步发送的示例代码。
收发普通消息前,请参考收集连接信息收集RocketMQ所需的连接信息。
客户端连接RocketMQ实例5.x版本收发普通消息前,需要确保Topic的消息类型为“普通”。
准备环境
- 在命令行输入python,检查是否已安装Python。得到如下回显,说明Python已安装。
Python 3.7.1 (default, Jul 5 2020, 14:37:24) [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux Type "help", "copyright", "credits" or "license" for more information. >>>
如果未安装Python,请使用以下命令安装:
yum install python
- 安装librocketmq库和rocketmq-client-python,具体操作请参考rocketmq-client-python。
建议下载rocketmq-client-cpp-2.2.0,获取librocketmq库。
- 将librocketmq.so添加到系统动态库搜索路径。
- 查找librocketmq.so的路径。
find / -name librocketmq.so
- 将librocketmq.so添加到系统动态库搜索路径。
ln -s /查找到的librocketmq.so路径/librocketmq.so /usr/lib sudo ldconfig
- 查找librocketmq.so的路径。
同步发送
同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。
参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
from rocketmq.client import Producer, Message topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def send_message_sync(): producer = Producer(gid) producer.set_name_server_address(name_srv) producer.start() msg = create_message() ret = producer.send_sync(msg) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) producer.shutdown() if __name__ == '__main__': send_message_sync()
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- topic:表示Topic名称。
- gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。
- name_srv:表示实例连接地址和端口。
订阅普通消息
参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。
import time from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(msg.id, msg.body, msg.get_property('property')) return ConsumeStatus.CONSUME_SUCCESS def start_consume_message(): consumer = PushConsumer('consumer_group') consumer.set_name_server_address('192.168.0.1:8100') consumer.subscribe('TopicTest', callback) print('start consume message') consumer.start() while True: time.sleep(3600) if __name__ == '__main__': start_consume_message()
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- consumer_group:表示消费组名称。
- 192.168.0.1:8100:表示实例连接地址和端口。
- TopicTest:表示Topic名称。