更新时间:2025-06-26 GMT+08:00
分享

MQTT(S)-编解码插件样例

本文以烟感设备为例,介绍如何使用JavaScript语言开发一个支持MQTT/MQTTS协议的设备进行属性上报和命令下发编解码的FunctionGraph函数插件。并介绍从二进制数据到JSON数据格式的转换及其调试方法。

烟感设备样例

场景说明

有一款烟感设备,具有如下特征:

  • 具有烟雾报警功能(火灾等级)和温度上报功能。
  • 支持远程控制命令,可远程打开报警功能。比如火灾现场温度,远程打开烟雾报警,提醒住户疏散。
  • 该款烟感设备,设备能力比较弱,无法按照设备接口定义的JSON格式上报数据,只能上报简单的二进制数据。

产品模型定义

在烟感产品的开发空间,完成产品模型定义。
  • level:火灾级别,用于表示火灾的严重程度。
  • temperature:温度,用于表示火灾现场温度。
  • SET_ALARM:打开或关闭告警命令,value=0表示关闭,value=1表示打开。响应命令result用于上报修改后告警值。
    图1 模型定义-smokedetector

编解码插件开发

  1. 在烟感产品的详情页面,选择“插件开发”,单击“FunctionGraph开发”,单击“创建函数”,若是第一次使用,需要进行访问授权。

    图2 FunctionGraph开发-插件授权

  2. 进入FunctionGraph控制台后,单击“创建函数”。在弹出的界面中,选择“创建空白函数”,自定义填入函数名称,运行时选择“Node.js 16.17”。

    图3 函数列表-创建函数
    图4 创建函数-参数信息

  3. 编写脚本,实现二进制数据到JSON数据的转换。脚本需要实现如下两个方法:

    • decode:将设备上报的二进制数据转换为平台产品模型中定义的JSON格式。具体的JSON格式要求见:数据解码格式定义
    • encode:平台有下行数据发送给设备时,将平台的JSON格式数据转换为设备支持的二进制格式。平台的JSON格式见:数据编码格式定义
    针对当前烟感产品实现的JavaScript样例如下,将代码复制到Project工程中,并单击“部署代码”。
    //上行消息类型
    var MSG_TYPE_PROPERTIES_REPORT = 'properties_report'; //设备属性上报
    var MSG_TYPE_COMMAND_RSP = 'command_response'; //设备返回命令响应
    var MSG_TYPE_PROPERTIES_SET_RSP = 'properties_set_response'; //设备返回属性设置响应
    var MSG_TYPE_PROPERTIES_GET_RSP = 'properties_get_response'; //设备返回属性查询响应
    var MSG_TYPE_MESSAGE_UP = 'message_up'; //设备消息上报
    //下行消息类型
    var MSG_TYPE_COMMANDS = 'commands'; //平台命令下发
    var MSG_TYPE_PROPERTIES_SET = 'properties_set'; //平台下发属性设置请求
    var MSG_TYPE_PROPERTIES_GET = 'properties_get'; //平台下发属性查询请求
    var MSG_TYPE_MESSAGE_DOWN = 'messages'; //平台消息下发
    
    //MQTT设备上行消息,topic同消息类型的映射表
    var TOPIC_REG_EXP = {
        'properties_report': new RegExp('\\$oc/devices/(\\S+)/sys/properties/report'),
        'properties_set_response': new RegExp('\\$oc/devices/(\\S+)/sys/properties/set/response/request_id=(\\S+)'),
        'properties_get_response': new RegExp('\\$oc/devices/(\\S+)/sys/properties/get/response/request_id=(\\S+)'),
        'command_response': new RegExp('\\$oc/devices/(\\S+)/sys/commands/response/request_id=(\\S+)'),
        'message_up': new RegExp('\\$oc/devices/(\\S+)/sys/messages/up')
    };
    exports.handler = async (event, context) => {
        const codecType = event.codecType;
        const message = JSON.parse(event.message);
        console.log("input Data:", event);
        if (codecType === "decode") {
            // 解码操作
            return decode(message.payload, message.topic);
        } else if (codecType === "encode") {
            // 编码操作
            return encode(message);
        }
    }
    /*
    示例:烟感设备上报属性和回复命令响应时,携带的是二进制码流,通过javascript脚本将二进制码流数据解码为符合产品模型定义的json格式数据
    传入参数:
      // 前两个字节0x00, 0x50为属性level的值,后两个字节0x00, 0x5a为属性temperature的值。
      payload:[0x00, 0x50, 0x00, 0x5a] 
      topic:$oc/devices/cf40f3c4-7152-41c6-a201-a2333122054a/sys/properties/report
    输出结果:
      {"msg_type":"properties_report","services":[{"service_id":"smokerdector","properties":{"level":80,"temperature":90}}]}
    传入参数:
      // 第一个字节0x02为"command_name"是"SET_ALARM", 第二个字节0x00为命令响应成功, 后两个字节0x00, 0x01为命令响应value的值。
      payload: [0x02, 0x00, 0x00, 0x01] 
      topic: $oc/devices/cf40f3c4-7152-41c6-a201-a2333122054a/sys/commands/response/request_id=bf40f0c4-4022-41c6-a201-c5133122054a
    输出结果:
      {"msg_type":"command_response","result_code":0,"command_name":"SET_ALARM","service_id":"smokerdector","paras":{"value":"1"}}
    */
    // 解码函数
    function decode(payload, topic) {
        // 在这里实现具体的解码逻辑
        var binaryString = atob(payload);
        const byteArray = new Uint8Array(binaryString.length);
        for (let i = 0; i < binaryString.length; i++) {
            byteArray[i] = binaryString.charCodeAt(i);
        }
        /*  byteArray 为解码后,设备上报的二进制数据,客户可以在此处查看上报的数据是否解析正确 */
        var returnData;
        msgType = topicParse(topic);
            if (msgType == MSG_TYPE_PROPERTIES_REPORT) {
            returnData = decodePropertiesReport(byteArray);
        } else if (msgType == MSG_TYPE_COMMAND_RSP) {
            returnData = decodeCommandRsp(byteArray);
        } else if (msgType == MSG_TYPE_PROPERTIES_SET_RSP) {
            // 转换为属性设置响应的JSON格式
            // jsonObj = {"msg_type":"properties_set_response","result_code":0,"result_desc":"success"};
            // returnData = outputData(status, jsonObj)
        } else if (msgType == MSG_TYPE_PROPERTIES_GET_RSP) {
            // 转换为属性查询响应的JSON格式
            // jsonObj = {"msg_type":"properties_get_response","services":[{"service_id":"analog","properties":{"PhV_phsA":"1","PhV_phsB":"2"}}]};
            // returnData = outputData(status, jsonObj)
        } else if (msgType == MSG_TYPE_MESSAGE_UP) {
            // 转换为消息上报的JSON格式
            // jsonObj = {"msg_type":"message_up","content":"hello"};
            // returnData = outputData(status, jsonObj)
        }
        return returnData;
    }
    // 编码函数
    /*
    示例数据:命令下发时,通过javascript的encode方法将平台JSON格式的数据,编码为二进制码流
    传入参数 ->
        {"msg_type":"commands","command_name":"SET_ALARM","service_id":"smokerdector","paras":{"value":1}}
    输出结果 ->
        // 第一个字节0x01用于标识命令下发,第二个字节0x00为command_name == 'SET_ALARM', 后两个字节0x00, 0x01为设置的命令属性值的值。
        [0x01, 0x00, 0x00, 0x01] 
    */
    function encode(data) {
        var msgType = data.msg_type;
        let payload = [];
        var status = 200;
        // 命令下发
        if (msgType == MSG_TYPE_COMMANDS) {
            payload[0] = 0x02; // 标识命令下发
            if (data.command_name == 'SET_ALARM') {
                payload[1] = 0x00; // 标识命令名称
            }
            // 设置命令属性值
            payload[2] = (data.paras.value >> 8) & 0xFF;
            payload[3] = data.paras.value & 0xFF;
        } else if (msgType == MSG_TYPE_PROPERTIES_SET) {
            // 设备属性上报的响应消息
            // 属性设置格式样例:{"msg_type":"properties_set","services":[{"service_id":"Temperature","properties":{"value":57}}]}
            // 开发者有对应属性设置场景时需要根据该JSON格式转换为对应的二进制码流
        } else if (msgType == MSG_TYPE_PROPERTIES_GET) {
            // 属性查询格式样例:{"msg_type":"properties_get","service_id":"Temperature"}
            // 开发者有对应属性查询场景时需要根据该JSON格式转换为对应的二进制码流
        } else if (msgType == MSG_TYPE_MESSAGE_DOWN) {
            // 消息下发格式样例:{"msg_type":"messages","content":"hello"}
            // 开发者对应消息下发场景时需要根据该JSON格式转换为对应的二进制码流
        }
        return outputData(status, { "payload": payload });
    }
    // 根据topic名称解析出消息类型
    function topicParse(topic) {
        for (var type in TOPIC_REG_EXP) {
            var pattern = TOPIC_REG_EXP[type];
            if (pattern.test(topic)) {
                return type;
            }
        }
        return '';
    }
    // 属性上报 -- 上行
    function decodePropertiesReport(byteArray) {
        // 设置serviceId参数值,该参数值对应产品模型中的服务类型smokerdector
        var serviceId = 'smokerdector';
        var level = byteArray[0] * Math.pow(2, 8) + byteArray[1];
        var status = 200;
        var jsonObj;
        if (byteArray.length < 4) {
            jsonObj = {
                "msg_type": "ERR", "message": "decodePropertiesReport byte length < 5."
            };
            status = 402;
        }
        // 获取第4位和第5位的值
        const integerPart = byteArray[2]; // 第3位
        const decimalPart = byteArray[3]; // 第4位
        // 组合成小数
        const temperature = parseFloat(integerPart + '.' + decimalPart);
        jsonObj = {
            "msg_type": MSG_TYPE_PROPERTIES_REPORT, "services":
                [{ "service_id": serviceId, "properties": { "level": level, "temperature": temperature } }]
        };
        return outputData(status, jsonObj);
    }
    // 命令响应 -- 上行
    function decodeCommandRsp(byteArray) {
        var serviceId = 'smokerdector';
        var command = byteArray[0];
        var command_name = '';
        if (2 == command) {
            command_name = 'SET_ALARM';
        }
        var result_code = byteArray[1]; // 从二进制码流中获取命令执行结果
        var value = byteArray[2] * Math.pow(2, 8) + byteArray[3]; // 从二进制码流中获取命令执行结果返回值
        //转换为命令响应的JSON格式
        jsonObj = {
            'msg_type': MSG_TYPE_COMMAND_RSP, 'service_id': serviceId, "command_name": command_name,
            'result_code': result_code, 'paras': { 'value': value }
        };
        return outputData(200, jsonObj);
    }
    // 输出结果
    function outputData(status, body) {
        const output =
        {
            'status': status,
            'message': JSON.stringify(body),
        }
        console.log("output Data:", output);
        return output;
    } 
    图5 FunctionGraph-复制代码到Project工程

  4. 在线调试脚本。脚本编辑完成后,在FunctionGraph函数中单击“配置测试事件”,选择空白模板,输入模拟数据,单击“创建”。配置好测试事件后,单击“测试”,即可得到函数返回结果及其日志。

    假设输入以下模拟数据:其中payload为设备侧上报的二进制:0x01, 0x02, 0x03, 0x04。"AQIDBA=="为被平台Base64编码后的值。
    {
        "codecType": "decode",
        "message": "{\"topic\": \"$oc/devices/device_id/sys/properties/report\",\"payload\": \"AQIDBA==\"}"
    }
    图6 FunctionGraph-添加测试事件
    图7 FunctionGraph-测试结果MQTT

  5. 调试成功后,在1的“目标函数”中选择您创建的FunctionGraph函数,并单击“部署”。

    图8 FunctionGraph开发-部署插件

相关文档