更新时间:2024-11-27 GMT+08:00
分享

配置数据转发规则

概述

规则引擎可以订阅设备Topic,获取设备上报的数据,然后将解析过的数据发送到其他云服务供其使用。例如,用户可制定规则,命令设备每小时上报一次设备温度,如果设备温度超出正常范围,则关闭该设备,给用户发送告警信息;LINK将收集到的数据传输到大数据分析平台,评估其他设备发生故障的风险。

前提条件

  • 每个数据转发规则要归属到某个集成应用下,在创建规则前您需要有可用的集成应用,否则请提前创建集成应用
  • 使用规则引擎转发DIS时,用户需要具备DIS Administrator角色权限。

创建规则

  1. 登录ROMA Connect控制台,在“实例”页面单击实例上的“查看控制台”,进入实例控制台。
  2. 在左侧的导航栏选择“设备集成 LINK > 规则引擎”,单击页面右上角的“创建规则”。
  3. 在创建规则弹窗中配置规则相关信息,完成后单击“确认”。
    表1 规则信息配置

    参数

    配置说明

    规则名称

    填写规则的名称,根据规划自定义。建议您按照一定的命名规则填写规则名称,方便您快速识别和查找。

    集成应用

    选择规则所属的集成应用名称。

    规则描述

    填写规则的描述信息。

    状态

    选择是否启用规则,默认启用。只有启用后,规则才生效。

  4. 规则创建完成后,在规则列表中单击已创建规则的名称,进入规则详情页面。
  5. 配置规则的数据源端。
    1. 单击“数据源端”下的“创建数据源端”,增加一条数据源端配置。
    2. 配置数据源端相关信息,完成后单击“保存”。
      表2 数据源端配置

      参数

      配置说明

      产品名称

      选择设备所属的产品。

      设备名称

      选择要转发数据的设备,可以选择指定设备或全部设备。

      Topic名称

      选择设备上报消息使用的Topic。

      Topic级别

      选择Topic的级别,根据“设备名称”的选择自动适配,“设备名称”不做选择,默认为“产品级别”;“设备名称”选择具体设备则默认匹配为“设备级别”。

      base64编码

      是否对转发的设备数据进行base64编码。

      包含设备信息

      转发的设备数据是否包含设备信息。

  6. (可选)配置对转发数据的数据解析,对转发数据进行过滤筛选。
    SQL解析配置说明请参见SQL解析

    数据解析在应用之后,base64和包含设备信息选项不生效。

  7. 配置规则的数据目的端。

    Site实例中,数据源目的端仅支持选择ROMA MQS和设备的订阅Topic。

    1. 单击“数据目的端”下的“创建数据目的端”,增加一条数据目的端配置。
    2. 配置数据目的端相关信息,完成后单击“保存”。
      表3 数据目的端配置

      数据源目的端

      参数

      配置说明

      ROMA MQS

      连接地址

      选择ROMA MQS的连接地址。

      Topic名称

      选择数据要转发到的Topic名称。

      用户名

      仅当ROMA Connect实例的“MQS SASL_SSL”已开启时需要配置。

      填写“Topic名称”中Topic所属集成应用的Key。

      密码

      仅当ROMA Connect实例的“MQS SASL_SSL”已开启时需要配置。

      填写“Topic名称”中Topic所属集成应用的Secret。

      分布式消息服务Kafka

      连接地址

      选择分布式消息服务 Kafka的连接地址。

      Topic名称

      选择数据要转发到的Topic名称。

      用户名

      仅当ROMA Connect实例的“MQS SASL_SSL”已开启时需要配置。

      填写“Topic名称”中Topic所属集成应用的Key。

      密码

      仅当ROMA Connect实例的“MQS SASL_SSL”已开启时需要配置。

      填写“Topic名称”中Topic所属集成应用的Secret。

      数据接入服务 DIS

      通道列表

      选择数据要转发到的DIS通道。DIS通道是租户创建的逻辑单位,用以区分不同租户实时数据的集合,用户使用DIS发送或接收数据时,需要指定通道名称。

      委托服务

      选择一个委托任务。 委托服务是指用户在IAM处创建委托,授予ROMA Connect访问DIS的权限。

      设备的订阅Topic

      产品

      选择设备所属的产品。

      设备

      选择数据要转发到的设备。

      Topic

      选择数据要转发到的设备Topic。

SQL解析

概念:

设备接入ROMA Connect以后,会把数据封装成JSON格式的消息,发送给ROMA Connect。JSON中包含了Key值和Value值,为了方便理解,可以把规则看作一条SQL语句;把JSON看作一张表, Key值的是这张表的列,Value值是表的列值。通过SQL语句过滤设备消息,并将其发送给其他服务。

例如,有一个温度传感器,用于控制设备温度。它可以采集设备种类、环境温度、环境湿度、当前时间,上报的格式和内容如样例所示:

{
"device":"camera",
"temperature":30,
"humidity":65,
"time":"xxx,xxx"
}

如果您想制定一条规则,当温度高于20摄氏度或低于15摄氏度时,发送告警消息,那么您可以输入以下SQL语句。执行这条语句,满足上述条件时,LINK会上报设备种类、设备温度的绝对值、设备湿度和时间,用于进一步处理。

SELECT
        device,abs(temperature),
        humidity               ,
        TIME
FROM
        mcxeSR187154/OUT/test
WHERE
        temperature > 20
OR      temperature < 15

FROM语句中的“mcxeSR187154/out/test”,说明规则引擎只接受来自设备名叫做test的设备。您的设备可能与样例中的不同,请按照实际情况修改设备信息。

当上报的数据中,温度大于20摄氏度或者小于15摄氏度时,会触发该规则,并且解析数据中的温度、设备名称、位置,用于进一步处理。结果如图1所示。

图1 返回结果

使用方式:

SQL语句由三个部分组成:Select语句、From语句和Where语句。JSON格式的数据分为两种:带有单引号或双引号的是常量数据,不带任何引号的是变量数据。

SELECT语句中的字段是JSON消息Key里面的值,支持SQL内置的函数。您可以参考表5,获取其他SQL函数的使用方法。同时,SELECT语句不仅支持“*”和函数的组合,也支持数组和嵌套取值的JSON。例如,{"a":{"temperature":29, "color":"red"}},可以通过a.color获取到值"color":"red"。使用变量时,需要注意不带引号的字段是变量,带单引号和双引号的字段是常量。

由于温度可以是正数、0或负数,为了方便管理,上文例子中的“abs(temperature)”使用了“abs()”函数,输出温度的绝对值。

FROM语句中包含的是设备名称。您可以指定单一设备或某一个产品下的所有设备,进行消息上报。“产品名称_out_设备名称”表示指定单一设备,执行之后,该规则只对这一个设备有约束力;“产品名称/out/+”,“+”符号表示本级所有类目,可以匹配到该产品下所有的设备,因此该名称可以指定某一产品下的所有设备,执行之后,该规则对该产品下的所有设备都有约束力。

WHERE包含了条件表达式,负责筛选符合条件的字段和消息。例如上述例子中,“WHERE temperature > 20 or temperature < 15”是筛选条件,只有温度大于20摄氏度或者小于15摄氏度时,消息才会被过滤出来。WHERE语句支持的具体条件表达式请见表4

函数执行结果支持使用as指定别名,例如使用SELECT upper(Datas.name) as name FROM TOPIC WHERE 规则查询语句解析{"Datas":{"name":"opcua_data"}},可以得到{"name":"OPCUA_DATA"}。别名支持生成嵌套的JSON,例如将as name改为as Datas.name,使用SELECT upper(Datas.name) as Datas.name FROM TOPIC WHERE规则查询语句解析{"Datas":{"name":"opcua_data"}},可以得到{"Datas":{"name":"OPCUA_DATA"}}。

表4 条件表达式

操作

=

相等

color = 'red'

<>

不等于

color <> 'red'

and

逻辑与

color = 'red' and switch = 'on'

or

逻辑或

color = 'red' or switch = 'on'

( )

括号代表一个整体

a>1 and (b<1 or b>5),此时后面是一个整体,先执行逻辑或的判定,再执行逻辑和的判定。

in

仅支持枚举,不支持子查询

where a in(1,2,3),不支持: where a in(select xxx)

+

算术加法

a in (3,2,3+8)

-

算术减

13 – 2

/

25 / 5

*

2 * 8

%

取余数

10 % 2

<

小于

1 < 3

<=

小于或等于

1 <= 3

>

大于

8 > 3

>=

大于或等于

8 >= 3

CASE … WHEN … THEN … ELSE …END

Case 表达式(不支持嵌套)

case a when 3 then 'hello' when 4 then 'bye' end FROM item WHERE a >= b+c"

规则引擎还提供多种函数,您可以在编写SQL时使用它们,从而实现多样化数据处理。您可以在SQL语句中,使用函数获取数据或者对数据做处理。例如,SELECT service,abs(temperature),用到了abs(number)函数,具体条件表达式请见表5

表5 SQL函数列表

函数名

函数说明

abs(number)

返回绝对值。

sin(n)

返回n值的正弦。

cos(number)

返回number值的余弦。

asin(number)

返回number值的反正弦。

sinh(n)

返回n值的双曲正弦(hyperbolic sine)。

cosh(number)

返回number值的双曲余弦(hyperbolic cosine)。

tan(n)

返回n值的正切。

tanh(n)

返回n值的双曲正切(hyperbolic tangent)。

lower(string)

返回小写字符串。

upper(string)

返回大写字符。

power(n,m)

返回n的m次幂。

rand()

返回[0~1)之间随机数。

mod(n, m)

n%m余数。

log(n, m)

返回自然对数。如果不传m值,则返回log(n)。

exp(number)

返回指定数字的指定次幂。

floor(number)

返回一个最接近它的整数,它的值小于或等于这个浮点数。

concat(string1, string2)

字符串连接。示例:concat(field,a),输出“fielda”。

replace(source, substring, replacement)

对某个目标列值进行替换。示例:replace(field,'iel','oo'),输出“food”。

topic()

返回整个topic信息。例如,Topic: /abcdef/ghi。使用函数topic(),返回“ /abcdef/ghi”。

endswith(input, suffix)

判断input值是否以suffix结尾。

timestamp(format)

不带参数返回默认时间戳,带参数返回指定格式的时间戳,例如,timestamp() = 1553572557420

timestampUtc()

获取系统Utc时间。如果不带参数,返回当前系统时间毫秒数,如果带1个参数,那么这个参数作为时间格式化的格式参数。

serviceId()

返回消息对应的serviceId,不支持填参数。

clientId()

获取当前topic的clientId, 不支持填参数。

arrayFilter(arrayName, compareName, compareExpr, compareValue)

通过指定的运算符(=,like,>,<,>=,<=,<>)过滤指定数组中符合条件的内容,返回过滤后数组。该函数支持在字段中使用,不支持在条件中使用。

不支持对数组嵌套数组的内层json内容进行筛选或处理。

arrayName:待过滤数组名称,例如:数据为{"Nodes":[{"BrowseName":"test1", "CollectTime":"2022-12-12 10:10:10"}]},则数组名称为Nodes;数据为{"Datas":{"name":"opcua_data", "Nodes":[{"BrowseName":"test1", "CollectTime":"2022-12-12 10:10:10"}]}},则数组名称为Datas.Nodes。

compareName:数组中所包含的JSON对象的属性名称,根据该属性的值进行过滤,该参数需为字符串类型,例如:'BrowseName'。

compareExpr:运算符,需为字符串类型,支持'=','like','>','<','>=','<=','<>',其中'<>'代表'不等于'。

compareValue:过滤值,支持数字和字符串类型。其中运算符为like时,过滤值支持以%开头和结尾进行模糊匹配,例如'%ab',匹配以ab结尾的字符串,'ab%'匹配以ab开头的字符串,'%ab%',匹配包含ab的字符串。

arrayFilter()和arrayOperation()可作为arrayName参数在这两个方法中嵌套使用。例如 对以下json进行解析:

{
	"Datas": {
		"name": "opcua_data",
		"Nodes": [{
				"BrowseName": "test1",
				"CollectTime": "2022-12-12 10:10:10"
			},
			{
				"BrowseName": "test2",
				"CollectTime": "2022-12-12 10:10:10"
			}
		]
	}
}

解析指令 :

arrayOperation(arrayFilter(Datas.Nodes,'BrowseName','=',"test1"),'BrowseName','upper') as Nodes

解析结果:

{
	"Nodes": [{
		"BrowseName": "TEST1",
		"CollectTime": "2022-12-12 10:10:10"
	}]
}

arrayOperation(arrayName, operationElement, operationFunctionName, operationFunctionParams)

遍历数组,结合运算符和参数进行运算,返回运算后数组。该函数支持在字段中使用,不支持在条件中使用。

不支持对数组嵌套数组的内层json内容进行筛选或处理。

arrayName:待过滤数组名称,例如:数据为{"Nodes":[{"BrowseName":"test1", "CollectTime":"2022-12-12 10:10:10"}]},则数组名称为Nodes;数据为{"Datas":{"name":"opcua_data", "Nodes":[{"BrowseName":"test1", "CollectTime":"2022-12-12 10:10:10"}]}},则数组名称为Datas.Nodes。

operationElement:数组中所包含的需要做运算的JSON对象的属性名称,该参数需为字符串类型,例如:'BrowseName'。

operationFunctionName:运算符,需为字符串类型,支持基本运算符:'+','-','*','/','%',数字类型函数:'abs'(绝对值),'mod'(取余),'floor'(向下取整),字符串类型函数:'lower'(大写转小写),'upper'(小写转大写),'concat'(在元素后拼接字符串),'replace'(替换字符串)。

operationFunctionParams:双目运算符需要两个参数,单目运算符需要一个参数,根据不同的运算符,需要不同的参数。例如:'+' 运算符需要一个数字类型参数,'mod'运算符需要一个数字类型参数,'lower' 运算符不需要额外参数,'concat' 运算符需要一个字符串或数字类型参数,'replace' 需要两个字符串或数字类型参数。

相关文档