Topic通信场景迁移实践
本文以自建MQTT Broker的空调控制设备迁移到华为云IoT为例,介绍如何进行Topic通信场景的设备迁移。
业务场景说明
自建MQTT Broker基于MQTT协议的上行数据和下行指令的业务定义如下:
业务场景 |
通信Topic |
报文Payload |
---|---|---|
设备上报数据 |
/aircondition/data/up |
{ "temperature": 26.0 } |
服务端控制指令 |
/aircondition/cmd |
{ "switch": "off" } |
- 设备使用一机一密的认证方式。
- 设备通过Topic上报数据,通过消息流转功能将数据转发到后端服务。
- 服务端通过消息下发接口下发消息给设备。
迁移方案说明
Topic通信场景的迁移方案分为下面三个步骤:
-
在控制台配置自定义鉴权、Topic策略和数据流转方案,请参见1.平台端配置开发。
-
设备端调整接入到平台的域名,请参见2.设备端域名切换。
-
对应用端进行业务开发,实现接收设备数据和下发控制指令,请参见3.应用端SDK适配。
1.平台端配置开发
- 配置自定义鉴权。
- 访问函数服务(FunctionGraph)创建自定义鉴权函数,鉴权函数代码参考如下:
exports.handler = async (event, context) => { console.log("username=" + event.username); // 此处编写您的校验逻辑 // 返回的JSON格式(格式固定不变) const authRes = { "result_code": 200, "result_desc": "sucessful", "refresh_seconds": 300, "device": { "device_id": "myDeviceId", "provision_enable": true, "provisioning_resource": { "device_name": "myDeviceName", "node_id": "myNodeId", "product_id": "myProductId", "app_id": "customization0000000000000000000" } } } return JSON.stringify(authRes); }
- 访问设备接入服务,单击“管理控制台”进入设备接入控制台。选择您的实例,单击实例卡片进入。在左侧导航栏“设备”中单击“自定义鉴权”,可到自定义鉴权配置界面配置。
图2 自定义鉴权-创建鉴权自定义鉴权配置参数如下:
表2 自定义鉴权参数说明 参数名
配置说明
鉴权名称
填写“Test_Auth_1”。
鉴权函数
自定义鉴权器对应的函数名称,从FunctionGraph已经创建的函数列表中选取。
状态
选择“激活”。
签名认证
选择“否”。
token值
未开启签名认证,这里不用填写。
公钥
未开启签名认证,这里不用填写。
是否默认自定义鉴权器
选择“是”。
是否缓存
选择“是”。
- 访问函数服务(FunctionGraph)创建自定义鉴权函数,鉴权函数代码参考如下:
- 配置Topic策略。
- 配置流转规则。
- 参考配置AMQP服务端订阅,配置AMQP订阅。配置参数如下:
表5 AMQP订阅参数说明 参数值
配置说明
规则名称
填写为“MyRule”。
规则描述
填写为“RuleTest”。
数据来源
选择“设备消息”。
触发事件
选择“设备消息上报”。
资源空间
选择对应资源空间。
- 添加AMQP推送消息队列的转发目标,配置参数如下:
表6 转发目标参数说明 参数值
配置说明
转发目标
选择“AMQP推送消息队列”。
消息队列
选择“DefaultQueue”。
- 参考配置AMQP服务端订阅,配置AMQP订阅。配置参数如下:
2.设备端域名切换
//接入华为云IoT的域名,在控制台的"总览"界面的"平台接入地址"中获取“设备侧”的MQTTS接入地址。 String server = "ssl://******.st1.iotda-device.cn-north-4.myhuaweicloud.com:8883"; // Paho MQTT客户端。 MqttClient myMqttClient = new MqttClient(server, "myClientId", persistence); // Paho MQTT连接参数。 MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); connOpts.setKeepAliveInterval(120); connOpts.setUserName("myUserName"); connOpts.setPassword("myPassword".toCharArray()); myMqttClient.connect(connOpts); System.out.println("Broker: " + broker + " Connected"); // Paho MQTT消息订阅。 myMqttClient.subscribe("/aircondition/cmd", new MyMessageListener()); // Paho MQTT发布消息。 String topic = "/aircondition/data/up"; String content = "{\"temperature\": 26.0 }"; MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(0); myMqttClient.publish(topic, message);
3.应用端SDK适配
华为云IoTDA提供应用端SDK,开发者只需做少量代码修改就可完成应用端适配,实现应用端的快速迁移。下面以应用端接收设备上报的消息和应用端下发消息给设备为例,介绍如何进行应用端适配。
- 参考AMQP客户端接入说明,Java SDK接入示例进行设备上报消息的适配处理。核心代码参考:
try { MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE); consumer.setMessageListener(message -> { try { // 此处进行消息处理。如果处理比较耗时,最好进行开启新的线程处理,否则可能造成心跳超时链接断开。 processMessage(message.getBody(String.class)); // 如果options.isAutoAcknowledge==false,此处应该调用message.acknowledge(); } catch (Exception e) { log.warn("message.getBody error,exception is ", e); } }); } catch (Exception e) { log.warn("Consumer initialize error,", e); }
- 参考消息下发使用示例 ,下发设备消息。服务端向单个设备下发消息样例如下:
public class MessageDistributionSolution { .................. DeviceMessageRequest body = new DeviceMessageRequest(); body.withPayloadFormat("raw"); body.withTopicFullName("/aircondition/cmd"); body.withMessage({"switch":"off"}); request.withBody(body); try { CreateMessageResponse response = client.createMessage(request); System.out.println(response.toString()); } catch (ConnectionException e) { e.printStackTrace(); } catch (RequestTimeoutException e) { e.printStackTrace(); } catch (ServiceResponseException e) { e.printStackTrace(); System.out.println(e.getHttpStatusCode()); System.out.println(e.getRequestId()); System.out.println(e.getErrorCode()); System.out.println(e.getErrorMsg()); } } }