MQTT(S)-编解码插件样例
本文以烟感设备为例,介绍如何使用JavaScript语言开发一个支持MQTT/MQTTS协议的设备进行属性上报和命令下发编解码的FunctionGraph函数插件。并介绍从二进制数据到JSON数据格式的转换及其调试方法。
烟感设备样例
场景说明
有一款烟感设备,具有如下特征:
- 具有烟雾报警功能(火灾等级)和温度上报功能。
- 支持远程控制命令,可远程打开报警功能。比如火灾现场温度,远程打开烟雾报警,提醒住户疏散。
- 该款烟感设备,设备能力比较弱,无法按照设备接口定义的JSON格式上报数据,只能上报简单的二进制数据。
产品模型定义
- level:火灾级别,用于表示火灾的严重程度。
- temperature:温度,用于表示火灾现场温度。
- SET_ALARM:打开或关闭告警命令,value=0表示关闭,value=1表示打开。响应命令result用于上报修改后告警值。
图1 模型定义-smokedetector
编解码插件开发
- 在烟感产品的详情页面,选择“插件开发”,单击“FunctionGraph开发”,单击“创建函数”,若是第一次使用,需要进行访问授权。
图2 FunctionGraph开发-插件授权
- 进入FunctionGraph控制台后,单击“创建函数”。在弹出的界面中,选择“创建空白函数”,自定义填入函数名称,运行时选择“Node.js 16.17”。
图3 函数列表-创建函数图4 创建函数-参数信息
- 编写脚本,实现二进制数据到JSON数据的转换。脚本需要实现如下两个方法:
- decode:将设备上报的二进制数据转换为平台产品模型中定义的JSON格式。具体的JSON格式要求见:数据解码格式定义 。
- encode:平台有下行数据发送给设备时,将平台的JSON格式数据转换为设备支持的二进制格式。平台的JSON格式见:数据编码格式定义。
针对当前烟感产品实现的JavaScript样例如下,将代码复制到Project工程中,并单击“部署代码”。//上行消息类型 var MSG_TYPE_PROPERTIES_REPORT = 'properties_report'; //设备属性上报 var MSG_TYPE_COMMAND_RSP = 'command_response'; //设备返回命令响应 var MSG_TYPE_PROPERTIES_SET_RSP = 'properties_set_response'; //设备返回属性设置响应 var MSG_TYPE_PROPERTIES_GET_RSP = 'properties_get_response'; //设备返回属性查询响应 var MSG_TYPE_MESSAGE_UP = 'message_up'; //设备消息上报 //下行消息类型 var MSG_TYPE_COMMANDS = 'commands'; //平台命令下发 var MSG_TYPE_PROPERTIES_SET = 'properties_set'; //平台下发属性设置请求 var MSG_TYPE_PROPERTIES_GET = 'properties_get'; //平台下发属性查询请求 var MSG_TYPE_MESSAGE_DOWN = 'messages'; //平台消息下发 //MQTT设备上行消息,topic同消息类型的映射表 var TOPIC_REG_EXP = { 'properties_report': new RegExp('\\$oc/devices/(\\S+)/sys/properties/report'), 'properties_set_response': new RegExp('\\$oc/devices/(\\S+)/sys/properties/set/response/request_id=(\\S+)'), 'properties_get_response': new RegExp('\\$oc/devices/(\\S+)/sys/properties/get/response/request_id=(\\S+)'), 'command_response': new RegExp('\\$oc/devices/(\\S+)/sys/commands/response/request_id=(\\S+)'), 'message_up': new RegExp('\\$oc/devices/(\\S+)/sys/messages/up') }; exports.handler = async (event, context) => { const codecType = event.codecType; const message = JSON.parse(event.message); console.log("input Data:", event); if (codecType === "decode") { // 解码操作 return decode(message.payload, message.topic); } else if (codecType === "encode") { // 编码操作 return encode(message); } } /* 示例:烟感设备上报属性和回复命令响应时,携带的是二进制码流,通过javascript脚本将二进制码流数据解码为符合产品模型定义的json格式数据 传入参数: // 前两个字节0x00, 0x50为属性level的值,后两个字节0x00, 0x5a为属性temperature的值。 payload:[0x00, 0x50, 0x00, 0x5a] topic:$oc/devices/cf40f3c4-7152-41c6-a201-a2333122054a/sys/properties/report 输出结果: {"msg_type":"properties_report","services":[{"service_id":"smokerdector","properties":{"level":80,"temperature":90}}]} 传入参数: // 第一个字节0x02为"command_name"是"SET_ALARM", 第二个字节0x00为命令响应成功, 后两个字节0x00, 0x01为命令响应value的值。 payload: [0x02, 0x00, 0x00, 0x01] topic: $oc/devices/cf40f3c4-7152-41c6-a201-a2333122054a/sys/commands/response/request_id=bf40f0c4-4022-41c6-a201-c5133122054a 输出结果: {"msg_type":"command_response","result_code":0,"command_name":"SET_ALARM","service_id":"smokerdector","paras":{"value":"1"}} */ // 解码函数 function decode(payload, topic) { // 在这里实现具体的解码逻辑 var binaryString = atob(payload); const byteArray = new Uint8Array(binaryString.length); for (let i = 0; i < binaryString.length; i++) { byteArray[i] = binaryString.charCodeAt(i); } /* byteArray 为解码后,设备上报的二进制数据,客户可以在此处查看上报的数据是否解析正确 */ var returnData; msgType = topicParse(topic); if (msgType == MSG_TYPE_PROPERTIES_REPORT) { returnData = decodePropertiesReport(byteArray); } else if (msgType == MSG_TYPE_COMMAND_RSP) { returnData = decodeCommandRsp(byteArray); } else if (msgType == MSG_TYPE_PROPERTIES_SET_RSP) { // 转换为属性设置响应的JSON格式 // jsonObj = {"msg_type":"properties_set_response","result_code":0,"result_desc":"success"}; // returnData = outputData(status, jsonObj) } else if (msgType == MSG_TYPE_PROPERTIES_GET_RSP) { // 转换为属性查询响应的JSON格式 // jsonObj = {"msg_type":"properties_get_response","services":[{"service_id":"analog","properties":{"PhV_phsA":"1","PhV_phsB":"2"}}]}; // returnData = outputData(status, jsonObj) } else if (msgType == MSG_TYPE_MESSAGE_UP) { // 转换为消息上报的JSON格式 // jsonObj = {"msg_type":"message_up","content":"hello"}; // returnData = outputData(status, jsonObj) } return returnData; } // 编码函数 /* 示例数据:命令下发时,通过javascript的encode方法将平台JSON格式的数据,编码为二进制码流 传入参数 -> {"msg_type":"commands","command_name":"SET_ALARM","service_id":"smokerdector","paras":{"value":1}} 输出结果 -> // 第一个字节0x01用于标识命令下发,第二个字节0x00为command_name == 'SET_ALARM', 后两个字节0x00, 0x01为设置的命令属性值的值。 [0x01, 0x00, 0x00, 0x01] */ function encode(data) { var msgType = data.msg_type; let payload = []; var status = 200; // 命令下发 if (msgType == MSG_TYPE_COMMANDS) { payload[0] = 0x02; // 标识命令下发 if (data.command_name == 'SET_ALARM') { payload[1] = 0x00; // 标识命令名称 } // 设置命令属性值 payload[2] = (data.paras.value >> 8) & 0xFF; payload[3] = data.paras.value & 0xFF; } else if (msgType == MSG_TYPE_PROPERTIES_SET) { // 设备属性上报的响应消息 // 属性设置格式样例:{"msg_type":"properties_set","services":[{"service_id":"Temperature","properties":{"value":57}}]} // 开发者有对应属性设置场景时需要根据该JSON格式转换为对应的二进制码流 } else if (msgType == MSG_TYPE_PROPERTIES_GET) { // 属性查询格式样例:{"msg_type":"properties_get","service_id":"Temperature"} // 开发者有对应属性查询场景时需要根据该JSON格式转换为对应的二进制码流 } else if (msgType == MSG_TYPE_MESSAGE_DOWN) { // 消息下发格式样例:{"msg_type":"messages","content":"hello"} // 开发者对应消息下发场景时需要根据该JSON格式转换为对应的二进制码流 } return outputData(status, { "payload": payload }); } // 根据topic名称解析出消息类型 function topicParse(topic) { for (var type in TOPIC_REG_EXP) { var pattern = TOPIC_REG_EXP[type]; if (pattern.test(topic)) { return type; } } return ''; } // 属性上报 -- 上行 function decodePropertiesReport(byteArray) { // 设置serviceId参数值,该参数值对应产品模型中的服务类型smokerdector var serviceId = 'smokerdector'; var level = byteArray[0] * Math.pow(2, 8) + byteArray[1]; var status = 200; var jsonObj; if (byteArray.length < 4) { jsonObj = { "msg_type": "ERR", "message": "decodePropertiesReport byte length < 5." }; status = 402; } // 获取第4位和第5位的值 const integerPart = byteArray[2]; // 第3位 const decimalPart = byteArray[3]; // 第4位 // 组合成小数 const temperature = parseFloat(integerPart + '.' + decimalPart); jsonObj = { "msg_type": MSG_TYPE_PROPERTIES_REPORT, "services": [{ "service_id": serviceId, "properties": { "level": level, "temperature": temperature } }] }; return outputData(status, jsonObj); } // 命令响应 -- 上行 function decodeCommandRsp(byteArray) { var serviceId = 'smokerdector'; var command = byteArray[0]; var command_name = ''; if (2 == command) { command_name = 'SET_ALARM'; } var result_code = byteArray[1]; // 从二进制码流中获取命令执行结果 var value = byteArray[2] * Math.pow(2, 8) + byteArray[3]; // 从二进制码流中获取命令执行结果返回值 //转换为命令响应的JSON格式 jsonObj = { 'msg_type': MSG_TYPE_COMMAND_RSP, 'service_id': serviceId, "command_name": command_name, 'result_code': result_code, 'paras': { 'value': value } }; return outputData(200, jsonObj); } // 输出结果 function outputData(status, body) { const output = { 'status': status, 'message': JSON.stringify(body), } console.log("output Data:", output); return output; }
图5 FunctionGraph-复制代码到Project工程 - 在线调试脚本。脚本编辑完成后,在FunctionGraph函数中单击“配置测试事件”,选择空白模板,输入模拟数据,单击“创建”。配置好测试事件后,单击“测试”,即可得到函数返回结果及其日志。
假设输入以下模拟数据:其中payload为设备侧上报的二进制:0x01, 0x02, 0x03, 0x04。"AQIDBA=="为被平台Base64编码后的值。
{ "codecType": "decode", "message": "{\"topic\": \"$oc/devices/device_id/sys/properties/report\",\"payload\": \"AQIDBA==\"}" }
图6 FunctionGraph-添加测试事件图7 FunctionGraph-测试结果MQTT - 调试成功后,在1的“目标函数”中选择您创建的FunctionGraph函数,并单击“部署”。
图8 FunctionGraph开发-部署插件