更新时间:2024-06-11 GMT+08:00

Python SDK接入示例

本文介绍使用Python3 SDK通过AMQP接入华为云物联网平台,接收服务端订阅消息的示例。

开发环境

Python 3.0及更高版本。本示例使用了Python 3.9版本。

下载SDK

本示例使用的Python语言的AMQP SDK为python-qpid-proton(本示例使用版本为0.37.0),可以通过以下命令安装最新版本SDK。

pip install python-qpid-proton

也可以参考(Installing Qpid Proton)手动安装。

代码示例

import threading
import time

from proton import SSLDomain
from proton.handlers import MessagingHandler
from proton.reactor import Container

# 重连次数
reconnectTimes = 0


def current_time_millis():
    return str(int(round(time.time() * 1000)))


class AmqpClient(MessagingHandler):
    def __init__(self, host, port, accessKey, accessCode, queueName, instanceId):
        super(AmqpClient, self).__init__()
        self.host = host
        self.port = port
        self.accessKey = accessKey
        self.accessCode = accessCode
        self.queueName = queueName
        self.instanceId = instanceId

    def on_start(self, event):
        # 接入域名,请参见AMQP客户端接入说明文档。
        url = "amqps://%s:%s" % (self.host, self.port)

        timestamp = current_time_millis()
        userName = "accessKey=" + self.accessKey + "|timestamp=" + timestamp + "|instanceId=" + self.instanceId
        passWord = self.accessCode
        # 默认不校验服务端证书
        sslDomain = SSLDomain(SSLDomain.MODE_CLIENT)
        sslDomain.set_peer_authentication(SSLDomain.ANONYMOUS_PEER)
        self.conn = event.container.connect(url, user=userName, password=passWord, heartbeat=60, ssl_domain=sslDomain,
                                            reconnect=False)
        event.container.create_receiver(self.conn, source=self.queueName)

    # 当连接成功建立时被调用。
    def on_connection_opened(self, event):
        global reconnectTimes
        reconnectTimes = 0
        print("Connection established, remoteUrl: %s", event.connection.hostname)

    # 当连接关闭时被调用。
    def on_connection_closed(self, event):
        print("Connection closed: %s", self)
        ReconnectThread("reconnectThread").start()

    # 当远端因错误而关闭连接时被调用。
    def on_connection_error(self, event):
        print("Connection error:%s", self)
        ReconnectThread("reconnectThread").start()

    # 当建立AMQP连接错误时被调用,包括身份验证错误和套接字错误。
    def on_transport_error(self, event):
        if event.transport.condition:
            if event.transport.condition.info:
                print("%s: %s: %s" % (event.transport.condition.name, event.transport.condition.description,
                                      event.transport.condition.info))
            else:
                print("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
        else:
            print("Unspecified transport error")
        ReconnectThread("reconnectThread").start()

    # 当收到消息时被调用。
    def on_message(self, event):
        message = event.message
        content = message.body
        print("receive message: content=%s" % content)


class ReconnectThread(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    def run(self):
        global reconnectTimes
        reconnectTimes = reconnectTimes + 1
        time.sleep(15 if reconnectTimes > 15 else reconnectTimes)
        Container(AmqpClient(amqpHost, amqpPort, amqpAccessKey, amqpAccessCode, amqpQueueName, instanceId)).run()


# 以下参数配置请参考连接配置说明
# AMQP接入域名
amqpHost = "127.0.0.1"

# AMQP接入端口
amqpPort = 5671

# 接入凭证键值
amqpAccessKey = 'your AccessKey'

# 接入凭证密钥
amqpAccessCode = 'your AccessCode'

# 订阅队列名称
amqpQueueName = 'DefaultQueue'

# 实例Id,同一Region购买多个标准版实例时需要填设置该参数。
instanceId = ''

Container(AmqpClient(amqpHost, amqpPort, amqpAccessKey, amqpAccessCode, amqpQueueName, instanceId)).run()