更新时间:2022-07-05 GMT+08:00
分享

代码解析

开发自定义驱动,进行OT数采。

public class DcDriver implements PointsCallback, ModuleShadowNotificationCallback {

    /**
     * 数采应用客户端,与边缘Hub建立MQTT连接
     */
    private DcClient dcClient;

    @PostConstruct
    void init() throws Exception {

        //打开客户端
        dcClient = DcClient.createFromEnv();
        dcClient.open();
        //设置回调,并同步模块影子
        dcClient.setPointsCallback(this);
        dcClient.startModuleShadow(this);
    }

    /**
     * 收到模块下行数采配置,消息需要缓存或持久化
     *进入边缘节点详情-》应用模块-》数采配置-》下发按钮
     */
    @Override
    public void onModuleShadowReceived(ModuleShadowNotification shadow) {
        BriefModuleShadowDTO briefModuleShadowDTO = JacksonUtil.json2Pojo(
            JacksonUtil.pojo2Json(shadow.getProperties()), BriefModuleShadowDTO.class);
        connectDatasource(briefModuleShadowDTO.getConnectionInfo());
        collectData(briefModuleShadowDTO.getPoints());
    }

    private void collectData(Map<String, Object> points) {

        Map<String, Object> returnValues = new HashMap<>();
        points.forEach((k, v) -> {
            PointConfig pointConfig = JacksonUtil.json2Pojo(JacksonUtil.pojo2Json(points), PointConfig.class);
            Object value = collectPointDataFromOpcuaServer(pointConfig);
            returnValues.put(pointConfig.getPointId(), value);
        });
        //数据采集后上报数据
        pointsReport(returnValues);
    }

    /**
     * 上报服务器采集点位数据到EdgeHub
     */
    private void pointsReport(Map<String, Object> points) {
        PointsReport report = new PointsReport();
        report.setPoints(points);
        try {
            dcClient.pointReport(report);
        } catch (JsonException e) {
            e.printStackTrace();
        }
    }

    /**
     * 根据根据opcua的点位信息从opcua服务器读取或者订阅数据
     */
    private Object collectPointDataFromOpcuaServer(PointConfig pointConfig) {
        //todo 伙伴根据address和周期读取点位数据
        //示例从服务器读取到点位数据为10
        Object returnValue = 10;
        return returnValue;
    }

    /**
     * 根据数采配置的数据源连接参数完成数据源连接
     */
    private void connectDatasource(Map<String, String> connectionInfo) {
        //opcua 服务器为示例,获取服务器连接地址
        String endpoint = connectionInfo.get("endpoint");
        //todo 伙伴根据endpoint实现连接数据源动作
    }

    /**
     * 收到点位设置的处理
     */
    @Override
    public PointsSetRsp onPointSet(String requestId, PointsSetReq pointsSetReq) {
        //PointsSetReq结构points为{pointId:value}的键值对,
        // 伙伴需要根据onModuleShadowReceived获取的数采配置实现写opcua服务器
        //正常写数据到opcua服务器响应示例
        return new PointsSetRsp(0,"success");
    }

    /**
     * 收到点位读取的处理
     */
    @Override
    public PointsGetRsp onPointGet(String requestId, PointsGetReq pointsGetReq) {
        //PointsSetReq结构points为[pointId1,pointId2]的列表,
        // 伙伴需要根据onModuleShadowReceived获取的数采配置实现读取opcua服务器的点位信息
        //正常从opcua服务器读取点位数据响应示例
        PointsGetRsp rsp = new PointsGetRsp();
        Map<String, Object> points = new HashMap<>();
        for (String point : pointsGetReq.getPoints()) {
            points.put(point, 1);
        }
        return rsp;
    }
}

下发配置对象

public class BriefModuleShadowDTO {

    //数据源id
    @JsonProperty("ds_id")
    private String dsId;

    //数采模板默认参数
    @JsonProperty("default_values")
    private Map<String, String> defaultValues;

    //数据源附加参数
    @JsonProperty("collection_paras")
    private Map<String, Integer> collectionParas;

    //数据源连接信息
    @JsonProperty("connection_info")
    private Map<String, String> connectionInfo;

    //点位信息
    private Map<String, Object> points;
}

点位信息对象

public class PointConfig {

    //点位id
    @JsonProperty("point_id")
    private String pointId;

    //点位地址,opcua地址:address = "ns=3;i=1002"
    private String address;

    //数据类型int、int32、float、double、bool等
    @JsonProperty("data_type")
    private String dataType;

    //点位采集周期单位毫秒
    private Integer cycle;
}

相关文档