更新时间:2024-08-29 GMT+08:00
分享

Topic通信场景迁移实践

本文以自建MQTT Broker的空调控制设备迁移到华为云IoT为例,介绍如何进行Topic通信场景的设备迁移。

业务场景说明

自建MQTT Broker基于MQTT协议的上行数据和下行指令的业务定义如下:

表1 业务场景

业务场景

通信Topic

报文Payload

设备上报数据

/aircondition/data/up

{ "temperature": 26.0 }

服务端控制指令

/aircondition/cmd

{ "switch": "off" }

  • 设备使用一机一密的认证方式。
  • 设备通过Topic上报数据,通过消息流转功能将数据转发到后端服务。
  • 服务端通过消息下发接口下发消息给设备。

迁移方案说明

图1 迁移方案

Topic通信场景的迁移方案分为下面三个步骤:

  1. 在控制台配置自定义鉴权、Topic策略和数据流转方案,请参见1.平台端配置开发

  2. 设备端调整接入到平台的域名,请参见2.设备端域名切换

  3. 对应用端进行业务开发,实现接收设备数据和下发控制指令,请参见3.应用端SDK适配

1.平台端配置开发

  1. 配置自定义鉴权。

    1. 访问函数服务(FunctionGraph)创建自定义鉴权函数,鉴权函数代码参考如下:
      exports.handler = async (event, context) => {
          console.log("username=" + event.username);
          // 此处编写您的校验逻辑
          
          // 返回的JSON格式(格式固定不变)
          const authRes = {
              "result_code": 200,
              "result_desc": "sucessful",
              "refresh_seconds": 300,
              "device": {
                  "device_id": "myDeviceId",
                  "provision_enable": true,
                  "provisioning_resource": {
                      "device_name": "myDeviceName",
                      "node_id": "myNodeId",
                      "product_id": "myProductId",
                      "app_id": "customization0000000000000000000"
                  }
              }
          }
          return JSON.stringify(authRes);
      }
    2. 访问设备接入服务,单击“管理控制台”进入设备接入控制台。选择您的实例,单击实例卡片进入。在左侧导航栏“设备”中单击“自定义鉴权”,可到自定义鉴权配置界面配置。
      图2 自定义鉴权-创建鉴权

      自定义鉴权配置参数如下:
      表2 自定义鉴权参数说明

      参数名

      配置说明

      鉴权名称

      填写“Test_Auth_1”。

      鉴权函数

      自定义鉴权器对应的函数名称,从FunctionGraph已经创建的函数列表中选取。

      状态

      选择“激活”。

      签名认证

      选择“否”。

      token值

      未开启签名认证,这里不用填写。

      公钥

      未开启签名认证,这里不用填写。

      是否默认自定义鉴权器

      选择“是”。

      是否缓存

      选择“是”。

  1. 配置Topic策略。

    1. 参考设备策略使用说明,配置Topic策略,Topic策略配置参考如下:
      表3 Topic策略参数说明

      参数名

      配置说明

      所属资源空间

      下拉选择所属的资源空间。

      策略名称

      填写为“PolicyTest”。

      资源

      填写为“topic:/aircondition/data/up”。

      操作

      选择“发布”。

      权限

      选择“允许”。

      资源

      填写为“topic:/aircondition/cmd”。

      操作

      选择“订阅”。

      权限

      选择“允许”。

    2. 1中配置的Topic策略绑定到指定的产品。
      表4 绑定产品参数说明

      参数名

      配置说明

      设备目标类型

      选择“产品”

      策略目标

      在“策略目标”的参数中勾选对应产品。

  1. 配置流转规则。

    1. 参考配置AMQP服务端订阅,配置AMQP订阅。配置参数如下:
      表5 AMQP订阅参数说明

      参数值

      配置说明

      规则名称

      填写为“MyRule”。

      规则描述

      填写为“RuleTest”。

      数据来源

      选择“设备消息”。

      触发事件

      选择“设备消息上报”。

      资源空间

      选择对应资源空间。

    2. 添加AMQP推送消息队列的转发目标,配置参数如下:
      表6 转发目标参数说明

      参数值

      配置说明

      转发目标

      选择“AMQP推送消息队列”。

      消息队列

      选择“DefaultQueue”。

2.设备端域名切换

完成平台端配置后,需要进行设备端业务开发。完整的设备开发流程可参考设备侧开发。本章节以Paho-MQTT开源MQTT客户端为例,介绍在设备迁移场景下,设备侧如何在只修改接入地址的情况下,实现设备建立MQTT连接、Topc订阅、消息发布等功能。
//接入华为云IoT的域名,在控制台的"总览"界面的"平台接入地址"中获取“设备侧”的MQTTS接入地址。
String server = "ssl://******.st1.iotda-device.cn-north-4.myhuaweicloud.com:8883";  
// Paho MQTT客户端。
MqttClient myMqttClient = new MqttClient(server, "myClientId", persistence);  
// Paho MQTT连接参数。
MqttConnectOptions connOpts = new MqttConnectOptions(); 
connOpts.setCleanSession(true); 
connOpts.setKeepAliveInterval(120); 
connOpts.setUserName("myUserName"); 
connOpts.setPassword("myPassword".toCharArray()); 
myMqttClient.connect(connOpts); 
System.out.println("Broker: " + broker + " Connected");

// Paho MQTT消息订阅。
myMqttClient.subscribe("/aircondition/cmd", new MyMessageListener());

// Paho MQTT发布消息。
String topic = "/aircondition/data/up"; 
String content = "{\"temperature\": 26.0 }"; 
MqttMessage message = new MqttMessage(content.getBytes()); 
message.setQos(0); 
myMqttClient.publish(topic, message);

3.应用端SDK适配

华为云IoTDA提供应用端SDK,开发者只需做少量代码修改就可完成应用端适配,实现应用端的快速迁移。下面以应用端接收设备上报的消息和应用端下发消息给设备为例,介绍如何进行应用端适配。

  1. 参考AMQP客户端接入说明Java SDK接入示例进行设备上报消息的适配处理。核心代码参考:

    try {
         MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE);
         consumer.setMessageListener(message -> {
             try {
                 // 此处进行消息处理。如果处理比较耗时,最好进行开启新的线程处理,否则可能造成心跳超时链接断开。
                 processMessage(message.getBody(String.class));
                 // 如果options.isAutoAcknowledge==false,此处应该调用message.acknowledge();
             } catch (Exception e) {
                 log.warn("message.getBody error,exception is ", e);
             }
         });
     } catch (Exception e) {
         log.warn("Consumer initialize error,", e);
     }

  1. 参考消息下发使用示例 ,下发设备消息。服务端向单个设备下发消息样例如下:

    public class MessageDistributionSolution {
            ..................
            DeviceMessageRequest body = new DeviceMessageRequest();
            body.withPayloadFormat("raw");
            body.withTopicFullName("/aircondition/cmd");
            body.withMessage({"switch":"off"});
            request.withBody(body);
            try {
                CreateMessageResponse response = client.createMessage(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }

相关文档