更新时间:2022-06-27 GMT+08:00
分享

编排作业

操作场景

按照业务诉求,以算子编排的方式来开发实时作业。

  • 实时作业是由“算子”和“数据流”组成一个DAG(有向无环图)。
  • “算子”表示的是对数据的处理,本服务预制了多个算子供您使用,一个实时作业最多支持100个算子(即作业流图上显示的100个处理节点)。
  • “数据流”表示在算子之间的传递数据,数据有明确的字段和格式,在编排界面,数据流以算子之间的连线表示。
  • 算子分成3类,输入算子、转换算子和输出算子。一个完整的作业必须包含输入算子、转换算子和输出算子,否则不能正常执行。
    • 输入算子:负责实时作业数据的输入,是作业的属性类型。
    • 转换算子:负责对数据进行各种处理。
    • 输出算子:负责把作业处理的结果输出到作业外部。

编排作业

图1 作业编排页面
表1 页面区域说明

序号

区域

描述

1

算子货架窗口

算子列表,提供多种输入算子、转换算子、输出算子,以及高级算子。

2

画布窗口

在画布上通过算子和线来编排实时作业。

3

算子参数配置窗口

点击画布上某个算子后,显示此算子的参数。

表2 算子说明

算子类型

算子名称

描述

输入算子

管道数据输入

使用数据管道清洗过的数据作为实时分析的数据源。

算子配置项如图所示:

  • 算子名称:用户指定这个算子的名称。
  • 数据管道:用户选择要从哪个数据管道中获取数据。

    注意:此处的管道列表只会列出含有“实时分析输出”算子的管道。

  • 管道输出到RTA的数据名称:用户选择要使用哪个"管道输出数据名称"(“管道输出数据名称”在数据管道作业的“实时分析输出算子”中声明)。
  • 属性:用户选择需要使用哪些属性进行后续的分析任务。

资产数据输入

接收来自于资产模型的数据,以便进一步使用实时分析算子对资产数据进行分析,并将分析后的结果返回给资产模型,丰富资产模型内容。

注意:使用该算子后,数据输出算子只可选择“资产数据输出”。

算子配置项如图所示:

  • 算子名称:用户指定这个算子的名称。
  • 参数名称:指定这个“资产数据”在实时分析作业中使用时的参数名称,在资产建模的分析任务(资产建模->模型->分析任务)中可绑定输入参数与属性,如图所示:

  • 参数类型:指定这个参数的数据类型,包括STRING/INTEGER/DOUBLE/OBJECT四种类型。

注意:每个“资产数据输入”算子只可指定一个参数,若原作业中使用了多个参数,则需要使用多个“资产数据输入”算子。

转换算子

数据过滤

实现了根据条件进行数据过滤。支持多个条件过滤数据,条件间是“与”或“或”的关系。当需要按单条数据本身的字段取值来决定是否过滤数据时使用本算子。

数据扁平

把数据流中的嵌套Json字段转换为多个独立字段。如果测点类型是Object,那么是一个嵌套结构,需要把结构中的每个字段提取出来,成为数据流中的独立字段,这样数据流的所有字段都是字符串、数字等简单数据类型。

数据嵌套

把数据流中的字段打包成嵌套的Json格式字段,在实时作业输出数据时,如果需要输出嵌套的Json格式,那么使用此算子。

数据聚合

对多条数据进行聚合计算。支持算法为,求和(SUM),求平均(AVERAGE),求最大值(MAX),求最小值(MIN),求最旧值(FIRST),求最新值(LAST),求个数(COUNT),组装数组(ARRAY)。

须知:

求和(SUM),求平均(AVERAGE),求最大值(MAX),求最小值(MIN)算法支持STRING、INTEGER、LONG、FLOAT、DOUBLE数据类型,不支持其他数据类型。求最旧值(FIRST),求最新值(LAST),求个数(COUNT)支持所有数据类型。配置如果类型不匹配,输出此字段为null。

聚合使用数据窗口,支持窗口类型为:

滚动窗口:窗口数据无重叠,需要指定窗口大小。比如每5分钟统计数据条数,那么每5分钟一个窗口,不会有重复统计。

滑动窗口:窗口有数据重叠,需要指定窗口大小,以及窗口移动大小。比如每1分钟统计最近5分钟数据条数,那么1分钟一个窗口,窗口大小为5分钟,重叠了4分钟数据。

会话窗口:窗口无固定大小,以多长时间没有没有数据作为窗口间隔。

数据计算

需要生成新的字段或者改变字段的取值时使用。通过表达式配置(表达式配置与”数据管道”相同,详见 表达式说明)新字段的取值计算方式

须知:

对于新增加字段,在本算子的输出字段配置项中需要手工添加此字段,并且字段类型要和实际表达式计算结果的类型一致,如果不一致,算子会按照输出算子配置强制转换,如果转换失败,输出此字段为null。

数据状态计算

支持以最新数据取值来进行条件计算。数据可能来自不同时间点,比如温度测点上报了温度,烟感测点上报了烟状态,因为这2个测点是不同数据中字段,并且不是同一时间点,使用其他算子不能进行组合判断,而本算子可以把数据缓存起来,在任意数据有变更时触发条件检测。

须知:
  • 缓存的字段必须是本算子中条件中右值的字段。
  • 仅缓存的字段的最新取值。
  • 需要指定缓存的索引字段,算子根据索引来查找对应缓存,索引字段支持多个数据字段组合。

数据去重

为了过滤掉重复数据,使用本算子。支持指定字段是否相同来判断是否是重复数据,比如指定温度字段没有变化则是重复数据,而事件时间字段每条数据取值不同。

支持指定去重超时时长,在超时后,收到的第一条数据不按去重处理,然后重新开始去重计时以及去重检查。用于避免长时间数据字段值没有变化导致数据都被丢弃而不能产生数据。

数据选择

对数据的字段进行过滤,选择的字段名字才输出,过滤掉不在字段列表中的字段。

输出算子

资产数据输出

将实时分析的计算结果输出到资产模型,用于在资产模型中使用实时分析作业进行分析任务计算的结果。

注意:使用该算子后,数据输入算子只可选择“资产数据输入”。

算子配置项如图所示:

  • 算子名称:用户指定这个算子的名称。
  • 参数名称:指定分析结果在实时分析任务中输出时的名称,在资产建模的分析任务(资产建模->模型->分析任务)中可绑定输出参数与属性,如图所示:

  • 参数类型:指定这个参数的数据类型,包括STRING/INTEGER/DOUBLE/OBJECT四种类型。

DIS输出

作业处理后的数据结果输出到您的DIS云服务。

说明:

请先进行DIS云服务授权配置,相关操作请参考数据源--添加DIS数据源

高级算子

去噪

去噪算子,对数据流中的数据根据窗口周期进行去噪。

主要算子配置项包括:

  • 分区字段:指定用于窗口中数据分区的字段列表。当选择多个字段时,按指定字段顺序的组合成字符作为算子并行处理的分区的Key。如果是接收资产数据,那么不填写分区字段,则默认以资产作为分区。如果是接收管道数据,默认只有一个分区,请务必指定字段作为分权键。
  • 窗口类型:指定使用窗口的类型,TumblingTimeWindows是滚动时间窗口,SlidingTimeWindows是滑动时间窗口,SessionTimeWindows是会话窗口,TumblingCountWindows是滚动计数窗口,SlidingCountWindows是滑动计数窗口。
  • 时间类型:指定窗口时间如果为EventTime,则窗口时间按照消息中的时间,如果为ProcessingTime,则按系统处理时间。
  • 窗口大小:对于TumblingTimeWindows和SlidingTimeWindows类型窗口,单位为秒;对于SessionTimeWindows类型窗口,表示无消息的时间间隔,用于分割Session,单位为秒;对于TumblingCountWindows、SlidingCountWindows、TumblingCountTimeWindows和SlidingCountTimeWindows类型的窗口,单位为个。
  • 滑动大小:窗口滑动长度,对于SlidingTimeWindows类型窗口,单位为秒;对于SlidingCountWindows和SlidingCountTimeWindows类型的窗口,单位为个。
  • 时间偏移: 如果要求窗口时间与时区对齐,填写时区偏移,对于TumblingTimeWindows和SlidingTimeWindows类型窗口有效,单位为秒。
  • 去噪字段: 指定去噪字段进行计算。
  • 去噪算法: 指定去噪的算法类型,MaxCountOfValue/MinCountOfValue是某个取值出现的次数最多/最小则为正常数据。

算子如果配置正确,提示为:

算子如果配置错误,提示为:

相关文档