更新时间:2024-11-05 GMT+08:00
分享

Python客户端使用说明

本文以Linux CentOS环境为例,介绍Python版本的RabbitMQ客户端连接指导,包括RabbitMQ客户端安装,以及生产、消费消息。

使用前请参考收集连接信息收集RabbitMQ所需的连接信息。

本文的连接示例对于RabbitMQ 3.x.x版本与AMQP-0-9-1版本都适用。

准备环境

  • Python

    一般系统预装了Python,您可以在命令行输入python或者python3,查看Python是否已经安装。python命令只能查询Python 2.x版本,python3命令只能查询Python 3.x版本,如果无法确认Python版本,请分别输入两个命令查看结果。

    以Python 3.x为例,得到如下回显,说明Python已安装。

    [root@ecs-test python]# python3
    Python 3.7.1 (default, Jul  5 2020, 14:37:24) 
    [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    >>>

    如果未安装Python,请使用以下命令安装:

    yum install python

  • Python版的RabbitMQ客户端,本文使用pika作为连接RabbitMQ的客户端

    执行以下命令,安装推荐版本的pika:

    pip install pika

    如果无法使用pip命令安装pika,建议改用pip3命令安装pika:

    pip3 install pika

生产消息

以下加粗内容需要替换为实例自有信息,请根据实际情况替换。

  • SSL认证方式
    import pika
    import ssl
    
    # 连接信息
    conf = {
        'host': 'ip',
        'port': 5671,
        'queue_name': 'queue-test',
        'username': 'root',
        'password': 'password'
    }
    
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    credentials = pika.PlainCredentials(conf['username'], conf['password'])
    parameters = pika.ConnectionParameters(conf['host'],
                                           conf['port'],
                                           '/',
                                           credentials,
                                           ssl_options=pika.SSLOptions(context))
    
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(conf['queue_name'])
    data = bytes('Hello World!', encoding='utf-8')
    channel.basic_publish(exchange='',
                          routing_key=conf['queue_name'],
                          body=data)
    
    print(" [x] Sent 'Hello World!'")
    
    connection.close()
  • 非SSL认证方式
    import pika
    
    # 连接信息
    conf = {
        'host': 'ip',
        'port': 5672,
        'queue_name': 'queue-test',
        'username': 'root',
        'password': 'password'
    }
    
    credentials = pika.PlainCredentials(conf['username'], conf['password'])
    parameters = pika.ConnectionParameters(conf['host'],
                                           conf['port'],
                                           '/',
                                           credentials)
    
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    channel.queue_declare(conf['queue_name'])
    
    data = bytes("Hello World!", encoding="utf-8")
    
    channel.basic_publish(exchange='',
                          routing_key=conf['queue_name'],
                          body=data)
    
    print(" [x] Sent 'Hello World!'")
    
    connection.close()

消费消息

以下加粗内容需要替换为实例自有信息,请根据实际情况替换。

  • SSL认证方式
    import pika
    import ssl
    
    # 连接信息
    conf = {
        'host': 'ip',
        'port': 5671,
        'queue_name': 'queue-test',
        'username': 'root',
        'password': 'password'
    }
    
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    credentials = pika.PlainCredentials(conf['username'], conf['password'])
    parameters = pika.ConnectionParameters(conf['host'],
                                           conf['port'],
                                           '/',
                                           credentials,
                                           ssl_options=pika.SSLOptions(context))
    
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(conf['queue_name'])
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body.decode('utf-8'))
    
    
    channel.basic_consume(queue=conf['queue_name'], on_message_callback=callback, auto_ack=True)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
  • 非SSL认证方式
    import pika
    
    # 连接信息
    conf = {
        'host': 'ip',
        'port': 5672,
        'queue_name': 'queue-test',
        'username': 'root',
        'password': 'password'
    }
    
    credentials = pika.PlainCredentials(conf['username'], conf['password'])
    parameters = pika.ConnectionParameters(conf['host'],
                                           conf['port'],
                                           '/',
                                           credentials)
    
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(conf['queue_name'])
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body.decode('utf-8'))
    
    
    channel.basic_consume(queue=conf['queue_name'], on_message_callback=callback, auto_ack=True)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

相关文档