Python客户端使用说明
本文以Linux CentOS环境为例,介绍Python版本的RabbitMQ客户端连接指导,包括RabbitMQ客户端安装,以及生产、消费消息。
使用前请参考收集连接信息收集RabbitMQ所需的连接信息。
本文的连接示例对于RabbitMQ 3.x.x版本与AMQP-0-9-1版本都适用。
准备环境
- Python
一般系统预装了Python,您可以在命令行输入python或者python3,查看Python是否已经安装。python命令只能查询Python 2.x版本,python3命令只能查询Python 3.x版本,如果无法确认Python版本,请分别输入两个命令查看结果。
以Python 3.x为例,得到如下回显,说明Python已安装。
[root@ecs-test python]# python3 Python 3.7.1 (default, Jul 5 2020, 14:37:24) [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux Type "help", "copyright", "credits" or "license" for more information. >>>
如果未安装Python,请使用以下命令安装:
yum install python
- Python版的RabbitMQ客户端,本文使用pika作为连接RabbitMQ的客户端
pip install pika
如果无法使用pip命令安装pika,建议改用pip3命令安装pika:
pip3 install pika
生产消息
以下加粗内容需要替换为实例自有信息,请根据实际情况替换。
- SSL认证方式
import pika import ssl # 连接信息 conf = { 'host': 'ip', 'port': 5671, 'queue_name': 'queue-test', 'username': 'root', 'password': 'password' } context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) credentials = pika.PlainCredentials(conf['username'], conf['password']) parameters = pika.ConnectionParameters(conf['host'], conf['port'], '/', credentials, ssl_options=pika.SSLOptions(context)) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(conf['queue_name']) data = bytes('Hello World!', encoding='utf-8') channel.basic_publish(exchange='', routing_key=conf['queue_name'], body=data) print(" [x] Sent 'Hello World!'") connection.close()
- 非SSL认证方式
import pika # 连接信息 conf = { 'host': 'ip', 'port': 5672, 'queue_name': 'queue-test', 'username': 'root', 'password': 'password' } credentials = pika.PlainCredentials(conf['username'], conf['password']) parameters = pika.ConnectionParameters(conf['host'], conf['port'], '/', credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(conf['queue_name']) data = bytes("Hello World!", encoding="utf-8") channel.basic_publish(exchange='', routing_key=conf['queue_name'], body=data) print(" [x] Sent 'Hello World!'") connection.close()
消费消息
以下加粗内容需要替换为实例自有信息,请根据实际情况替换。
- SSL认证方式
import pika import ssl # 连接信息 conf = { 'host': 'ip', 'port': 5671, 'queue_name': 'queue-test', 'username': 'root', 'password': 'password' } context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) credentials = pika.PlainCredentials(conf['username'], conf['password']) parameters = pika.ConnectionParameters(conf['host'], conf['port'], '/', credentials, ssl_options=pika.SSLOptions(context)) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(conf['queue_name']) def callback(ch, method, properties, body): print(" [x] Received %r" % body.decode('utf-8')) channel.basic_consume(queue=conf['queue_name'], on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
- 非SSL认证方式
import pika # 连接信息 conf = { 'host': 'ip', 'port': 5672, 'queue_name': 'queue-test', 'username': 'root', 'password': 'password' } credentials = pika.PlainCredentials(conf['username'], conf['password']) parameters = pika.ConnectionParameters(conf['host'], conf['port'], '/', credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(conf['queue_name']) def callback(ch, method, properties, body): print(" [x] Received %r" % body.decode('utf-8')) channel.basic_consume(queue=conf['queue_name'], on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()