设备接入 IoTDA
设备接入 IoTDA
- 最新动态
- 功能总览
- 服务公告
- 计费说明
- 产品介绍
- 快速入门
- 用户指南
- 最佳实践
- 开发指南
-
API参考
- 应用侧API参考
- 设备侧MQTT/MQTTS接口参考
- 设备侧HTTPS接口参考
- 设备侧LwM2M接口参考
- 安全隧道WebSocket接口参考
- 模组AT指令参考
- 修订记录
- SDK参考
- 场景代码示例
-
常见问题
- 热门问题
-
方案咨询
- 连接IoT平台的业务场景有哪些?
- 设备管理服务和设备接入服务合一后的差异点是什么?
- IAM子用户或子项目是否可以开通物联网平台服务?
- 物联网平台支持在华为云的哪些区域开通?
- 华为是否提供模组/硬件终端/应用软件等?
- IAM用户访问API提示没有权限?(是否区分版本?)
- 创建规则或者设置资源文件存储时候提示赋予Security Administrator权限
- 物联网平台设置默认资源空间的规则是什么?
- 设备接入服务如何获取设备数据?
- 物联网平台的资源空间和设备可以无限创建吗?
- 物联网平台支持批量注册设备吗?
- 物联网平台对应用侧和设备侧在开发或使用时有限制吗?
- 物联网平台支持的DTLS加密算法有哪些?
- 物联网平台支持二进制大小端模式切换吗?
- 什么是NB-IoT?
- 物联网平台支持的硬件架构和使用的相关组件有哪些?
- 如何获取平台接入地址?
- 设备集成相关问题
- 设备侧SDK相关问题
- 设备发放相关问题
- LWM2M/CoAP接入相关问题
- MQTT接入相关问题
- 泛协议接入相关问题
- 物模型相关问题
- 消息通信相关问题
- 订阅推送相关问题
- 编解码插件相关问题
- OTA升级相关问题
- 应用集成相关问题
- 实例管理相关问题
- 视频帮助
- 文档下载
- 通用参考
链接复制成功!
Python SDK接入示例
本文介绍使用Python3 SDK通过AMQP接入华为云物联网平台,接收服务端订阅消息的示例。
开发环境
Python 3.0及更高版本。本示例使用了Python 3.9版本。
下载SDK
本示例使用的Python语言的AMQP SDK为python-qpid-proton(本示例使用版本为0.37.0),可以通过以下命令安装最新版本SDK。
pip install python-qpid-proton
也可以参考(Installing Qpid Proton)手动安装。
代码示例
import threading import time from proton import SSLDomain from proton.handlers import MessagingHandler from proton.reactor import Container # 重连次数 reconnectTimes = 0 def current_time_millis(): return str(int(round(time.time() * 1000))) class AmqpClient(MessagingHandler): def __init__(self, host, port, accessKey, accessCode, queueName, instanceId): super(AmqpClient, self).__init__() self.host = host self.port = port self.accessKey = accessKey self.accessCode = accessCode self.queueName = queueName self.instanceId = instanceId def on_start(self, event): # 接入域名,请参见AMQP客户端接入说明文档。 url = "amqps://%s:%s" % (self.host, self.port) timestamp = current_time_millis() userName = "accessKey=" + self.accessKey + "|timestamp=" + timestamp + "|instanceId=" + self.instanceId passWord = self.accessCode # 默认不校验服务端证书 sslDomain = SSLDomain(SSLDomain.MODE_CLIENT) sslDomain.set_peer_authentication(SSLDomain.ANONYMOUS_PEER) self.conn = event.container.connect(url, user=userName, password=passWord, heartbeat=60, ssl_domain=sslDomain, reconnect=False) event.container.create_receiver(self.conn, source=self.queueName) # 当连接成功建立时被调用。 def on_connection_opened(self, event): global reconnectTimes reconnectTimes = 0 print("Connection established, remoteUrl: %s", event.connection.hostname) # 当连接关闭时被调用。 def on_connection_closed(self, event): print("Connection closed: %s", self) ReconnectThread("reconnectThread").start() # 当远端因错误而关闭连接时被调用。 def on_connection_error(self, event): print("Connection error:%s", self) ReconnectThread("reconnectThread").start() # 当建立AMQP连接错误时被调用,包括身份验证错误和套接字错误。 def on_transport_error(self, event): if event.transport.condition: if event.transport.condition.info: print("%s: %s: %s" % (event.transport.condition.name, event.transport.condition.description, event.transport.condition.info)) else: print("%s: %s" % (event.transport.condition.name, event.transport.condition.description)) else: print("Unspecified transport error") ReconnectThread("reconnectThread").start() # 当收到消息时被调用。 def on_message(self, event): message = event.message content = message.body print("receive message: content=%s" % content) class ReconnectThread(threading.Thread): def __init__(self, name): threading.Thread.__init__(self) self.name = name def run(self): global reconnectTimes reconnectTimes = reconnectTimes + 1 time.sleep(15 if reconnectTimes > 15 else reconnectTimes) Container(AmqpClient(amqpHost, amqpPort, amqpAccessKey, amqpAccessCode, amqpQueueName, instanceId)).run() # 以下参数配置请参考连接配置说明 # AMQP接入域名 amqpHost = "127.0.0.1" # AMQP接入端口 amqpPort = 5671 # 接入凭证键值 amqpAccessKey = 'your AccessKey' # 接入凭证密钥 amqpAccessCode = 'your AccessCode' # 订阅队列名称 amqpQueueName = 'DefaultQueue' # 实例Id,同一Region购买多个标准版实例时需要填设置该参数。 instanceId = '' Container(AmqpClient(amqpHost, amqpPort, amqpAccessKey, amqpAccessCode, amqpQueueName, instanceId)).run()
父主题: 使用AMQP转发