文档首页/ 设备接入 IoTDA/ 最佳实践/ 数据转发/ IoTDA结合ModelArts实现预测分析
更新时间:2025-07-10 GMT+08:00
分享

IoTDA结合ModelArts实现预测分析

概述

在物联网解决方案中,针对庞大的数据进行自动学习时,需要对海量数据进行标注、训练,按照传统的方式进行标注、训练不仅耗时耗力,而且对资源消耗也非常巨大。华为云物联网平台可以通过规则引擎,将获取的海量数据流转到华为云其他云服务,从而实现将海量数据通过函数工作流(FunctionGraph)进行预处理,再将清洗后的数据流入AI开发平台(ModelArts)进行AI分析,并将分析结果统一转发至HTTP服务器中。

前提条件

  • 已注册华为官方账号。未注册可参考注册华为账户完成注册。
  • 已完成实名制认证。未完成可在华为云上单击实名认证完成认证,否则会影响后续云服务的开通。
  • 已开通设备接入服务。未开通则访问设备接入服务,单击“免费试用”或单击“价格计算器”购买并开通该服务。
  • 已开通FunctionGraph服务。未开通则访问FunctionGraph服务,单击“立即使用”后开通该服务。
  • 已开通ModelArts服务。未开通则访问AI开发平台,单击“控制台”后进入该服务。
  • 自建一个HTTP服务器,并提供POST接口用来接收推送的数据(本示例默认用户已经搭建好相应的服务器与接口,不再展示如何搭建HTTP服务器指导)。

示例场景

在本示例中,我们实现以下场景:

设备上报银行客户特征信息,物联网平台将数据转发至FunctionGraph,由FunctionGraph转发至ModelArts进行AI分析,最终将分析的结果转发至HTTP服务器中。

操作步骤如下:

  1. 创建并发布ModelArts模型
  2. 配置FunctionGraph函数
  3. 创建MQTT协议产品,并创建设备
  4. 数据转发规则配置,将数据流转至FunctionGraph。
  5. 模拟数据上报及结果验证,查看HTTP服务器是否收到AI分析后的消息。
图1 场景说明

配置ModelArts模型

  1. 下载ModelArts-Lab工程,在\ModelArts-Lab-master\official_examples\Using_ModelArts_to_Create_a_Bank_Marketing_Application\data”目录下获取训练数据文件“train.csv”。该训练数据主要展示银行中的一种常见业务:根据客户特征(年龄、工作类型、婚姻状况、文化程度、是否有房贷和是否有个人贷款),预测客户是否愿意办理定期存款业务。
  2. 进入OBS控制台,单击右上角“创建桶”创建一个新的桶。并在桶内创建两个文件夹,如dataSource和output,分别存放数据源和modelArts的输出。

    图2 OBS-新建文件夹

  3. 进入dataSource文件夹中,单击“上传对象”将训练数据“train.csv”存放在OBS中,供创建数据集使用。

    图3 OBS-上传训练数据

  4. 登录华为云官方网站,访问AI开发平台,单击“控制台”,进入ModelArts服务。
  5. 如未授权ModelArts使用依赖的服务,可按照页面提示先进行授权操作,如已经授权可直接进行下一步。

    图4 ModelArts-授权
    图5 ModelArts-添加授权

  6. 选择左侧导航栏“资产管理>数据集”,进入数据集页面,并单击左上角“创建数据集”,按照如下方式创建数据集。

    图6 ModelArts-创建数据集

  7. 选择左侧导航栏“开发生产>开发空间>自动学习>预测分析>创建项目”,进入创建预测分析界面。

    图7 ModelArts-预测分析

  8. 选择6中创建的数据集,标签列(数据中预测结果的列,本示例中为str7),输出路径和训练规格后,单击“创建项目”。

    图8 ModelArts-创建预测分析

  9. 创建项目需要等待约20分钟,当执行到服务部署时,选择资源池、AI应用及版本后,单击“继续运行”。

    图9 ModelArts-服务部署

  10. 等部署完成之后,选择左侧导航栏“模型部署 > 在线服务”,进入在线服务页面中选择部署的服务, 单击“修改”,进入修改服务页面,打开APP认证进行授权配置,完成后单击“下一步”并提交。

    图10 ModelArts-APP认证

  11. 单击“模型部署>在线服务”,单击进入已部署的服务,选择“预测”,复制以下数据到预测代码中后,单击“预测”后可查看返回结果,结果中的predict为no则表示用户不会办理存款。

    {
      "data": 
      {
        "count": 1,
        "req_data": 
    	[
          {
            "str1": "34",
            "str2": "blue-collar",
            "str3": "single",
            "str4": "tertiary",
            "str5": "no",
            "str6": "no"
          }
        ]
      }
    }
    图11 ModelArts-预测结果

  12. 更多详细关于Modelarts的说明可以参考ModelArts相关文档

配置FunctionGraph函数

  1. 参考数据转发至FunctionGraph函数工作流进行函数工作流配置。本示例中由于需要使用ModelArts相关配置参数,可按照如下伪代码的方式,添加配置项并访问ModelArts预测接口,body体结构参考11

    完成代码编写后可参考打包工程进行打包。
    依赖版本:
    commons-codec:commons-codec:1.15
    commons-logging:commons-logging:1.2
    com.google.code.gson:gson:2.8.6
    org.apache.httpcomponents:httpclient:4.5.13
    org.apache.httpcomponents:httpcore:4.4.13
    com.huawei.apigateway:java-sdk-core:3.2.3
    com.huawei.serverless:JavaRuntime:1.0.0
    org.slf4j:slf4j-api:1.7.36
    函数:
    public String funTest(String param, Context context) {
            String response = "err";
            try {
                //1.打印函数接收到的消息。使用版本为com.huawei.serverless:JavaRuntime:1.0.0
                log = context.getLogger();
                log.log("receive data: " + param);
    	    //初始化gson。 版本为com.google.code.gson:gson:2.8.6
    	    Gson gson = new Gson();
    	    //初始化httpClient,版本为org.apache.httpcomponents:httpclient:4.5.13
    	   CloseableHttpClient httpClient = HttpClientBuilder.create().setDefaultSocketConfig(SocketConfig.custom().setSoTimeout(5000).build()).setDefaultRequestConfig(
    				RequestConfig.custom().setConnectTimeout(3000).setSocketTimeout(5000).build()).build();
    
                //2.获取ModelArts预测链接. 用来拼装请求URL
                String forecastServerAddress = context.getUserData(FORECAST_SERVER_ADDRESS);
                log.log("forecastServerAddress: " + forecastServerAddress);
                //3.获取ModelArts中的AK/APP_KEY
                String ak = context.getUserData(ACCESS_KEY);
                //4.获取ModelArts中的SK/APP_SECRET
                String sk = context.getUserData(ACCESS_SECRET);
    	    //5.获取将预测结果推送的目的地址
                String naServerAddress = context.getUserData(NA_MOCK_SERVER_ADDRESS);
                log.log("naServerAddress: " + naServerAddress);
    	    //6.使用gson将入参转换成设备消息上报推送的结构体DeviceMessage。推送结构体格式可参考设备消息上报推送
                DeviceMessage deviceMessage = gson.fromJson(param, DeviceMessage.class);
    	    //7.将收到的设备消息转换成ModelArts预测通知的结构体ForecastData,该结构体客户可根据ModelArts要求定义并实现convertForcastData(deviceMessage)方法进行自定义转换
                ForecastData forecastData = convertForcastData(deviceMessage);
                log.log("forcast dto is : " + forecastDto);
                //8.执行请求
    	    //9.使用com.huawei.apigateway:java-sdk-core:3.2.3版本的request函数构造请求参数
    	    Request request = new Request();
    	    request.setUrl(forecastServerAddress);
    	    request.setMethod(HttpMethodName.POST.name());
    	    request.setAppKey(ak);
    	    request.setAppSecrect(sk);
    	    request.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
    	    //10.将ModelArts预测结构体通过gson转换成json当成请求的body体
    	    String body = gson.toJson(forecastData);
    	    request.setBody(body);
    	    //11.通过com.huawei.apigateway:java-sdk-core:3.2.3版本中的Singer类对请求进行签名
    	    Signer signer = new Signer();
    	    signer.sign(request);
    	    //12.使用org.apache.httpcomponents:httpclient:4.5.13版本发送请求至ModelArts
    	    Map<String, String> headers = request.getHeaders();
    	    HttpPost httpPost = new HttpPost(forecastServerAddress);
    	    headers.forEach(httpPost::setHeader);
    	    httpPost.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON));
    	    CloseableHttpResponse resp = httpClient.execute(httpPost);
    	    //13.将http response中的响应转换成String。 用户可自定义转换方法toString(resp)
                String modelArtsResp = toString(resp);
    	    //14.使用gson将响应转换成ModelArts响应结构体ForecastResp,该结构体格式客户可根据实际响应内容进行定义
                ForecastResp forecastResp = gson.fromJson(modelArtsResp, ForecastResp.class);
    	    //15.客户可自定义constructPreResult()方法将forecastResp和deviceMessage中的消息内容组装成Result推送给http服务器,Result格式客户可根据实际需求定义。
                Result result = constructPreResult(deviceMessage, forecastResp);
    	    //16.使用gson将result转成json当成请求body
    	    String resultBody = gson.toJson(result);
    	    //17.使用org.apache.httpcomponents:httpclient:4.5.13版本发送请求到http服务器
                HttpPost httpPost = new HttpPost(naServerAddress);
    	    httpPost.setHeader("Content-Type", "application/json");
    	    httpPost.setEntity(new StringEntity(resultBody, ContentType.APPLICATION_JSON));
    	    CloseableHttpResponse naResponse = httpClient.execute(httpPost);
                //18.将http response中的响应转换成String。用户可自定义转换方法toString(naResponse)
    	    String response = toString(naResponse);
    	    log.log("response content is: + " + response);
            } catch (Exception e) {
                //打印异常
                log.log(e.getMessage());
            }
            return response;
    }

  2. 登录函数工作流控制台在左侧导航栏选择“函数 > 函数列表”,进入函数列表界面,单击右上角“创建函数”。
  3. 按照下图所示填写函数配置信息,其中,运行时语言选择“Java 8”

    图12 创建函数-ModelArts函数信息

  4. 进入已创建好的函数工作流中,单击“上传代码-JAR文件”,将1中的代码生成的JAR包上传至函数中。

    图13 创建函数-上传JAR包

  5. 单击“设置>常规设置”,修改函数执行入口,设置为代码的执行入口函数路径。

    图14 创建函数-修改函数执行入口

  6. 在同一页面,单击左侧菜单“环境变量”,设置环境变量信息如下。

    表1 环境变量说明

    环境变量

    说明

    FORECAST_SERVER_ADDRESS

    访问ModelArts服务,单击“模型部署>在线服务”,进入部署服务后单击“调用指南”,选择支持App认证方式的API接口公网地址。见图13 ModelArts-查看调用信息

    ACCESS_KEY

    同上,选择AppKey。

    ACCESS_SECRET

    同上,选择AppSecret。

    NA_MOCK_SERVER_ADDRESS

    将预测结果推送至服务器的地址(本示例不再提供HTTP服务器搭建指导)。

    图15 函数设置-环境变量
    图16 ModelArts-查看调用信息

  7. 单击“代码>配置测试事件>创建新的测试事件>空白模板”。内容示例如下:

    {
        "resource": "device.message",
        "event": "report",
        "event_time": "20231227T082702Z",
        "event_time_ms": "2023-12-27T08:27:02.944Z",
        "request_id": "********",
        "notify_data": {
            "header": {
                "app_id": "********e",
                "device_id": "********",
                "node_id": "********",
                "product_id": "********",
                "gateway_id": "********"
            },
            "body": {
                "topic": "$oc/devices/********/sys/messages/up",
                "content": {
                    "age": "34",
                    "profession": "blue-collar",
                    "maritalStatus": "single",
                    "educationalStatus": "tertiary",
                    "realEstateSituation": "no",
                    "loanStatus": "tertiary"
                }
            }
        }
    }
    图17 函数代码-配置测试事件

  8. 配置完测试事件后,单击“测试”,执行结果返回success(以实际函数返回结果为准),则表示成功。同时配置的HTTP服务器则能收到对应的预测结果。

    图18 函数测试-测试结果
    图19 HTTP服务器接收到预测结果

创建产品和设备

  1. 访问设备接入服务,单击“管理控制台”进入设备接入控制台。
  2. 选择左侧导航栏的“产品”,单击“创建产品”,创建一个基于MQTT协议的产品,填写参数后,单击“确定”

    图20 创建产品-MQTT

  3. 在该产品下注册设备,请参考注册单个设备

数据转发规则配置

  1. 选择左侧导航栏的“规则>数据转发”,单击“创建规则”。
  2. 参考下表参数说明,填写规则内容。以下参数取值仅为示例,您可参考用户指南创建自己的规则,填写完成后单击“创建规则”。

    图21 新建消息上报流转规则-数据转发至FunctionGraph
    表2 参数说明

    参数名

    参数说明

    规则名称

    自定义,如iotda-functiongraph。

    规则描述

    自定义,如数据转发至FunctionGraph服务。

    数据来源

    选择“设备消息”。

    触发事件

    自动匹配“设备消息上报”。

    资源空间

    和上一步创建的产品所在的资源空间保持一致。

    数据过滤语句

    通过编写SQL来解析和处理上报的JSON数据。

  3. 单击“设置转发目标”页签,单击“添加”,设置转发目标。

    图22 新建转发目标-转发至FunctionGraph

    参考下表参数说明,填写转发目标。填写完成后单击“确定”。

    表3 参数说明

    参数名

    参数说明

    转发目标

    选择“函数工作流(FunctionGraph)”

    区域

    选择“函数工作流”区域。

    目标函数

    选择已配置的函数工作流。

  4. 单击“启动规则”,激活配置好的数据转发规则。

    图23 启动规则-消息上报-转发至FunctionGraph

模拟数据上报及结果验证

  1. 在设备接入控制台“设备-所有设备-设备列表”中,单击创建的设备标识码进入设备详情,在“消息跟踪”页签中,单击“启动消息跟踪”,开启消息跟踪。

    图24 消息跟踪-启动消息跟踪

  2. 使用MQTT模拟器连接到平台(模拟器使用请参考:使用MQTT.fx进行消息收发)。
  3. 使用模拟器进行消息上报,详情请参考:设备消息上报

    Topic: $oc/devices/{device_id}/sys/messages/up
    数据格式:
    {
        "age": "34",
        "profession": "blue-collar",
        "maritalStatus": "single",
        "educationalStatus": "tertiary",
        "realEstateSituation": "no",
        "loanStatus": "tertiary"
    }

  4. 在设备详情中,查看消息跟踪,是否收到消息上报,并转发至FunctionGraph。

    图25 消息跟踪-消息上报流转至FunctionGraph

  5. 查看HTTP服务器是否收到预测结果。

    图26 查看消息

相关文档