更新时间:2025-10-24 GMT+08:00
Python SDK接入示例
本文介绍使用Python3 SDK通过AMQP接入华为云物联网平台,接收服务端订阅消息的示例。
开发环境
Python 3.0及更高版本。本示例使用了Python 3.9版本。
下载SDK
本示例使用的Python语言的AMQP SDK为python-qpid-proton(本示例使用版本为0.37.0),可以通过以下命令安装最新版本SDK。
pip install python-qpid-proton
也可以参考《Installing Qpid Proton》文档手动安装。
代码示例
import threading
import time
from proton import SSLDomain
from proton.handlers import MessagingHandler
from proton.reactor import Container
# Number of reconnect attempts since the last successful connection: reset to 0
# in AmqpClient.on_connection_opened and incremented by ReconnectThread.run,
# which also uses it as the delay before the next attempt.
reconnectTimes = 0
def current_time_millis():
    """Return the current Unix time in milliseconds as a decimal string."""
    millis = round(time.time() * 1000)
    return str(int(millis))
class AmqpClient(MessagingHandler):
    """AMQP receiver that subscribes to one queue and schedules its own reconnects.

    The handler opens an ``amqps://`` connection in ``on_start``, prints every
    received message body, and starts a ``ReconnectThread`` the first time the
    connection is closed, reports an error, or is disconnected.

    Args:
        host: AMQP access domain name (see the AMQP client access guide).
        port: AMQP access port.
        accessKey: Access credential key, embedded in the login user name.
        accessCode: Access credential secret, used as the login password.
        queueName: Name of the queue to receive from.
        instanceId: Instance ID, embedded in the login user name.
    """

    def __init__(self, host, port, accessKey, accessCode, queueName, instanceId):
        super(AmqpClient, self).__init__()
        self.host = host
        self.port = port
        self.accessKey = accessKey
        self.accessCode = accessCode
        self.queueName = queueName
        self.instanceId = instanceId
        # Set once a reconnect has been scheduled, so that the several
        # close/error/disconnect callbacks start at most one ReconnectThread.
        self.reconnectFlag = False

    def on_start(self, event):
        """Open the connection and create the receiver for ``queueName``."""
        # Access domain name: see the AMQP client access guide.
        url = "amqps://%s:%s" % (self.host, self.port)
        timestamp = current_time_millis()
        userName = "accessKey=%s|timestamp=%s|instanceId=%s" % (
            self.accessKey, timestamp, self.instanceId)
        passWord = self.accessCode
        # By default the server certificate is NOT verified.
        # NOTE(review): consider enabling peer verification for production use.
        sslDomain = SSLDomain(SSLDomain.MODE_CLIENT)
        sslDomain.set_peer_authentication(SSLDomain.ANONYMOUS_PEER)
        # This class implements its own reconnect logic, so the client's
        # built-in reconnect must stay disabled (reconnect=False).
        self.conn = event.container.connect(
            url, user=userName, password=passWord, heartbeat=60,
            ssl_domain=sslDomain, reconnect=False)
        event.container.create_receiver(self.conn, source=self.queueName)

    def on_connection_opened(self, event):
        """Called when the connection is established; resets the retry counter."""
        global reconnectTimes
        reconnectTimes = 0
        print("Connection established, remoteUrl: %s" % event.connection.hostname)

    def on_connection_closed(self, event):
        """Called when the connection is closed; schedules a reconnect."""
        print("Connection closed: %s" % (self,))
        self.reconnect()

    def on_connection_error(self, event):
        """Called when the remote peer closes the connection with an error."""
        print("Connection error:%s" % (self,))
        self.reconnect()

    def on_disconnected(self, event):
        """Called when the socket drops.

        Covers network failures in which the remote peer cannot send an
        error condition. Does nothing if a reconnect is already scheduled.
        """
        if self.reconnectFlag:
            return
        print("On disconnected")
        self._print_condition("transport", event.transport.condition)
        self._print_condition("connection", event.connection.condition)
        self.reconnect()

    @staticmethod
    def _print_condition(source, condition):
        """Print name/description (and info, when present) of an error condition."""
        if not condition:
            return
        if condition.info:
            print("Disconnected %s error:%s: %s: %s" % (
                source, condition.name, condition.description, condition.info))
        else:
            print("Disconnected %s error:%s: %s" % (
                source, condition.name, condition.description))

    def reconnect(self):
        """Start a ReconnectThread unless one was already started by this client."""
        if not self.reconnectFlag:
            self.reconnectFlag = True
            ReconnectThread("reconnectThread").start()

    def on_message(self, event):
        """Called for each received message; prints the message body."""
        message = event.message
        content = message.body
        print("receive message: content=%s" % content)
class ReconnectThread(threading.Thread):
    """Thread that waits for a back-off delay and then runs a new AmqpClient."""

    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        """Bump the retry counter, sleep, then run a fresh client container."""
        global reconnectTimes
        global connecting
        # Written here but not read anywhere in the visible code.
        connecting = True
        reconnectTimes += 1
        # Delay grows by one second per attempt and is capped at 15 seconds.
        delay = min(reconnectTimes, 15)
        time.sleep(delay)
        client = AmqpClient(amqpHost, amqpPort, amqpAccessKey, amqpAccessCode,
                            amqpQueueName, instanceId)
        Container(client).run()
# For the parameters below, see the connection configuration guide.
# These names are module-level on purpose: ReconnectThread.run reads them as
# globals when it builds a replacement client.
# AMQP access domain name
amqpHost = "127.0.0.1"
# AMQP access port
amqpPort = 5671
# Access credential key
amqpAccessKey = 'your AccessKey'
# Access credential secret
amqpAccessCode = 'your AccessCode'
# Name of the subscribed queue
amqpQueueName = 'DefaultQueue'
# Instance ID; must be set when several Standard Edition instances have been
# purchased in the same region.
instanceId = ''
# Create the client from the settings above and run it in a proton Container.
Container(AmqpClient(amqpHost, amqpPort, amqpAccessKey, amqpAccessCode, amqpQueueName, instanceId)).run()
父主题: 使用AMQP转发