文档首页/ IoT数据分析 IoTA/ 最佳实践/ 创建管道作业接入IoTDA数据
更新时间:2022-12-01 GMT+08:00
分享

创建管道作业接入IoTDA数据

场景说明

用户通过构建管道作业实现对设备数据进行清洗,规整,去重,增加外部业务属性信息等一系列处理,为下一步基于资产模型分析设备属性数据做好准备,本案例中管道作业为计算设备OEE相关指标准备好基础数据。

创建步骤

  1. 创建管道作业,进入IoTA Console,选择左侧导航栏“数据管道”,然后单击右侧“创建数据管道”。

    输入作业名称,描述,选择数据源“IoTDA”类型,单击“确定”进入作业编辑页面。

    此处所选择数据源需要预先创建,即在前面已完成IoTDA服务授权配置。

  2. 进入管道作业编辑页面,系统默认为作业配置了“IoT设备接入数据源“,“产品过滤”,“数据存储输出” 3个算子,如下图所示。

  3. 添加存储组, 单击某一个存储输出算子,比如印刷机存储输出算子,右侧配置存储组名称中单击下方的“添加”,然后输入存储组名称“SMT-Demo-Storage”,单击右侧图标,保存配置。

  4. 添加存储,比如印刷机存储输出算子,在“数据存储名称”中单击“添加”,输入存储名称“Printing1”单击右侧图标,保存配置。

  5. 重复步骤3,步骤4,将其它5个设备输出算子配置好数据存储名称,配置完成单击右上角“保存”,保存成功后可在存储管理页面中查看已创建好的存储组和存储名称。

    一个存储组下面可以有多个存储名称,同一个存储组下面的所有存储数据老化周期相同。

    步骤3中的存储组名建议用同一个名称,即所以设备放在同一个存储组下面。

    每种设备有自己独立的存储名称,比如印刷机的存储名称为”Printing1“,贴片机的存储名称为”Mounter1“,因此一个SMT产线设备需创建6个存储名称。

  6. 为打印机添加“数据计算算子”,右键单击算子图标,弹出菜单中选择“数据计算”算子,如下图所示:

    算子配置如下图所示

    配置数据计算算子

    算子名称:转换时间格式

    变量名称:event_time_s

    取值:body.services[0].event_time

    属性名称:event_time

    表达式:TimeFunction.stringToLong(event_time_s)

    背景说明:

    一般情况用于计划生产的工厂日历,生产排班信息来自己于外部MES系统,为了减少本实践的复杂度,采用直接通过设备消息中带的时间戳信息,并结合表达式中配置的班次时间点信息来判断当前时间点是哪个班次,是否在计划生产时间 范围内。

    此算子的作用: 转换时间格式,将String类型的时间格式转换为Long类型的时间戳,为方便下游算子判断当前时间点是否为计划正常生产时间(生产班次)范围内,即为下游算子时间范围判断做准备。

    body.services[0].event_time:根据消息体中的JsonPath路径获取,要符合JsonPath的语法描述。

    event_time:此算子中增加的中间临时字段,不会作为最后输出。

    TimeFunction.stringToLong()为系统内置时间转换函数,在表达式中可以直接使用。

  7. 为打印机再添加“数据计算算子”,右键单击算子图标,弹出菜单中选择“数据计算”算子,如下图所示

    算子的配置如下图所示:

    配置数据计算算子

    算子名称:增加output和quality属性

    变量:

    变量名称:event_time

    取值:event_time

    变量名称:quality_result

    取值:body.services[?(@.service_id=='profile2')].properties.result

    计算配置:

    属性名称:body.services[?(@.service_id=='profile2')].properties.is_plan_work_period

    表达式:TimeFunction.shiftCheck(event_time, "07:00:00", "12:00:00") || TimeFunction.shiftCheck(event_time, "13:30:00", "18:00:00") || TimeFunction.shiftCheck(event_time, "19:00:00", "23:59:00")

    属性名称:body.services[?(@.service_id=='profile2')].properties.quality

    表达式:quality_result>=1 && quality_result <= 9 ?1:0

    属性名称:body.services[?(@.service_id=='profile2')].properties.output

    表达式:1

    背景说明:

    因为设备本身没有上报产量的属性,当前实践中采用设备上报一条消息即认为已正常生产了一个产品的方案,因此需要在管道处理时在消息中增加output属性。

    部分设备本身会上报result字段,比如AOI,SPI会上报类似产品质量结果字段,为了更接近于真实情况(模拟部分产品为次品的场景),增加了一个判断逻辑,上报的result属性值在1~9内表示产品质量合格,其它为不合格,因此在消息中增加了代表产品判断质量的quality属性。

    此算子的作用:增加output和quality属性,即在原始消息体中增加产量output和产品质量quality属性。

    计算配置中的参数配置说明如下:

    TimeFunction.shiftCheck()为系统内置时间函数,用于判断输入的时间戳是否在指定的时间段内,用于下游算子判断,并且将判断结果写到新增属性“is_plan_work_period”中。

    此算子模拟早,中,晚3个班次,时间段分别是"07:00:00"~"12:00:00","13:30:00"~ "18:00:00" ,"19:00:00"~ "23:59:00"。

    quality_result>=1 && quality_result <= 9 ?1:0 此表达式用于判断产品质量是否合格,quality_result在1~9内表示产品质量合格,其它为不合格,并且将判断结果写到新增属性“quality”中。

    body.services[?(@.service_id=='profile2')].properties.output 表示新增加产量“output ”属性,其值固定为1,表示每上报一次消息代表生产了一个产品。

  8. 为打印机再添加“数据过滤”算子,右键单击算子图标,弹出菜单中选择“数据计算”算子,如下图所示

    算子名称:过滤掉非工作时间段的数据

    条件关系: AND

    过滤条件

    属性:body.services[?(@.service_id=='profile2')].properties.is_plan_work_period

    计算符号:EQUAL

    数值:true

    此算子的作用: 过滤掉非工作时间段的数据,即如果设备上报消息中包括非计划开机的时间段的消息,将非计划开机时间段的所有消息抛弃掉。

    过滤条件说明如下:

    判断上游算子新加的属性:is_plan_work_period 是否在设备计划开机时间段内,如果为true则消息会输出到下游继续处理。

  9. 重复步骤6~步骤8将其它设备分支添同样的算子进行处理,配置后的效果如下图所示

  10. 为印刷机增加计划工作状态属性, 右键单击IoTDA数据源算子弹出“添加产品过滤”算子图标,

    产品过滤算子的参数可参考上面的印刷机分支进行相同配置。

  11. 增加超时插补算子,右键单击增加“超时插补”算子,算子配置如下:

    此算子的作用: 用于在设备消息中补充计划开机或停机的状态属性。(计划开停机状态数据一般可以从外部业务系统获取,此Demo中为了模拟方便,此数据通过插补算子定时在消息中插入状态属性信息。)

    超时插补算子说明如下:

    分区键:header.device_id 即使用消息头中的设备ID字段,用于给每个设备插补设备的计划工作状态。

    插补时间类型:ABOLUTELY, 绝对类型,即每个周期都会固定插入设备的计划工作状态属性。

    插补周期: * * * * * ? Cron表达式,表示每秒钟都插补设备的计划工作状态。

    等待时长 0 表示不等待

  12. 为印刷机增加“数据计算”算子,用于转换时间格式,配置方法与步骤6相同,不再赘述。
  13. 为印刷机增加“数据计算”算子,用于增加增加PlanningWorkStatus属性,算子参数配置如下

    配置数据计算算子

    算子名称:增加PlanningWorkStatus属性

    变量:

    变量名称:event_time

    取值:event_time

    计算配置:

    属性名称:PlanningWorkStatus

    表达式:TimeFunction.shiftCheck(event_time, "07:00:00", "12:00:00") || TimeFunction.shiftCheck(event_time, "13:30:00", "18:00:00") || TimeFunction.shiftCheck(event_time, "19:00:00", "23:59:00")

    此算子的作用: 增加PlanningWorkStatus属性,即在原始消息体中增加PlanningWorkStatus 设备计划工作状态属性。

    计算配置中的参数配置说明如下:

    TimeFunction.shiftCheck()为系统内置时间函数,用于判断输入的时间戳是否在指定的时间段内,用于后续在资产中针对设备OEE计算使用,并且将判断结果写到新增属性“PlanningWorkStatus ”中。

    此算子模拟早,中,晚3个班次,时间段分别是"07:00:00"~"12:00:00","13:30:00"~ "18:00:00" ,"19:00:00"~ "23:59:00",其它为非工作时间段。

  14. 为印刷机增加数据输出算子,用于将设备计划工作状态数据输出到存储中,算子配置如下图所示

    此算子的作用:为印刷机增加数据输出算子,用于将设备计划工作状态 PlanningWorkStatus 输出到存储中。

    注意:

    存储组名称和存储名称与上面的印刷机分支保持一致。

    设备ID与上面的印刷机分支保持一致。

    输出属性名称:PlanningWorkStatus

    属性类型:String

    源属性:PlanningWorkStatus

  15. 重复步骤10~步骤14为其它5种SMT设备分别增加设备计划工作状态属性数据,最终完成后如下图效果:

  16. 单击右上角保存,并启动作业。

    启动成功后,约1分钟左右,作业状态为“运行中”表示作业正常运行。

相关文档