更新时间:2022-07-05 GMT+08:00
代码解析
开发自定义驱动,进行OT数采。
public class DcDriver implements PointsCallback, ModuleShadowNotificationCallback { /** * 数采应用客户端,与边缘Hub建立MQTT连接 */ private DcClient dcClient; @PostConstruct void init() throws Exception { //打开客户端 dcClient = DcClient.createFromEnv(); dcClient.open(); //设置回调,并同步模块影子 dcClient.setPointsCallback(this); dcClient.startModuleShadow(this); } /** * 收到模块下行数采配置,消息需要缓存或持久化 *进入边缘节点详情-》应用模块-》数采配置-》下发按钮 */ @Override public void onModuleShadowReceived(ModuleShadowNotification shadow) { BriefModuleShadowDTO briefModuleShadowDTO = JacksonUtil.json2Pojo( JacksonUtil.pojo2Json(shadow.getProperties()), BriefModuleShadowDTO.class); connectDatasource(briefModuleShadowDTO.getConnectionInfo()); collectData(briefModuleShadowDTO.getPoints()); } private void collectData(Map<String, Object> points) { Map<String, Object> returnValues = new HashMap<>(); points.forEach((k, v) -> { PointConfig pointConfig = JacksonUtil.json2Pojo(JacksonUtil.pojo2Json(points), PointConfig.class); Object value = collectPointDataFromOpcuaServer(pointConfig); returnValues.put(pointConfig.getPointId(), value); }); //数据采集后上报数据 pointsReport(returnValues); } /** * 上报服务器采集点位数据到EdgeHub */ private void pointsReport(Map<String, Object> points) { PointsReport report = new PointsReport(); report.setPoints(points); try { dcClient.pointReport(report); } catch (JsonException e) { e.printStackTrace(); } } /** * 根据根据opcua的点位信息从opcua服务器读取或者订阅数据 */ private Object collectPointDataFromOpcuaServer(PointConfig pointConfig) { //todo 伙伴根据address和周期读取点位数据 //示例从服务器读取到点位数据为10 Object returnValue = 10; return returnValue; } /** * 根据数采配置的数据源连接参数完成数据源连接 */ private void connectDatasource(Map<String, String> connectionInfo) { //opcua 服务器为示例,获取服务器连接地址 String endpoint = connectionInfo.get("endpoint"); //todo 伙伴根据endpoint实现连接数据源动作 } /** * 收到点位设置的处理 */ @Override public PointsSetRsp onPointSet(String requestId, PointsSetReq pointsSetReq) { //PointsSetReq结构points为{pointId:value}的键值对, // 伙伴需要根据onModuleShadowReceived获取的数采配置实现写opcua服务器 //正常写数据到opcua服务器响应示例 return new PointsSetRsp(0,"success"); } /** * 收到点位读取的处理 */ @Override public PointsGetRsp onPointGet(String requestId, PointsGetReq pointsGetReq) { //PointsSetReq结构points为[pointId1,pointId2]的列表, // 伙伴需要根据onModuleShadowReceived获取的数采配置实现读取opcua服务器的点位信息 //正常从opcua服务器读取点位数据响应示例 PointsGetRsp rsp = new PointsGetRsp(); Map<String, Object> points = new HashMap<>(); for (String point : pointsGetReq.getPoints()) { points.put(point, 1); } return rsp; } }
下发配置对象
public class BriefModuleShadowDTO { //数据源id @JsonProperty("ds_id") private String dsId; //数采模板默认参数 @JsonProperty("default_values") private Map<String, String> defaultValues; //数据源附加参数 @JsonProperty("collection_paras") private Map<String, Integer> collectionParas; //数据源连接信息 @JsonProperty("connection_info") private Map<String, String> connectionInfo; //点位信息 private Map<String, Object> points; }
点位信息对象
public class PointConfig { //点位id @JsonProperty("point_id") private String pointId; //点位地址,opcua地址:address = "ns=3;i=1002" private String address; //数据类型int、int32、float、double、bool等 @JsonProperty("data_type") private String dataType; //点位采集周期单位毫秒 private Integer cycle; }
父主题: 集成ModuleSDK进行OT数采