IoT Device SDK使用指南(Java)
准备工作
- 开发环境要求:已经安装JDK(版本1.8以上)和maven
- 访问SDK下载页面,下载SDK,整个工程包含以下子工程:
iot-device-sdk-java:sdk代码
iot-device-demo:普通直连设备的demo代码
iot-gateway-demo:网关设备的demo代码
iot-bridge-sdk:网桥的sdk代码
iot-bridge-demo:网桥的demo代码,用来演示如何将tcp设备桥接到平台
iot-bridge-sample-tcp-protocol:子设备使用tcp协议链接网桥的样例
iot-device-code-generator:设备代码生成器,可以根据产品模型自动生成设备代码
- 编译安装:进入到SDK根目录,执行mvn install
创建产品
为了方便体验,我们提供了一个烟感的产品模型,烟感会上报烟雾值、温度、湿度、烟雾报警、还支持响铃报警命令。以烟感例,体验消息上报、属性上报等功能。
- 访问设备接入服务,单击“管理控制台”进入设备接入控制台,选择您的实例,单击实例卡片进入。查看MQTTS设备接入域名,保存该地址。
- 单击左侧导航栏“产品”,单击页面左侧的“创建产品”。
- 根据页面提示填写参数,然后单击“确定”完成产品的创建。
基本信息
所属资源空间
平台自动将新创建的产品归属在默认资源空间下。如需归属在其他资源空间下,下拉选择所属的资源空间。如无对应的资源空间,请先创建资源空间。
产品名称
自定义。支持字母、数字、下划线(_)、连字符(-)的字符组合。
协议类型
选择“MQTT”。
数据格式
选择“JSON”。
设备类型选择
选择”自定义类型”
设备类型
填写“smokeDetector”
高级配置
产品ID
不填写
产品描述
请根据实际情况填写。
上传产品模型
- 单击下载烟感产品模型smokeDetector,获取产品模型文件。
- 找到步骤3创建的产品,单击产品进入产品详情页。
- 选择“基本信息”页签,单击“上传模型文件”,上传步骤1获取的产品模型文件。
图1 产品-上传产品模型
设备初始化
- 创建设备时,需要写入在注册设备时获取的设备ID、密码,以及1中获取的设备对接信息,注意格式为ssl://域名信息:端口号 或 ssl://IP地址:端口号
1 2 3 4 5 6
// 获取证书路径:加载iot平台的ca证书,进行服务端校验,使用sdk默认的ca.jks即可。 URL resource = MessageSample.class.getClassLoader().getResource("ca.jks"); File file = new File(resource.getPath()); //例如在iot-device-demo文件 MessageSample.java中修改以下参数 IoTDevice device = new IoTDevice("ssl://域名信息:8883", "5e06bfee334dd4f33759f5b3_demo", "mysecret", file);
所有涉及设备ID和密码的文件均需要修改成对应的信息。
- 建立连接。调用init接口,该接口是阻塞调用,如果建立连接成功会返回0。
1 2 3
if (device.init() != 0) { return; }
如果连接成功就会打印:1
2023-07-17 17:22:59 INFO MqttConnection:105 - Mqtt client connected. address :ssl://域名信息:8883
- 创建设备并连接成功后,可以开始使用设备进行通信。调用IoT Device 的getClient接口获取设备客户端,客户端提供了消息、属性、命令等通讯接口。
消息上报
消息上报是指设备向平台上报消息。
- 从device中获取客户端,调用IoTDevice的getClient接口即可获取到客户端。
- 调用客户端的reportDeviceMessage接口来上报设备消息。在MessageSample这个例子中我们周期性上报消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
while (true) { device.getClient().reportDeviceMessage(new DeviceMessage("hello"), new ActionListener() { @Override public void onSuccess(Object context) { log.info("reportDeviceMessage ok"); } @Override public void onFailure(Object context, Throwable var2) { log.error("reportDeviceMessage fail: " + var2); } }); //上报自定义topic消息,注意需要先在平台配置自定义topic String topic = "$oc/devices/" + device.getDeviceId() + "/user/wpy"; device.getClient().publishRawMessage(new RawMessage(topic, "hello raw message "), new ActionListener() { @Override public void onSuccess(Object context) { log.info("publishRawMessage ok: "); } @Override public void onFailure(Object context, Throwable var2) { log.error("publishRawMessage fail: " + var2); } }); Thread.sleep(5000); }
- 修改MessageSample类的main函数,替换自己的设备参数后运行MessageSample类,查看日志打印看到连接成功和发送消息的打印:
1 2 3 4 5 6 7 8 9 10 11
2024-04-16 16:43:09 INFO AbstractService:103 - create device, the deviceId is 5e06bfee334dd4f33759f5b3_demo 2024-04-16 16:43:09 INFO MqttConnection:233 - try to connect to ssl://域名信息:8883 2024-04-16 16:43:10 INFO MqttConnection:257 - connect success, the uri is ssl://域名信息:8883 2024-04-16 16:43:11 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/events/up, msg = {"object_device_id":"5e06bfee334dd4f33759f5b3_demo","services":[{"paras":{"type":"DEVICE_STATUS","content":"connect success","timestamp":"1713256990817"},"service_id":"$log","event_type":"log_report","event_time":"20240416T084310Z","event_id":null}]} 2024-04-16 16:43:11 INFO MqttConnection:140 - Mqtt client connected. address is ssl://域名信息:8883 2024-04-16 16:43:11 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/events/up, msg = {"object_device_id":"5e06bfee334dd4f33759f5b3_demo","services":[{"paras":{"device_sdk_version":"JAVA_v1.2.0","fw_version":null,"sw_version":null},"service_id":"$sdk_info","event_type":"sdk_info_report","event_time":"20240416T084311Z","event_id":null}]} 2024-04-16 16:43:11 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/events/up, msg = {"object_device_id":"5e06bfee334dd4f33759f5b3_demo","services":[{"paras":{"type":"DEVICE_STATUS","content":"connect complete, the url is ssl://域名信息:8883","timestamp":"1713256991263"},"service_id":"$log","event_type":"log_report","event_time":"20240416T084311Z","event_id":null}]} 2024-04-16 16:43:11 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/messages/up, msg = {"name":null,"id":null,"content":"hello","object_device_id":null} 2024-04-16 16:43:11 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/user/wpy, msg = hello raw message 2024-04-16 16:43:11 INFO MessageSample:98 - reportDeviceMessage ok 2024-04-16 16:43:11 INFO MessageSample:113 - publishRawMessage ok:
- 在设备接入控制台,选择图2 设备列表-设备在线
-查看设备是否在线。
- 选择对应设备,单击“详情”,在设备详情页面启动设备消息跟踪。
图3 消息跟踪-启动消息跟踪
- 平台收到了设备的消息。
图4 消息跟踪-查看device_sdk_java消息跟踪
注:消息跟踪会有一定的延时,如果没有看到数据,请等待后刷新。
属性上报
打开PropertySample类,这个例子中会定时的上报alarm、temperature、humidity、smokeConcentration这四个属性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
//定时上报属性 while (true) { Map<String ,Object> json = new HashMap<>(); Random rand = new Random(); //按照物模型设置属性 json.put("alarm", 1); json.put("temperature", rand.nextFloat()*100.0f); json.put("humidity", rand.nextFloat()*100.0f); json.put("smokeConcentration", rand.nextFloat() * 100.0f); ServiceProperty serviceProperty = new ServiceProperty(); serviceProperty.setProperties(json); serviceProperty.setServiceId("smokeDetector");//serviceId要和物模型一致 device.getClient().reportProperties(Arrays.asList(serviceProperty), new ActionListener() { @Override public void onSuccess(Object context) { log.info("pubMessage success" ); } @Override public void onFailure(Object context, Throwable var2) { log.error("reportProperties failed" + var2.toString()); } }); Thread.sleep(10000); } } |
修改PropertySample的main函数后直接运行PropertySample类,查看日志看到发送成功的打印
2024-04-17 15:38:37 INFO AbstractService:103 - create device, the deviceId is 5e06bfee334dd4f33759f5b3_demo 2024-04-17 15:38:37 INFO MqttConnection:233 - try to connect to ssl://域名信息:8883 2024-04-17 15:38:38 INFO MqttConnection:257 - connect success, the uri is ssl://域名信息:8883 2024-04-17 15:38:38 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/events/up, msg = {"object_device_id":"661e35467bdccc0126d1a595_feng-sdk-test3","services":[{"paras":{"type":"DEVICE_STATUS","content":"connect success","timestamp":"1713339518043"},"service_id":"$log","event_type":"log_report","event_time":"20240417T073838Z","event_id":null}]} 2024-04-17 15:38:38 INFO MqttConnection:140 - Mqtt client connected. address is ssl://域名信息:8883 2024-04-17 15:38:38 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/events/up, msg = {"object_device_id":"661e35467bdccc0126d1a595_feng-sdk-test3","services":[{"paras":{"device_sdk_version":"JAVA_v1.2.0","fw_version":null,"sw_version":null},"service_id":"$sdk_info","event_type":"sdk_info_report","event_time":"20240417T073838Z","event_id":null}]} 2024-04-17 15:38:38 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/events/up, msg = {"object_device_id":"5e06bfee334dd4f33759f5b3_demo","services":[{"paras":{"type":"DEVICE_STATUS","content":"connect complete, the url is ssl://域名信息:8883","timestamp":"1713339518464"},"service_id":"$log","event_type":"log_report","event_time":"20240417T073838Z","event_id":null}]} 2024-04-17 15:38:38 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/properties/report, msg = {"services":[{"properties":{"alarm":1,"temperature":55.435158,"humidity":51.950867,"smokeConcentration":43.89913},"service_id":"smokeDetector","event_time":null}]} 2024-04-17 15:38:38 INFO PropertySample:144 - pubMessage success
在平台设备详情页面可以看到最新上报的属性值:
属性读写
调用客户端的setPropertyListener方法来设置属性回调接口。在PropertySample这个例子中,我们实现了属性读写接口。
写属性处理:实现了alarm属性的写操作,其他属性不支持写操作。
读属性处理:将本地属性值按照接口格式进行拼装。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
device.getClient().setPropertyListener(new PropertyListener() { //处理写属性 @Override public void onPropertiesSet(String requestId, List<ServiceProperty> services) { // 遍历service for (ServiceProperty serviceProperty : services) { log.info("OnPropertiesSet, serviceId is {}", serviceProperty.getServiceId()); // 遍历属性 for (String name : serviceProperty.getProperties().keySet()) { log.info("property name is {}", name); log.info("set property value is {}", serviceProperty.getProperties().get(name)); } } // 修改本地的属性值 device.getClient().respondPropsSet(requestId, IotResult.SUCCESS); } /** * 处理读属性。多数场景下,用户可以直接从平台读设备影子,此接口不用实现。 * 但如果需要支持从设备实时读属性,则需要实现此接口。 */ @Override public void onPropertiesGet(String requestId, String serviceId) { log.info("OnPropertiesGet, the serviceId is {}", serviceId); Map<String, Object> json = new HashMap<>(); Random rand = new SecureRandom(); json.put("alarm", 1); json.put("temperature", rand.nextFloat() * 100.0f); json.put("humidity", rand.nextFloat() * 100.0f); json.put("smokeConcentration", rand.nextFloat() * 100.0f); ServiceProperty serviceProperty = new ServiceProperty(); serviceProperty.setProperties(json); serviceProperty.setServiceId("smokeDetector"); device.getClient().respondPropsGet(requestId, Arrays.asList(serviceProperty)); } }); |
注:
- 属性读写接口需要调用respondPropsGet和respondPropsSet接口来上报操作结果。
- 如果设备不支持平台主动到设备读,onPropertiesGet接口可以空实现
运行PropertySample类,然后在平台上设备影子页面查看当前alarm属性值为1:
我们把alarm属性修改为0:
查看设备侧日志,看到设备收到属性设置,alarm被修改为0:
命令下发
设置命令监听器用来接收平台下发的命令,在回调接口里,需要对命令进行处理,并上报响应。
在CommandSample例子中实现了命令的处理,收到命令后仅进行打印,然后调用respondCommand上报响应。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
device.getClient().setCommandListener(new CommandListener() { @Override public void onCommand(String requestId, String serviceId, String commandName, Map<String, Object> paras) { log.info("onCommand, serviceId = {}", serviceId); log.info("onCommand , name = {}", commandName); log.info("onCommand, paras = {}", paras.toString()); //处理命令 //发送命令响应 device.getClient().respondCommand(requestId, new CommandRsp(0)); } }); |
直接运行CommandSample类,然后在平台上下发命令,命令的serviceId填smokeDetector、命令名填ringAlarm、参数携带duration为整数20。
查看日志,看到设备收到命令并上报了响应:
面向物模型编程
前面介绍了直接调用设备客户端的接口和平台进行通讯的方法,这种方式比较灵活,但用户需要妥善处理每一个接口,实现比较复杂。
SDK提供了一种更简单的方式,即面向物模型编程。面向物模型编程指基于SDK提供的物模型抽象能力,设备代码按照物模型定义设备服务,然后可以直接访问设备服务(即调用设备服务的属性读写接口),SDK就能自动和平台通讯,完成属性的同步和命令的调用。
相比直接调用客户端接口和平台进行通讯,面向物模型编程更简单,它简化了设备侧代码的复杂度,让设备代码只需要关注业务,而不用关注和平台的通讯过程。这种方式适合多数场景。
SmokeDetector例子演示了如何面向物模型编程:
- 按照物模型定义服务类和服务的属性(如果有多个服务,则需要定义多个服务类):
1 2 3 4 5 6 7 8 9 10 11 12 13 14
public static class SmokeDetectorService extends AbstractService { //按照设备模型定义属性,注意属性的name和类型需要和模型一致,writeable表示属性知否可写,name指定属性名 @Property(name = "alarm", writeable = true) int smokeAlarm = 1; @Property(name = "smokeConcentration", writeable = false) float concentration = 0.0f; @Property(writeable = false) int humidity; @Property(writeable = false) float temperature;
用@Property注解来表示是一个属性,可以用name指定属性名,如果不指定则使用字段名。
属性可以加上writeable来控制权限,如果属性只读,则加上writeable = false,如果不加,默认认为可读写。
- 定义服务的命令。设备收到平台下发的命令时,SDK会自动调用这里定义的命令。
接口入参和返回值的类型是固定的不能修改,否则会出现运行时错误。
这里定义的是一个响铃报警命令,命令名为ringAlarm,下发参数为”duration”,表示响铃报警的持续时间。
1 2 3 4 5 6 7
//定义命令,注意接口入参和返回值类型是固定的不能修改,否则会出现运行时错误 @DeviceCommand(name = "ringAlarm") public CommandRsp alarm(Map<String, Object> paras) { int duration = (int) paras.get("duration"); log.info("ringAlarm duration = " + duration); return new CommandRsp(0); }
- 定义getter和setter接口
- 当设备收到平台下发的查询属性以及设备上报属性时,会自动调用getter方法。getter方法需要读取设备的属性值,可以实时到传感器读取或者读取本地的缓存
- 当设备收到平台下发的设置属性时,会自动调用setter方法。setter方法需要更新设备本地的值。如果属性不支持写操作,setter保留空实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
//setter和getter接口的命名应该符合java bean规范,sdk会自动调用这些接口 public int getHumidity() { //模拟从传感器读取数据 humidity = new Random().nextInt(100); return humidity; } public void setHumidity(int humidity) { //humidity是只读的,不需要实现 } public float getTemperature() { //模拟从传感器读取数据 temperature = new Random().nextInt(100); return temperature; } public void setTemperature(float temperature) { //只读字段不需要实现set接口 } public float getConcentration() { //模拟从传感器读取数据 concentration = new Random().nextFloat()*100.0f; return concentration; } public void setConcentration(float concentration) { //只读字段不需要实现set接口 } public int getSmokeAlarm() { return smokeAlarm; } public void setSmokeAlarm(int smokeAlarm) { this.smokeAlarm = smokeAlarm; if (smokeAlarm == 0){ log.info("alarm is cleared by app"); } }
- 在main函数中创建服务实例并添加到设备。
1 2 3 4 5 6 7 8 9 10
//创建设备 IoTDevice device = new IoTDevice(serverUri, deviceId, secret); //创建设备服务 SmokeDetectorService smokeDetectorService = new SmokeDetectorService(); device.addService("smokeDetector", smokeDetectorService); if (device.init() != 0) { return; }
- 开启周期上报:
1 2
//启动自动周期上报 smokeDetectorService.enableAutoReport(10000);
备注:如果不想周期上报,也可以调用firePropertiesChanged接口手工触发上报。
直接运行SmokeDetector类,查看日志在上报属性:
在平台侧查看设备影子:
图8 设备影子-查看alarm属性
在平台上修改属性alarm,查看设备日志收到属性设置:
在平台下发ringAlarm命令:
查看设备日志看到ringAlarm命令被调用,并且上报了响应:
使用代码生成器
sdk提供了设备代码生成器,用户只需要提供产品模型文件,就能自动生成设备代码框架。代码生成器可以解析设备模型文件,然后对模型里定义的每个服务,生成对应的service类,然后生成一个设备主类,在main函数中创建设备并注册设备服务实例。
使用代码生成器生成设备代码的步骤:
- 下载huaweicloud-iot-device-sdk-java工程,解压缩后进入huaweicloud-iot-device-sdk-java目录执行“mvn install”。
- 执行完成会在iot-device-code-generator的target下生成可执行jar包。
- 将产品模型文件保存到本地,比如我的模型文件“smokeDetector.zip”放到D盘。
- 访问SDK根目录,执行“java -jar .\iot-device-code-generator\target\iot-device-code-generator-1.2.0-with-deps.jar D:\smokeDetector.zip”。
- 在huaweicloud-iot-device-sdk-java目录下会生成generated-demo包。
至此,设备代码已经生成。
编译运行生成的代码:
- 访问“huaweicloud-iot-device-sdk-java\generated-demo”,执行“mvn install”,在target下生成jar包。
- 执行java -jar .\target\iot-device-demo-ganerated-1.2.0-with-deps.jar ssl://域名信息:8883 device_id secret,三个参数分别为设备接入地址、设备id和密码,运行生成的demo。
D:\git\huaweicloud-iot-device-sdk-java\generated-demo>java -jar .\target\iot-device-demo-ganerated-1.2.0-with-deps.jar ssl://域名信息:8883 5e06bfee334dd4f33759f5b3_demo secret 2024-04-17 15:50:53 INFO AbstractService:73 - create device, the deviceId is 5e06bfee334dd4f33759f5b3_demo 2024-04-17 15:50:54 INFO MqttConnection:204 - try to connect to ssl://域名信息:8883 2024-04-17 15:50:55 INFO MqttConnection:228 - connect success, the uri is ssl://域名信息:8883 2024-04-17 15:50:55 INFO MqttConnection:268 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/events/up, msg = {"object_device_id":"5e06bfee334dd4f33759f5b3_demo","services":[{"paras":{"type":"DEVICE_STATUS","content":"connect success","timestamp":"1713340255148"},"service_id":"$log","event_type":"log_report","event_time":"20240417T075055Z","event_id":null}]} 2024-04-17 15:50:55 INFO MqttConnection:111 - Mqtt client connected. address is ssl://域名信息:8883 2024-04-17 15:50:55 INFO MqttConnection:268 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/events/up, msg = {"object_device_id":"5e06bfee334dd4f33759f5b3_demo","services":[{"paras":{"device_sdk_version":"JAVA_v1.2.0","fw_version":null,"sw_version":null},"service_id":"$sdk_info","event_type":"sdk_info_report","event_time":"20240417T075055Z","event_id":null}]} 2024-04-17 15:50:55 INFO MqttConnection:268 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/events/up, msg = {"object_device_id":"5e06bfee334dd4f33759f5b3_demo","services":[{"paras":{"type":"DEVICE_STATUS","content":"connect complete, the url is ssl://域名信息:8883","timestamp":"1713340255496"},"service_id":"$log","event_type":"log_report","event_time":"20240417T075055Z","event_id":null}]} 2024-04-17 15:51:03 INFO smokeDetectorService:78 - report property alarm value = 50 2024-04-17 15:51:03 INFO smokeDetectorService:104 - report property temperature value = 0.3648571367849047 2024-04-17 15:51:03 INFO smokeDetectorService:91 - report property smokeConcentration value = 0.679772877336927 2024-04-17 15:51:03 INFO smokeDetectorService:117 - report property humidity value = 15 2024-04-17 15:51:03 INFO MqttConnection:268 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/properties/report, msg = {"services":[{"properties":{"alarm":50,"temperature":0.3648571367849047,"smokeConcentration":0.679772877336927,"humidity":15},"service_id":"smokeDetector","event_time":"20240417T075103Z"}]}
修改扩展生成的代码:
生成的代码已经完成了服务的定义和注册,用户只需要进行少量的修改即可。
如何开发网关
网关是一个特殊的设备,除具备一般设备功能之外,还具有子设备管理、子设备消息转发的功能。SDK提供了AbstractGateway抽象类来简化网关的实现。该类提供了子设备管理功能,需要从平台获取子设备信息并保存(需要子类提供子设备持久化接口)、子设备下行消息转发功能(需要子类实现转发处理接口)、以及上报子设备列表、上报子设备属性、上报子设备状态、上报子设备消息等接口。
- 使用AbstractGateway类
继承该类,在构造函数里提供子设备信息持久化接口,实现其下行消息转发的抽象接口:
1 2 3 4 5 6 7
public abstract void onSubdevCommand(String requestId, Command command); public abstract void onSubdevPropertiesSet(String requestId, PropsSet propsSet); public abstract void onSubdevPropertiesGet(String requestId, PropsGet propsGet); public abstract void onSubdevMessage(DeviceMessage message);
- iot-gateway-demo代码介绍
工程iot-gateway-demo基于AbstractGateway实现了一个简单的网关,它提供tcp设备接入能力。关键类:
SimpleGateway:继承自AbstractGateway,实现子设备管理和下行消息转发
StringTcpServer:基于netty实现一个TCP server,本例中子设备采用TCP协议,并且首条消息为鉴权消息
SubDevicesFilePersistence:子设备信息持久化,采用json文件来保存子设备信息,并在内存中做了缓存
Session:设备会话类,保存了设备id和TCP的channel的对应关系
- SimpleGateway类
添加或删除子设备处理
添加子设备:AbstractGateway的onAddSubDevices接口已经完成了子设备信息的保存。我们不需要再增加额外处理,因此SimpleGateway不需要重写onAddSubDevices接口
删除子设备:我们不仅需要修改持久化信息,还需要断开当前子设备的连接。所以我们重写了onDeleteSubDevices接口,增加了拆链处理,然后调用父类的onDeleteSubDevices。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
@Override public int onDeleteSubDevices(SubDevicesInfo subDevicesInfo) { for (DeviceInfo subdevice : subDevicesInfo.getDevices()) { Session session = nodeIdToSesseionMap.get(subdevice.getNodeId()); if (session != null) { if (session.getChannel() != null) { session.getChannel().close(); channelIdToSessionMap.remove(session.getChannel().id().asLongText()); nodeIdToSesseionMap.remove(session.getNodeId()); } } } return super.onDeleteSubDevices(subDevicesInfo); }
- 下行消息处理
网关收到平台下行消息时,需要转发给子设备。平台下行消息分为三种:设备消息、属性读写、命令。
- 设备消息:这里我们需要根据deviceId获取nodeId,从而获取session,从session里获取channel,就可以往channel发送消息。在转发消息时,可以根据需要进行一定的转换处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
@Override public void onSubdevMessage(DeviceMessage message) { //平台接口带的都是deviceId,deviceId是由nodeId和productId拼装生成的,即 //deviceId = productId_nodeId String nodeId = IotUtil.getNodeIdFromDeviceId(message.getDeviceId()); if (nodeId == null) { return; } //通过nodeId获取session,进一步获取channel Session session = nodeIdToSesseionMap.get(nodeId); if (session == null) { log.error("subdev is not connected " + nodeId); return; } if (session.getChannel() == null){ log.error("channel is null " + nodeId); return; } //直接把消息转发给子设备 session.getChannel().writeAndFlush(message.getContent()); log.info("writeAndFlush " + message); }
- 属性读写:
属性读写包括属性设置和属性查询。
属性设置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
@Override public void onSubdevPropertiesSet(String requestId, PropsSet propsSet) { if (propsSet.getDeviceId() == null) { return; } String nodeId = IotUtil.getNodeIdFromDeviceId(propsSet.getDeviceId()); if (nodeId == null) { return; } Session session = nodeIdToSesseionMap.get(nodeId); if (session == null) { return; } //这里我们直接把对象转成string发给子设备,实际场景中可能需要进行一定的编解码转换 session.getChannel().writeAndFlush(JsonUtil.convertObject2String(propsSet)); //为了简化处理,我们在这里直接回响应。更合理做法是在子设备处理完后再回响应 getClient().respondPropsSet(requestId, IotResult.SUCCESS); log.info("writeAndFlush " + propsSet); }
属性查询:1 2 3 4 5 6 7
@Override public void onSubdevPropertiesGet(String requestId, PropsGet propsGet) { //不建议平台直接读子设备的属性,这里直接返回失败 log.error("not supporte onSubdevPropertiesGet"); deviceClient.respondPropsSet(requestId, IotResult.FAIL); }
- 命令:处理流程和消息类似,实际场景中可能需要不同的编解码转换。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
@Override public void onSubdevCommand(String requestId, Command command) { if (command.getDeviceId() == null) { return; } String nodeId = IotUtil.getNodeIdFromDeviceId(command.getDeviceId()); if (nodeId == null) { return; } Session session = nodeIdToSesseionMap.get(nodeId); if (session == null) { return; } //这里我们直接把command对象转成string发给子设备,实际场景中可能需要进行一定的编解码转换 session.getChannel().writeAndFlush(JsonUtil.convertObject2String(command)); //为了简化处理,我们在这里直接回命令响应。更合理做法是在子设备处理完后再回响应 getClient().respondCommand(requestId, new CommandRsp(0)); log.info("writeAndFlush " + command); }
- 设备消息:这里我们需要根据deviceId获取nodeId,从而获取session,从session里获取channel,就可以往channel发送消息。在转发消息时,可以根据需要进行一定的转换处理。
- 上行消息处理
上行处理在StringTcpServer的channelRead0接口里。如果会话不存在,需要先创建会话:
如果子设备信息不存在,这里会创建会话失败,直接拒绝连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
@Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { Channel incoming = ctx.channel(); log.info("channelRead0" + incoming.remoteAddress() + " msg :" + s); //如果是首条消息,创建session //如果是首条消息,创建session Session session = simpleGateway.getSessionByChannel(incoming.id().asLongText()); if (session == null) { String nodeId = s; session = simpleGateway.createSession(nodeId, incoming); //创建会话失败,拒绝连接 if (session == null) { log.info("close channel"); ctx.close(); } }
如果会话存在,则进行消息转发:
1 2 3 4 5 6 7
else { //如果需要上报属性则调用reportSubDeviceProperties DeviceMessage deviceMessage = new DeviceMessage(s); deviceMessage.setDeviceId(session.getDeviceId()); simpleGateway.reportSubDeviceMessage(deviceMessage, null); }
到这里,网关的关键代码介绍完了,其他的部分看源代码。整个demo是开源的,用户可以根据需要进行扩展。比如修改持久化方式、转发中增加消息格式的转换、实现其他子设备接入协议。
- iot-gateway-demo的使用
- 创建子设备的产品,步骤可参考创建产品。
- 在创建的产品中定义模型,添加服务,服务ID为parameter。并且新增alarm和temperature两个属性,如下图所示
图9 模型定义-子设备产品
- 修改StringTcpServer的main函数,替换构造参数,然后运行该类。
1 2 3
simpleGateway = new SimpleGateway(new SubDevicesFilePersistence(), "ssl://iot-acc.cn-north-4.myhuaweicloud.com:8883", "5e06bfee334dd4f33759f5b3_demo", "mysecret");
- 在平台上看到该网关在线后,添加子设备。
图10 设备-添加子设备
表1 子设备参数 参数名称
参数描述
所属产品
子设备所属的产品,选择步骤1创建的产品。
设备名称
即device_name,可自定义,如subdev_name
设备标识码
即node_id,填写subdev。
设备ID
即devicee_id,可不填写,自动生成。
此时网关上日志打印:
2024-04-16 21:00:01 INFO SubDevicesFilePersistence:112 - add subdev, the nodeId is subdev
- 运行TcpDevice类,建立连接后,输入步骤3中注册的子设备的nodeId,如subdev。
图11 子设备连接
此时网关设备日志打印:
2024-04-16 21:00:54 INFO StringTcpServer:196 - initChannel: /127.0.0.1:21889 2024-04-16 21:01:00 INFO StringTcpServer:137 - channelRead0 is /127.0.0.1:21889, the msg is subdev 2024-04-16 21:01:00 INFO SimpleGateway:100 - create new session ok, the session is Session{nodeId='subdev', channel=[id: 0xf9b89f78, L:/127.0.0.1:8080 - R:/127.0.0.1:21889], deviceId='subdev_deviceId'}
- 在平台上看到子设备上线。
图12 设备列表-设备在线
- 子设备上报消息
图13 子设备上报消息
查看日志看到上报成功
2024-04-16 21:02:36 INFO StringTcpServer:137 - channelRead0 is /127.0.0.1:21889, the msg is hello 2024-04-16 21:02:36 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/messages/up, msg = {"name":null,"id":null,"content":"hello","object_device_id":"subdev_deviceId"] 2024-04-16 21:02:36 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/gateway/sub_devices/properties/report, msg = {"devices":[{"services":[{"properties":{"temprature":2,"alarm":1},"service_id":"parameter","event_time":null}],"device_id":"subdev_deviceId"}]]
- 查看消息跟踪
在平台上找到网关,选择 设备详情-消息跟踪,打开消息跟踪。继续让子设备发送数据,等待片刻后看到消息跟踪:
图14 消息跟踪-直连设备