设备接入 IoTDA设备接入 IoTDA

计算
弹性云服务器 ECS
裸金属服务器 BMS
云手机 CPH
专属主机 DeH
弹性伸缩 AS
镜像服务 IMS
函数工作流 FunctionGraph
云耀云服务器 HECS
VR云渲游平台 CVR
特惠算力专区
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
弹性文件服务 SFS
存储容灾服务 SDRS
云硬盘备份 VBS
云服务器备份 CSBS
数据快递服务 DES
专属企业存储服务
云存储网关 CSG
专属分布式存储服务 DSS
CDN与智能边缘
内容分发网络 CDN
智能边缘云 IEC
智能边缘小站 IES
智能边缘平台 IEF
人工智能
AI开发平台ModelArts
华为HiLens
图引擎服务 GES
图像识别 Image
文字识别 OCR
自然语言处理 NLP
内容审核 Moderation
图像搜索 ImageSearch
医疗智能体 EIHealth
园区智能体 CampusGo
企业级AI应用开发专业套件 ModelArts Pro
人脸识别服务 FRS
对话机器人服务 CBS
视频分析服务 VAS
语音交互服务 SIS
知识图谱 KG
人证核身服务 IVS
IoT物联网
设备接入 IoTDA
设备管理 IoTDM(联通用户专用)
全球SIM联接 GSL
IoT数据分析
路网数字化服务 DRIS
IoT边缘 IoTEdge
设备发放 IoTDP
开发与运维
软件开发平台 DevCloud
项目管理 ProjectMan
代码托管 CodeHub
流水线 CloudPipeline
代码检查 CodeCheck
编译构建 CloudBuild
部署 CloudDeploy
云测 CloudTest
发布 CloudRelease
移动应用测试 MobileAPPTest
CloudIDE
Classroom
开源镜像站 Mirrors
应用魔方 AppCube
云性能测试服务 CPTS
应用管理与运维平台 ServiceStage
云应用引擎 CAE
视频
实时音视频 SparkRTC
视频直播 Live
视频点播 VOD
媒体处理 MPC
视频接入服务 VIS
管理与监管
统一身份认证服务 IAM
消息通知服务 SMN
云监控服务 CES
应用运维管理 AOM
应用性能管理 APM
云日志服务 LTS
云审计服务 CTS
标签管理服务 TMS
资源管理服务 RMS
应用身份管理服务 OneAccess
区块链
区块链服务 BCS
可信跨链服务 TCS
可信分布式身份服务
智能协作
IdeaHub
开发者工具
SDK开发指南
API签名指南
DevStar
HCloud CLI
Terraform
Ansible
云生态
云市场
合作伙伴中心
华为云培训中心
其他
管理控制台
消息中心
产品价格详情
系统权限
我的凭证
客户关联华为云合作伙伴须知
公共问题
宽限期保留期
奖励推广计划
活动
容器
云容器引擎 CCE
云容器实例 CCI
容器镜像服务 SWR
应用编排服务 AOS
多云容器平台 MCP
基因容器 GCS
容器洞察引擎 CIE
云原生服务中心 OSC
容器批量计算 BCE
容器交付流水线 ContainerOps
应用服务网格 ASM
网络
虚拟私有云 VPC
弹性公网IP EIP
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
虚拟专用网络 VPN
云连接 CC
VPC终端节点 VPCEP
数据库
云数据库 RDS
数据复制服务 DRS
文档数据库服务 DDS
分布式数据库中间件 DDM
云数据库 GaussDB (for openGauss)
云数据库 GaussDB(for MySQL)
云数据库 GaussDB NoSQL
数据管理服务 DAS
数据库和应用迁移 UGO
大数据
MapReduce服务 MRS
数据湖探索 DLI
表格存储服务 CloudTable
可信智能计算服务 TICS
推荐系统 RES
云搜索服务 CSS
数据可视化 DLV
数据湖治理中心 DGC
数据接入服务 DIS
数据仓库服务 GaussDB(DWS)
应用中间件
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
API网关 APIG
分布式缓存服务 DCS
分布式消息服务RocketMQ版
企业应用
域名注册服务 Domains
云解析服务 DNS
云速建站 CloudSite
网站备案
商标注册
华为云WeLink
会议
隐私保护通话 PrivateNumber
语音通话 VoiceCall
消息&短信 MSGSMS
云管理网络
SD-WAN 云服务
边缘数据中心管理 EDCM
云桌面 Workspace
应用与数据集成平台 ROMA Connect
ROMA资产中心 ROMAExchange
API全生命周期管理 ROMA API
安全与合规
安全技术与应用
DDoS防护 ADS
Web应用防火墙 WAF
云防火墙 CFW
应用信任中心 ATC
企业主机安全 HSS
容器安全服务 CGS
云堡垒机 CBH
数据库安全服务 DBSS
数据加密服务 DEW
数据安全中心 DSC
云证书管理服务 CCM
SSL证书管理 SCM
漏洞扫描服务 VSS
态势感知 SA
威胁检测服务 MTD
管理检测与响应 MDR
安全治理云图 Compass
认证测试中心 CTC
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
专属云
专属计算集群 DCC
解决方案
高性能计算 HPC
SAP
混合云灾备
华为工业云平台 IMC
价格
成本优化最佳实践
专属云商业逻辑
用户服务
帐号中心
费用中心
成本中心
资源中心
企业管理
工单管理
客户运营能力
国际站常见问题
支持计划
专业服务
合作伙伴支持计划
文档首页> 设备接入 IoTDA> SDK参考> 设备侧SDK> IoT Device SDK使用指南(Java)
更新时间:2021-12-24 GMT+08:00
分享

IoT Device SDK使用指南(Java)

准备工作

  • 开发环境要求:已经安装JDK(版本1.8以上)和maven
  • 访问SDK下载页面,下载SDK,整个工程包含以下子工程:

    iot-device-sdk-java:sdk代码

    iot-device-demo:普通直连设备的demo代码

    iot-gateway-demo:网关设备的demo代码

    iot-bridge-demo:网桥的demo代码,用来演示如何将tcp设备桥接到平台

    iot-device-code-generator:设备代码生成器,可以根据产品模型自动生成设备代码

  • 编译安装:进入到SDK根目录,执行mvn install

创建产品

为了方便体验,我们提供了一个烟感的产品模型,烟感会上报烟雾值、温度、湿度、烟雾报警、还支持响铃报警命令。以烟感例,体验消息上报、属性上报等功能。

  1. 访问设备接入服务,单击“立即使用”进入设备接入控制台,查看MQTTS设备接入域名,保存该地址。
  2. 单击左侧导航栏“产品”,单击页面右上角的“创建产品”
  3. 根据页面提示填写参数,然后单击“立即创建”。

    基本信息

    所属资源空间

    平台自动将新创建的产品归属在默认资源空间下。如需归属在其他资源空间下,下拉选择所属的资源空间。如无对应的资源空间,请先创建资源空间

    产品名称

    自定义。支持字母、数字、下划线(_)、连字符(-)的字符组合。

    协议类型

    选择“MQTT”

    数据格式

    选择“JSON”

    厂商名称

    自定义。支持中英文字符、数字、下划线(_)、连字符(-)的字符组合。

    模型定义

    选择模型

    此处不使用平台预置的产品模型,使用离线导入的产品模型,详细请参考上传产品模型

    所属行业

    选择产品模型的所属行业。

    设备类型

    自定义。

上传产品模型

  1. 单击下载烟感产品模型smokeDetector,获取产品模型文件。
  2. 找到步骤3创建的产品,单击产品进入产品详情页。
  3. 选择“模型定义”页签,单击“上传模型文件”,上传步骤1获取的产品模型文件。

注册设备

  1. 选择左侧导航栏“设备 > 所有设备”,单击页面右上角的“注册设备”
  2. 根据页面提示信息填写参数,然后单击“确定”

    参数名称

    说明

    所属资源空间

    确保和步骤3创建的产品归属在同一个资源空间。

    所属产品

    选择步骤3创建的产品。

    设备标识码

    即nodeID,设备唯一物理标识。可自定义,由英文字母和数字组成。

    设备名称

    即device_name,可自定义。

    设备认证类型

    选择“密钥”

    密钥

    设备密钥,可自定义。若不填写密钥,物联网平台会自动生成密钥。

    设备注册成功后保存设备标识码、设备ID、密钥。

设备初始化

  1. 创建设备时,需要写入在注册设备时获取的设备ID、密码,以及1中获取的设备对接信息,注意格式为ssl://域名信息:端口号
    1
    2
           IoTDevice device = new IoTDevice("ssl://iot-mqtts.cn-north-4.myhuaweicloud.com:8883",
                    "5e06bfee334dd4f33759f5b3_demo", "mysecret", file);
    
  2. 建立连接。调用init接口,该接口是阻塞调用,如果建立连接成功会返回0。
    1
    2
    3
          if (device.init() != 0) {
                return;
            }
    
    如果连接成功就会打印:
    1
    2019-12-26 11:02:02  INFO MqttConnection:88 - Mqtt client connected. address :ssl://iot-mqtts.cn-north-4.myhuaweicloud.com:8883
    
  3. 创建设备并连接成功后,可以开始使用设备进行通讯。调用IoT Device 的getClient接口获取设备客户端,客户端提供了消息、属性、命令等通讯接口。

消息上报

消息上报是指设备向平台上报消息。

  1. 从device中获取客户端,调用IoTDevice的getClient接口即可获取到客户端。
  2. 调用客户端的reportDeviceMessage接口来上报设备消息。在MessageSample这个例子中我们周期性上报消息:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
           while (true) {        
                   device.getClient().reportDeviceMessage(new DeviceMessage("hello"), new ActionListener() {
                    @Override
                    public void onSuccess(Object context) {
                    }
    
                    @Override
                    public void onFailure(Object context, Throwable var2) {
                        log.error("reportDeviceMessagefail: "+var2);
                    }
                });
    
                Thread.sleep(10000);
            }
    
  3. 修改MessageSample类的main函数,替换自己的设备参数后运行MessageSample类,查看日志打印看到连接成功和发送消息的打印:
    1
    2
    2019-12-26 11:02:02  INFO MqttConnection:88 - Mqtt client connected. address :ssl://iot-mqtts.cn-north-4.myhuaweicloud.com:8883
    2019-12-26 11:02:02  INFO MqttConnection:214 - publish message topic =  $oc/devices/test_testDevice/sys/messages/up, msg = {"name":null,"id":null,"content":"hello","object_device_id":null}
    
  4. 在设备接入控制台,选择设备 > 所有设备-查看设备是否在线。

  5. 选择对应设备,单击“查看”,在设备详情页面启动设备消息跟踪。

  6. 平台收到了设备的消息。

:消息跟踪会有一定的延时,如果没有看到数据,请等待后刷新。

属性上报

打开PropertySample类,这个例子中会定时的上报alarm、temperature、humidity、smokeConcentration这四个属性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
      //定时上报属性
        while (true) {

            Map<String ,Object> json = new HashMap<>();
            Random rand = new Random();

            //按照物模型设置属性
            json.put("alarm", alarm);
            json.put("temperature", rand.nextFloat()*100.0f);
            json.put("humidity", rand.nextFloat()*100.0f);
            json.put("smokeConcentration", rand.nextFloat() * 100.0f);

            ServiceProperty serviceProperty = new ServiceProperty();
            serviceProperty.setProperties(json);
            serviceProperty.setServiceId("smokeDetector");//serviceId要和物模型一致

            
               device.getClient().reportProperties(Arrays.asList(serviceProperty), new ActionListener() {
                @Override
                public void onSuccess(Object context) {
                    log.info("pubMessage success" );
                }

                @Override
                public void onFailure(Object context, Throwable var2) {
                    log.error("reportProperties failed" + var2.toString());
                }
            });

            Thread.sleep(10000);
        }
    }

修改PropertySample的main函数后直接运行PropertySample类,查看日志看到发送成功的打印

在平台设备详情页面可以看到最新上报的属性值:

属性读写

调用客户端的setPropertyListener方法来设置属性回调接口。在PropertySample这个例子中,我们实现了属性读写接口。

写属性处理:实现了alarm属性的写操作,其他属性不支持写操作。

读属性处理:将本地属性值按照接口格式进行拼装。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
            device.getClient().setPropertyListener(new PropertyListener() {

            //处理写属性
            @Override
            public void onPropertiesSet(String requestId, List<ServiceProperty> services) {

                //遍历service
                for (ServiceProperty serviceProperty: services){

                    log.info("OnPropertiesSet, serviceId =  " + serviceProperty.getServiceId());

                    //遍历属性
                    for (String name :serviceProperty.getProperties().keySet()){
                        log.info("property name = "+ name);
                        log.info("set property value = "+ serviceProperty.getProperties().get(name));
                        if (name.equals("alarm")){
                            //修改本地值
                            alarm = (Integer) serviceProperty.getProperties().get(name);
                        }
                    }

                }
                //修改本地的属性值
                client.respondPropsSet(requestId, IotResult.SUCCESS);
            }

            //处理读属性
            @Override
            public void onPropertiesGet(String requestId, String serviceId) {

                log.info("OnPropertiesGet " + serviceId);
                Map<String ,Object> json = new HashMap<>();
                Random rand = new Random();
                json.put("alarm", alarm);
                json.put("temperature", rand.nextFloat()*100.0f);
                json.put("humidity", rand.nextFloat()*100.0f);
                json.put("smokeConcentration", rand.nextFloat() * 100.0f);

                ServiceProperty serviceProperty = new ServiceProperty();
                serviceProperty.setProperties(json);
                serviceProperty.setServiceId("smokeDetector");

                client.respondPropsGet(requestId, Arrays.asList(serviceProperty));
            }
        });

  1. 属性读写接口需要调用respondPropsGet和respondPropsSet接口来上报操作结果。
  2. 如果设备不支持平台主动到设备读,onPropertiesGet接口可以空实现

运行PropertySample类,然后在平台上设备影子页面查看当前alarm属性值为1:

我们把alarm属性修改为0:

查看设备侧日志,看到设备收到属性设置,alarm被修改为0:

命令下发

设置命令监听器用来接收平台下发的命令,在回调接口里,需要对命令进行处理,并上报响应。

在CommandSample例子中实现了命令的处理,收到命令后仅进行打印,然后调用respondCommand上报响应。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
	device.getClient().setCommandListener(new CommandListener() {
            @Override
            public void onCommand(String requestId, String serviceId, String commandName, Map<String, Object> paras) {
                log.info("onCommand, serviceId = " +serviceId);
                log.info("onCommand , name = " + commandName);
                log.info("onCommand, paras =  " + paras.toString());

                //处理命令

                //发送命令响应
                client.respondCommand(requestId, new CommandRsp(0));
            }
        });
		
		

直接运行CommandSample类,然后在平台上下发命令,命令的serviceId填smokeDetector、命令名填ringAlarm、参数携带duration为整数20。

查看日志,看到设备收到命令并上报了响应:

面向物模型编程

前面介绍了直接调用设备客户端的接口和平台进行通讯的方法,这种方式比较灵活,但用户需要妥善处理每一个接口,实现比较复杂。

SDK提供了一种更简单的方式,即面向物模型编程。面向物模型编程指基于SDK提供的物模型抽象能力,设备代码按照物模型定义设备服务,然后可以直接访问设备服务(即调用设备服务的属性读写接口),SDK就能自动和平台通讯,完成属性的同步和命令的调用。

相比直接调用客户端接口和平台进行通讯,面向物模型编程更简单,它简化了设备侧代码的复杂度,让设备代码只需要关注业务,而不用关注和平台的通讯过程。这种方式适合多数场景。

SmokeDetector例子演示了如何面向物模型编程:

  1. 按照物模型定义服务类和服务的属性(如果有多个服务,则需要定义多个服务类):
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    public static class SmokeDetectorService extends AbstractService {
    
            //按照设备模型定义属性,注意属性的name和类型需要和模型一致,writeable表示属性知否可写,name指定属性名
            @Property(name = "alarm", writeable = true)
            int smokeAlarm = 1;
    
            @Property(name = "smokeConcentration", writeable = false)
            float concentration = 0.0f;
    
            @Property(writeable = false)
            int humidity;
    
            @Property(writeable = false)
            float temperature;
    

    用@Property注解来表示是一个属性,可以用name指定属性名,如果不指定则使用字段名。

    属性可以加上writeable来控制权限,如果属性只读,则加上writeable = false,如果不加,默认认为可读写。

  2. 定义服务的命令。设备收到平台下发的命令时,SDK会自动调用这里定义的命令。

    接口入参和返回值的类型是固定的不能修改,否则会出现运行时错误。

    这里定义的是一个响铃报警命令,命令名为ringAlarm。

    1
    2
    3
    4
    5
    6
    7
    //定义命令,注意接口入参和返回值类型是固定的不能修改,否则会出现运行时错误
            @DeviceCommand(name = "ringAlarm")
            public CommandRsp alarm(Map<String, Object> paras) {
                int duration = (int) paras.get("duration");
                log.info("ringAlarm  duration = " + duration);
                return new CommandRsp(0);
            }
    
  3. 定义getter和setter接口
    • 当设备收到平台下发的查询属性以及设备上报属性时,会自动调用getter方法。getter方法需要读取设备的属性值,可以实时到传感器读取或者读取本地的缓存
    • 当设备收到平台下发的设置属性时,会自动调用setter方法。setter方法需要更新设备本地的值。如果属性不支持写操作,setter保留空实现。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
     //setter和getter接口的命名应该符合java bean规范,sdk会自动调用这些接口
            public int getHumidity() {
    
                //模拟从传感器读取数据
                humidity = new Random().nextInt(100);
                return humidity;
            }
    
            public void setHumidity(int humidity) {
                //humidity是只读的,不需要实现
            }
    
            public float getTemperature() {
    
                //模拟从传感器读取数据
                temperature = new Random().nextInt(100);
                return temperature;
            }
    
            public void setTemperature(float temperature) {
                //只读字段不需要实现set接口
            }
    
            public float getConcentration() {
    
                //模拟从传感器读取数据
                concentration = new Random().nextFloat()*100.0f;
                return concentration;
            }
    
            public void setConcentration(float concentration) {
                //只读字段不需要实现set接口
            }
    
            public int getSmokeAlarm() {
                return smokeAlarm;
            }
    
            public void setSmokeAlarm(int smokeAlarm) {
    
                this.smokeAlarm = smokeAlarm;
                if (smokeAlarm == 0){
                    log.info("alarm is cleared by app");
                }
            }
    
  4. 在main函数中创建服务实例并添加到设备。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
          //创建设备
            IoTDevice device = new IoTDevice(serverUri, deviceId, secret);
    
            //创建设备服务
            SmokeDetectorService smokeDetectorService = new SmokeDetectorService();
            device.addService("smokeDetector", smokeDetectorService);
    
            if (device.init() != 0) {
                return;
            }
    
  5. 开启周期上报:
    1
    2
    //启动自动周期上报
    smokeDetectorService.enableAutoReport(10000);
    

    备注:如果不想周期上报,也可以调用firePropertiesChanged接口手工触发上报。

    直接运行SmokeDetector类,查看日志在上报属性:

    在平台侧查看设备影子:

    在平台上修改属性alarm,查看设备日志收到属性设置:

    在平台下发ringAlarm命令:

    查看设备日志看到ringAlarm命令被调用,并且上报了响应:

使用代码生成器

sdk提供了设备代码生成器,用户只需要提供产品模型文件,就能自动生成设备代码框架。代码生成器可以解析设备模型文件,然后对模型里定义的每个服务,生成对应的service类,然后生成一个设备主类,在main函数中创建设备并注册设备服务实例。

使用代码生成器生成设备代码的步骤:

  1. 下载huaweicloud-iot-device-sdk-java工程,解压缩后进入huaweicloud-iot-device-sdk-java目录执行“mvn install”
  2. 执行完成会在iot-device-code-generator的target下生成可执行jar包。

  3. 将产品模型文件保存到本地,比如我的模型文件“smokeDetector_cb097d20d77b4240adf1f33d36b3c278_smokeDetector.zip”放到D盘。
  4. 访问 iot-device-code-generator\target\目录,执行“java -jar iot-device-code-generator-0.2.0-with-deps.jar D:\smokeDetector_cb097d20d77b4240adf1f33d36b3c278_smokeDetector.zip”

  5. 在huaweicloud-iot-device-sdk-java目录下会生成generated-demo包。

    至此,设备代码已经生成。

编译运行生成的代码:

  1. 访问“huaweicloud-iot-device-sdk-java\generated-demo”,执行“mvn install”,在target下生成jar包。

  2. 执行java -jar target\iot-device-demo-ganerated-0.2.0-with-deps.jar 5e06bfee334dd4f33759f5b3_demo *****

    两个参数分别为设备id和密码,运行生成的demo

修改扩展生成的代码:

生成的代码已经完成了服务的定义和注册,用户只需要进行少量的修改即可。

  1. 命令接口,需要添加具体的实现逻辑

  2. getter方法,生成的代码是返回随机值,需要改为从传感器读取数据。
  3. setter方法,生成的代码只完成了属性的修改保存,还需要添加真实的逻辑处理,比如向传感器下发指令。

如何开发网关

网关是一个特殊的设备,除具备一般设备功能之外,还具有子设备管理、子设备消息转发的功能。SDK提供了AbstractGateway抽象类来简化网关的实现。该类提供了子设备管理功能,需要从平台获取子设备信息并保存(需要子类提供子设备持久化接口)、子设备下行消息转发功能(需要子类实现转发处理接口)、以及上报子设备列表、上报子设备属性、上报子设备状态、上报子设备消息等接口。

  • 使用AbstractGateway类

    继承该类,在构造函数里提供子设备信息持久化接口,实现其下行消息转发的抽象接口:

    1
    2
    3
    4
    5
    6
    7
        public abstract void onSubdevCommand(String requestId, Command command);
    
        public abstract void onSubdevPropertiesSet(String requestId, PropsSet propsSet);
    
        public abstract void onSubdevPropertiesGet(String requestId, PropsGet propsGet);
    
        public abstract void onSubdevMessage(DeviceMessage message);
    
  • iot-gateway-demo代码介绍

    工程iot-gateway-demo基于AbstractGateway实现了一个简单的网关,它提供tcp设备接入能力。关键类:

    SimpleGateway:继承自AbstractGateway,实现子设备管理和下行消息转发

    StringTcpServer:基于netty实现一个TCP server,本例中子设备采用TCP协议,并且首条消息为鉴权消息

    SubDevicesFilePersistence:子设备信息持久化,采用json文件来保存子设备信息,并在内存中做了缓存

    Session:设备会话类,保存了设备id和TCP的channel的对应关系

  • SimpleGateway类

    添加或删除子设备处理

    添加子设备:AbstractGateway的onAddSubDevices接口已经完成了子设备信息的保存。我们不需要再增加额外处理,因此SimpleGateway不需要重写onAddSubDevices接口

    删除子设备:我们不仅需要修改持久化信息,还需要断开当前子设备的连接。所以我们重写了onDeleteSubDevices接口,增加了拆链处理,然后调用父类的onDeleteSubDevices。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
     
     
     @Override
        public int onDeleteSubDevices(SubDevicesInfo subDevicesInfo) {
    
            for (DeviceInfo subdevice : subDevicesInfo.getDevices()) {
                Session session = nodeIdToSesseionMap.get(subdevice.getNodeId());
                if (session != null) {
                    if (session.getChannel() != null) {
                        session.getChannel().close();
                        channelIdToSessionMap.remove(session.getChannel().id().asLongText());
                        nodeIdToSesseionMap.remove(session.getNodeId());
                    }
                }
            }
            return super.onDeleteSubDevices(subDevicesInfo);
    
        }
    
  • 下行消息处理
    网关收到平台下行消息时,需要转发给子设备。平台下行消息分为三种:设备消息、属性读写、命令。
    • 设备消息:这里我们需要根据deviceId获取nodeId,从而获取session,从session里获取channel,就可以往channel发送消息。在转发消息时,可以根据需要进行一定的转换处理。
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
       @Override
          public void onSubdevMessage(DeviceMessage message) {
      
              //平台接口带的都是deviceId,deviceId是由nodeId和productId拼装生成的,即
              //deviceId = productId_nodeId
              String nodeId = IotUtil.getNodeIdFromDeviceId(message.getDeviceId());
              if (nodeId == null) {
                  return;
              }
      
              //通过nodeId获取session,进一步获取channel
              Session session = nodeIdToSesseionMap.get(nodeId);
              if (session == null) {
                  log.error("subdev is not connected " + nodeId);
                  return;
              }
              if (session.getChannel() == null){
                  log.error("channel is null " + nodeId);
                  return;
              }
      
              //直接把消息转发给子设备
              session.getChannel().writeAndFlush(message.getContent());
              log.info("writeAndFlush " + message);
          }
      
    • 属性读写:

      属性读写包括属性设置和属性查询。

      属性设置:

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
       @Override
          public void onSubdevPropertiesSet(String requestId, PropsSet propsSet) {
      
              if (propsSet.getDeviceId() == null) {
                  return;
              }
      
              String nodeId = IotUtil.getNodeIdFromDeviceId(propsSet.getDeviceId());
              if (nodeId == null) {
                  return;
              }
      
              Session session = nodeIdToSesseionMap.get(nodeId);
              if (session == null) {
                  return;
              }
      
              //这里我们直接把对象转成string发给子设备,实际场景中可能需要进行一定的编解码转换
              session.getChannel().writeAndFlush(JsonUtil.convertObject2String(propsSet));
      
              //为了简化处理,我们在这里直接回响应。更合理做法是在子设备处理完后再回响应
              getClient().respondPropsSet(requestId, IotResult.SUCCESS);
      
              log.info("writeAndFlush " + propsSet);
      
          }
      
      属性查询:
      1
      2
      3
      4
      5
      6
      7
       @Override
          public void onSubdevPropertiesGet(String requestId, PropsGet propsGet) {
      
              //不建议平台直接读子设备的属性,这里直接返回失败
              log.error("not supporte onSubdevPropertiesGet");
              deviceClient.respondPropsSet(requestId, IotResult.FAIL);
          }
      

    • 命令:处理流程和消息类似,实际场景中可能需要不同的编解码转换。
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      @Override
          public void onSubdevCommand(String requestId, Command command) {
      
              if (command.getDeviceId() == null) {
                  return;
              }
      
              String nodeId = IotUtil.getNodeIdFromDeviceId(command.getDeviceId());
              if (nodeId == null) {
                  return;
              }
      
              Session session = nodeIdToSesseionMap.get(nodeId);
              if (session == null) {
                  return;
              }
      
              //这里我们直接把command对象转成string发给子设备,实际场景中可能需要进行一定的编解码转换
              session.getChannel().writeAndFlush(JsonUtil.convertObject2String(command));
      
              //为了简化处理,我们在这里直接回命令响应。更合理做法是在子设备处理完后再回响应
              getClient().respondCommand(requestId, new CommandRsp(0));
              log.info("writeAndFlush " + command);
          }
      

  • 上行消息处理

    上行处理在StringTcpServer的channelRead0接口里。如果会话不存在,需要先创建会话:

    如果子设备信息不存在,这里会创建会话失败,直接拒绝连接

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
     @Override
            protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
                Channel incoming = ctx.channel();
                log.info("channelRead0" + incoming.remoteAddress() + " msg :" + s);
    
                //如果是首条消息,创建session
    //如果是首条消息,创建session
                Session session = simpleGateway.getSessionByChannel(incoming.id().asLongText());
                if (session == null) {
                    String nodeId = s;
                    session = simpleGateway.createSession(nodeId, incoming);
    
                    //创建会话失败,拒绝连接
                    if (session == null) {
                        log.info("close channel");
                        ctx.close();
                    }
                } 
    

    如果会话存在,则进行消息转发:

    1
    2
    3
    4
    5
    6
    7
    else {
                    //如果需要上报属性则调用reportSubDeviceProperties
                    DeviceMessage deviceMessage = new DeviceMessage(s);
                    deviceMessage.setDeviceId(session.getDeviceId());
                    simpleGateway.reportSubDeviceMessage(deviceMessage, null);
    
                }	
    

    到这里,网关的关键代码介绍完了,其他的部分看源代码。整个demo是开源的,用户可以根据需要进行扩展。比如修改持久化方式、转发中增加消息格式的转换、实现其他子设备接入协议。

  • iot-gateway-demo的使用
    1. 在平台上为网关注册开户。
    2. 修改StringTcpServer的main函数,替换构造参数,然后运行该类。
      1
      2
      3
       simpleGateway = new SimpleGateway(new SubDevicesFilePersistence(),
                      "ssl://iot-acc.cn-north-4.myhuaweicloud.com:8883",
                      "5e06bfee334dd4f33759f5b3_demo", "mysecret");
      
    3. 在平台上看到该网关在线后,添加子设备。

      此时网关上日志打印:

      2020-02-11 09:42:17 INFO SubDevicesFilePersistence:75 - add subdev: ffff

    4. 运行TcpDevice类,建立连接后,输入子设备的nodeId。

    5. 在平台上看到子设备上线。

    6. 子设备上报消息

      查看日志看到上报成功

    7. 查看消息跟踪

      在平台上找到网关,选择 设备详情-消息跟踪,打开消息跟踪。继续让子设备发送数据,等待片刻后看到消息跟踪:

分享:

    相关文档

    相关产品

关闭导读