设备自定义TOPIC迁移上云
场景说明
随着IoT终端设备规模不断增加,企业自建物联网面临服务器扩容,硬件投入和运维成本越来越高等诸多问题。为此,华为云物联网平台提供设备极简迁移上云解决方案,企业设备通过极少的改动,就可以快速将设备迁移到华为云物联网平台。
整体方案
假设企业终端设备接入自建MQTT集群,业务架构如下图所示。
基于MQTT协议的上行数据和下行指令的业务定义如下:
业务场景 |
通信Topic |
报文Payload |
---|---|---|
设备上报数据 |
/aircondition/data/up |
{ "temperature": 26.0 } |
服务端控制指令 |
/aircondition/cmd |
{ "switch": "off" } |
为减少企业改造成本,华为云物联网平台提供如下迁移方案,设备侧不用改变原有的topic和payload报文格式,就可以快速迁移设备到华为云物联网平台。
企业设备迁移上云有三个核心变更点:
云端配置开发
- 访问设备接入服务,单击“立即使用”进入设备接入控制台。
- 创建产品。
- 选择左侧导航栏的“产品”。
- 单击左侧的“创建产品”,创建待迁移设备的产品,填写参数后,单击“确定”。
基本信息
产品名称
自定义,如aircondition
协议类型
选择“MQTT”
数据格式
选择“JSON”
所属行业
无
设备类型
根据实际情况填写,如“aircondition”。
- 注册设备。
- 选择左侧导航栏的图1 单设备注册-aircondition
参数名称
说明
所属产品
选择在步骤2中创建的产品。
设备标识码
即node_id,填写为设备的IMEI、MAC地址或Serial No;若没有真实设备,填写自定义字符串,由英文字母和数字组成。
设备名称
自定义。
设备认证类型
根据企业设备现有的认证类型,选择对应的认证方式。本文以“密钥”方式为例。
密钥
设备密钥。填写待迁移设备的密钥。
,单击页面右上角的“注册设备”。
- 填写设备参数后,单击“确定”。
- 选择左侧导航栏的
- 创建数据转发规则。
- 选择左侧导航栏的“创建规则”。 ,单击页面左测的
- 参考下表填写参数后,单击“创建规则”。
参数名
参数说明
规则名称
创建的规则名称。
规则描述
对该规则的描述。
数据来源
选择“设备消息”。设备迁移上云时,设备按原有的topic和payload进行上报,物联网平台默认按设备消息流程进行处理。
触发事件
选择数据来源后,自动匹配触发事件。
资源空间
您可以选择单个资源空间或所有资源空间。
- 在设置转发目标页面,单击“添加”,在弹出的页面中参考下表配置完参数后,单击“确认”。
参数名
参数说明
转发目标
选择数据转发目标,本文以“AMQP推送消息队列”为例。
消息队列
单击“选择”,选择消息队列。
- 若没有消息队列,请新建消息队列,队列名称自定义且单个租户名下唯一,长度8-128,只能包含大写字母、小写字母、数字和指定特殊字符(如_-.:)。
- 若需要删除消息队列,单击消息队列右侧的“删除”即可。
说明:
已经订阅的队列不允许删除。
- 完成完整的规则定义后,单击“启动规则”,实现数据转发至AMQP消息队列。
设备端开发
完成云端配置后,需要进行设备端业务开发。完整的设备开发流程可参考设备侧开发。本章节以MQTT.fx为例,介绍在设备迁移场景下,设备侧如何在尽量少改动的情况下,实现设备建立MQTT连接、数据上报、指令接收等功能。
- 设备同物联网平台建立MQTT连接。
- 参考下表配置鉴权参数。
参数
必选/可选
参数描述
Broker Address
必选
华为云物联网平台的MQTT协议接入地址,请参考此处获取。
Broker Port
必选
Client ID
可选
使用设备迁移前的Client ID。
User Name
必选
填写步骤3注册设备时生成的设备ID,默认通过控制台生成的设备ID会添加产品ID前缀。在设备迁移场景,设备侧User Name参数无法修改时可以调用创建设备接口,指定设备ID参数值同迁移前的User Name参数值保持一致。
Password
必选
加密后的设备密钥。Password的值为使用“HMACSHA256”算法以时间戳为密钥,对secret进行加密后的值。
secret为注册设备时平台返回的secret。
- 参考下表配置“SSL/TLS”认证参数,然后单击“Apply”。
- 参考下表配置鉴权参数。
- 设备同物联网平台建立连接后,设备沿用迁移前的topic和payload格式上报数据。物联网平台针对这类非系统预定义的topic,统一按照“设备消息”的处理流程将设备上报的数据转发给第三方应用或者华为云的其他云服务处理。
图2 设备在线
- 根据迁移前的topic进行订阅,接收应用服务器下发的指令。
服务端开发
对服务端进行业务开发,实现接收设备数据和下发控制指令。本文以Java脚本为例,演示接收设备数据和下发控制指令。
- 业务服务器接收设备数据
服务器通过AMQP客户端接收消息,详细说明请参见AMQP客户端接入说明,Java SDK接入示例。设备迁移场景AMQP收到消息接口参考设备消息上报通知。
AMQP客户端收到的消息样例,body字段携带设备上报的原始topic和payload。
{ "resource": "device.message", "event": "report", "event_time": "20201114T034403Z", "notify_data": { "header": { "app_id": "QAksSBSBBQpWYtEKC3LrrOboNk0a", "device_id": "5fae45e358115902ce609882_20201113", "node_id": "20201113", "product_id": "5fae45e358115902ce609882", "gateway_id": "5fae45e358115902ce609882_20201113" }, "body": { "topic": "/aircondition/data/up", "content": { "temperature": 26.0 } } } }
JAVA核心代码样例如下:
package com.huawei.iot.amqp.jms; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsConnectionListener; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.transports.TransportOptions; import org.apache.qpid.jms.transports.TransportSupport; import javax.jms.*; import javax.naming.Context; import javax.naming.InitialContext; import java.net.URI; import java.util.Hashtable; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class HwIotAmqpJavaClientDemo{ //异步线程池,参数可以根据业务特点作调整,也可以用其他异步方式来处理。 private final static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5000)); public static void main(String[] args) throws Exception{ //连接凭证接入键值。 String accessKey = "${yourAccessKey}"; long timeStamp = System.currentTimeMillis(); //UserName组装方法,请参见文档:AMQP客户端接入说明。 String userName = "accessKey=" + accessKey + "|timestamp=" + timeStamp; //连接凭证接入码。 String password = "${yourAccessCode}"; //按照qpid-jms的规范,组装连接URL。 String connectionUrl = "amqps://${UUCID}.iot-amqps.cn-north-4.myhuaweicloud.com:5671?amqp.vhost=default&amqp.idleTimeout=8000&amqp.saslMechanisms=PLAIN"; Hashtable<String, String> hashtable = new Hashtable<>(); hashtable.put("connectionfactory.HwConnectionURL", connectionUrl); //队列名,可以使用默认队列DefaultQueue String queueName = "${yourQueue}"; hashtable.put("queue.HwQueueName", queueName); hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); Context context = new InitialContext(hashtable); JmsConnectionFactory cf = (JmsConnectionFactory) context.lookup("HwConnectionURL"); //同一个链接可创建多个queue,与前面queue.HwQueueName作好配对就行 Destination queue = (Destination) context.lookup("HwQueueName"); //信任服务端 TransportOptions to = new TransportOptions(); to.setTrustAll(true); cf.setSslContext(TransportSupport.createJdkSslContext(to)); // 创建连接 Connection connection = cf.createConnection(userName, password); ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener); // 创建 Session // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。 // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); // 创建 Receiver Link MessageConsumer consumer = session.createConsumer(queue); //处理消息有两种方式 // 1,主动拉数据(推荐),参照receiveMessage(consumer) // 2, 添加监听,参照consumer.setMessageListener(messageListener), 服务端主动推数据给客户端,但得考虑接受的数据速率是客户能力能够承受住的 receiveMessage(consumer); // consumer.setMessageListener(messageListener); } private static void receiveMessage(MessageConsumer consumer) throws JMSException{ while (true){ try{ // 建议异步处理收到的消息,确保receiveMessage函数里没有耗时逻辑。 Message message = consumer.receive(); processMessage(message); } catch (Exception e) { System.out.println("receiveMessage hand an exception: " + e.getMessage()); e.printStackTrace(); } } } private static MessageListener messageListener = new MessageListener(){ @Override public void onMessage(Message message){ try { // 建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。 // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。 executorService.submit(() -> processMessage(message)); } catch (Exception e){ System.out.println("submit task occurs exception: " + e.getMessage()); e.printStackTrace(); } } }; /** * 在这里处理您收到消息后的具体业务逻辑。 */ private static void processMessage(Message message) { try { String body = message.getBody(String.class); String content = new String(body); System.out.println("receive an message, the content is " + content); } catch (Exception e){ System.out.println("processMessage occurs error: " + e.getMessage()); e.printStackTrace(); } } private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener(){ /** * 连接成功建立。 */ @Override public void onConnectionEstablished(URI remoteURI){ System.out.println("onConnectionEstablished, remoteUri:" + remoteURI); } /** * 尝试过最大重试次数之后,最终连接失败。 */ @Override public void onConnectionFailure(Throwable error){ System.out.println("onConnectionFailure, " + error.getMessage()); } /** * 连接中断。 */ @Override public void onConnectionInterrupted(URI remoteURI){ System.out.println("onConnectionInterrupted, remoteUri:" + remoteURI); } /** * 连接中断后又自动重连上。 */ @Override public void onConnectionRestored(URI remoteURI){ System.out.println("onConnectionRestored, remoteUri:" + remoteURI); } @Override public void onInboundMessage(JmsInboundMessageDispatch envelope){ System.out.println("onInboundMessage, " + envelope); } @Override public void onSessionClosed(Session session, Throwable cause){ System.out.println("onSessionClosed, session=" + session + ", cause =" + cause); } @Override public void onConsumerClosed(MessageConsumer consumer, Throwable cause){ System.out.println("MessageConsumer, consumer=" + consumer + ", cause =" + cause); } @Override public void onProducerClosed(MessageProducer producer, Throwable cause){ System.out.println("MessageProducer, producer=" + producer + ", cause =" + cause); } }; }
- 业务服务器下发控制指令
服务器通过调用消息下发接口下发控制指令,具体请参考下发设备消息。
下发设备消息关键参数说明:
名称
必选/可选
类型
位置
说明
X-Auth-Token
必选
String
Header
用户Token。通过调用IAM服务 获取IAM用户Token接口获取,接口返回的响应消息头中“X-Subject-Token”就是需要获取的用户Token。简要的获取方法样例请参见 Token认证。
Instance-Id
可选
String
Header
实例ID。物理多租下各实例的唯一标识,一般华为云租户无需携带该参数,仅在物理多租场景下从管理面访问API时需要携带该参数。
project_id
必选
String
Path
项目ID。获取方法请参见 获取项目ID。
device_id
必选
String
Path
填写迁移设备的设备ID。
message
必选
String
Body
设备执行的消息,字符串,具体格式需要应用和设备约定。
topic_full_name
可选
String(128)
Body
迁移设备的topic。
接口样例:
POST https://{Endpoint}/v5/iot/{project_id}/devices/{device_id}/messages Content-Type: application/json X-Auth-Token: ******** Instance-Id: ******** { "message": "{\"switch\":\"off\"}", "topic_full_name": "/aircondition/cmd" }
JAVA核心代码样例如下:
public class DeviceMessage { public static void main(String[] args) throws KeyManagementException, NoSuchAlgorithmException, IOException { String token = Authentication.getToken(); Map<String, String> headers = new HashMap<String, String>(); headers.put("Content-Type", "application/json"); headers.put("X-Auth-Token", token); Message message = new Message(); message.setMessage("{\"switch\" :\"off\"}"); message.setTopic_full_name("/aircondition/cmd"); String projectId = "11111"; String deviceId = "5fae45e358115902ce609882_20201113"; String url = "https://iotda.cn-north-4.myhuaweicloud.com/v5/iot/%s/devices/%s/messages"; url = String.format(url, projectId, deviceId); HttpUtils httpUtils = new HttpUtils(); httpUtils.initClient(); StreamClosedHttpResponse httpResponse = httpUtils.doPost(url, headers, JsonUtils.Obj2String(message)); System.out.println(httpResponse.getContent()); } }