配置数据转发规则
概述
规则引擎可以订阅设备Topic,获取设备上报的数据,然后将解析过的数据发送到其他云服务供其使用。例如,用户可制定规则,命令设备每小时上报一次设备温度,如果设备温度超出正常范围,则关闭该设备,给用户发送告警信息;LINK将收集到的数据传输到大数据分析平台,评估其他设备发生故障的风险。
前提条件
- 每个数据转发规则要归属到某个集成应用下,在创建规则前您需要有可用的集成应用,否则请提前创建集成应用。
- 使用规则引擎转发DIS时,用户需要具备DIS Administrator角色权限。
创建规则
- 登录ROMA Connect控制台,在“实例”页面单击实例上的“查看控制台”,进入实例控制台。
- 在左侧的导航栏选择“设备集成 LINK > 规则引擎”,单击页面右上角的“创建规则”。
- 在创建规则弹窗中配置规则相关信息,完成后单击“确认”。
表1 规则信息配置 参数
配置说明
规则名称
填写规则的名称,根据规划自定义。建议您按照一定的命名规则填写规则名称,方便您快速识别和查找。
集成应用
选择规则所属的集成应用名称。
规则描述
填写规则的描述信息。
状态
选择是否启用规则,默认启用。只有启用后,规则才生效。
- 规则创建完成后,在规则列表中单击已创建规则的名称,进入规则详情页面。
- 配置规则的数据源端。
- 单击“数据源端”下的“创建数据源端”,增加一条数据源端配置。
- 配置数据源端相关信息,完成后单击“保存”。
表2 数据源端配置 参数
配置说明
产品名称
选择设备所属的产品。
设备名称
选择要转发数据的设备,可以选择指定设备或全部设备。
Topic名称
选择设备上报消息使用的Topic。
Topic级别
选择Topic的级别,根据“设备名称”的选择自动适配,“设备名称”不做选择,默认为“产品级别”;“设备名称”选择具体设备则默认匹配为“设备级别”。
base64编码
是否对转发的设备数据进行base64编码。
包含设备信息
转发的设备数据是否包含设备信息。
- (可选)配置对转发数据的数据解析,对转发数据进行过滤筛选。
- 配置规则的数据目的端。
Site实例中,数据源目的端仅支持选择ROMA MQS和设备的订阅Topic。
- 单击“数据目的端”下的“创建数据目的端”,增加一条数据目的端配置。
- 配置数据目的端相关信息,完成后单击“保存”。
表3 数据目的端配置 数据源目的端
参数
配置说明
ROMA MQS
连接地址
选择ROMA MQS的连接地址。
Topic名称
选择数据要转发到的Topic名称。
用户名
仅当ROMA Connect实例的“MQS SASL_SSL”已开启时需要配置。
填写“Topic名称”中Topic所属集成应用的Key。
密码
仅当ROMA Connect实例的“MQS SASL_SSL”已开启时需要配置。
填写“Topic名称”中Topic所属集成应用的Secret。
分布式消息服务Kafka
连接地址
选择分布式消息服务 Kafka的连接地址。
Topic名称
选择数据要转发到的Topic名称。
用户名
仅当ROMA Connect实例的“MQS SASL_SSL”已开启时需要配置。
填写“Topic名称”中Topic所属集成应用的Key。
密码
仅当ROMA Connect实例的“MQS SASL_SSL”已开启时需要配置。
填写“Topic名称”中Topic所属集成应用的Secret。
数据接入服务 DIS
通道列表
选择数据要转发到的DIS通道。DIS通道是租户创建的逻辑单位,用以区分不同租户实时数据的集合,用户使用DIS发送或接收数据时,需要指定通道名称。
委托服务
选择一个委托任务。 委托服务是指用户在IAM处创建委托,授予ROMA Connect访问DIS的权限。
设备的订阅Topic
产品
选择设备所属的产品。
设备
选择数据要转发到的设备。
Topic
选择数据要转发到的设备Topic。
SQL解析
概念:
设备接入ROMA Connect以后,会把数据封装成JSON格式的消息,发送给ROMA Connect。JSON中包含了Key值和Value值,为了方便理解,可以把规则看作一条SQL语句;把JSON看作一张表, Key值的是这张表的列,Value值是表的列值。通过SQL语句过滤设备消息,并将其发送给其他服务。
例如,有一个温度传感器,用于控制设备温度。它可以采集设备种类、环境温度、环境湿度、当前时间,上报的格式和内容如样例所示:
{ "device":"camera", "temperature":30, "humidity":65, "time":"xxx,xxx" }
如果您想制定一条规则,当温度高于20摄氏度或低于15摄氏度时,发送告警消息,那么您可以输入以下SQL语句。执行这条语句,满足上述条件时,LINK会上报设备种类、设备温度的绝对值、设备湿度和时间,用于进一步处理。
SELECT device,abs(temperature), humidity , TIME FROM mcxeSR187154/OUT/test WHERE temperature > 20 OR temperature < 15
FROM语句中的“mcxeSR187154/out/test”,说明规则引擎只接受来自设备名叫做test的设备。您的设备可能与样例中的不同,请按照实际情况修改设备信息。
当上报的数据中,温度大于20摄氏度或者小于15摄氏度时,会触发该规则,并且解析数据中的温度、设备名称、位置,用于进一步处理。结果如图1所示。
使用方式:
SQL语句由三个部分组成:Select语句、From语句和Where语句。JSON格式的数据分为两种:带有单引号或双引号的是常量数据,不带任何引号的是变量数据。
SELECT语句中的字段是JSON消息Key里面的值,支持SQL内置的函数。您可以参考表5,获取其他SQL函数的使用方法。同时,SELECT语句不仅支持“*”和函数的组合,也支持数组和嵌套取值的JSON。例如,{"a":{"temperature":29, "color":"red"}},可以通过a.color获取到值"color":"red"。使用变量时,需要注意不带引号的字段是变量,带单引号和双引号的字段是常量。
由于温度可以是正数、0或负数,为了方便管理,上文例子中的“abs(temperature)”使用了“abs()”函数,输出温度的绝对值。
FROM语句中包含的是设备名称。您可以指定单一设备或某一个产品下的所有设备,进行消息上报。“产品名称_out_设备名称”表示指定单一设备,执行之后,该规则只对这一个设备有约束力;“产品名称/out/+”,“+”符号表示本级所有类目,可以匹配到该产品下所有的设备,因此该名称可以指定某一产品下的所有设备,执行之后,该规则对该产品下的所有设备都有约束力。
WHERE包含了条件表达式,负责筛选符合条件的字段和消息。例如上述例子中,“WHERE temperature > 20 or temperature < 15”是筛选条件,只有温度大于20摄氏度或者小于15摄氏度时,消息才会被过滤出来。WHERE语句支持的具体条件表达式请见表4。
函数执行结果支持使用as指定别名,例如使用SELECT upper(Datas.name) as name FROM TOPIC WHERE 规则查询语句解析{"Datas":{"name":"opcua_data"}},可以得到{"name":"OPCUA_DATA"}。别名支持生成嵌套的JSON,例如将as name改为as Datas.name,使用SELECT upper(Datas.name) as Datas.name FROM TOPIC WHERE规则查询语句解析{"Datas":{"name":"opcua_data"}},可以得到{"Datas":{"name":"OPCUA_DATA"}}。
操作符 |
描述 |
举例 |
---|---|---|
= |
相等 |
color = 'red' |
<> |
不等于 |
color <> 'red' |
and |
逻辑与 |
color = 'red' and switch = 'on' |
or |
逻辑或 |
color = 'red' or switch = 'on' |
( ) |
括号代表一个整体 |
a>1 and (b<1 or b>5),此时后面是一个整体,先执行逻辑或的判定,再执行逻辑和的判定。 |
in |
仅支持枚举,不支持子查询 |
where a in(1,2,3),不支持: where a in(select xxx) |
+ |
算术加法 |
a in (3,2,3+8) |
- |
算术减 |
13 – 2 |
/ |
除 |
25 / 5 |
* |
乘 |
2 * 8 |
% |
取余数 |
10 % 2 |
< |
小于 |
1 < 3 |
<= |
小于或等于 |
1 <= 3 |
> |
大于 |
8 > 3 |
>= |
大于或等于 |
8 >= 3 |
CASE … WHEN … THEN … ELSE …END |
Case 表达式(不支持嵌套) |
case a when 3 then 'hello' when 4 then 'bye' end FROM item WHERE a >= b+c" |
规则引擎还提供多种函数,您可以在编写SQL时使用它们,从而实现多样化数据处理。您可以在SQL语句中,使用函数获取数据或者对数据做处理。例如,SELECT service,abs(temperature),用到了abs(number)函数,具体条件表达式请见表5。
函数名 |
函数说明 |
---|---|
abs(number) |
返回绝对值。 |
sin(n) |
返回n值的正弦。 |
cos(number) |
返回number值的余弦。 |
asin(number) |
返回number值的反正弦。 |
sinh(n) |
返回n值的双曲正弦(hyperbolic sine)。 |
cosh(number) |
返回number值的双曲余弦(hyperbolic cosine)。 |
tan(n) |
返回n值的正切。 |
tanh(n) |
返回n值的双曲正切(hyperbolic tangent)。 |
lower(string) |
返回小写字符串。 |
upper(string) |
返回大写字符。 |
power(n,m) |
返回n的m次幂。 |
rand() |
返回[0~1)之间随机数。 |
mod(n, m) |
n%m余数。 |
log(n, m) |
返回自然对数。如果不传m值,则返回log(n)。 |
exp(number) |
返回指定数字的指定次幂。 |
floor(number) |
返回一个最接近它的整数,它的值小于或等于这个浮点数。 |
concat(string1, string2) |
字符串连接。示例:concat(field,a),输出“fielda”。 |
replace(source, substring, replacement) |
对某个目标列值进行替换。示例:replace(field,'iel','oo'),输出“food”。 |
topic() |
返回整个topic信息。例如,Topic: /abcdef/ghi。使用函数topic(),返回“ /abcdef/ghi”。 |
endswith(input, suffix) |
判断input值是否以suffix结尾。 |
timestamp(format) |
不带参数返回默认时间戳,带参数返回指定格式的时间戳,例如,timestamp() = 1553572557420 |
timestampUtc() |
获取系统Utc时间。如果不带参数,返回当前系统时间毫秒数,如果带1个参数,那么这个参数作为时间格式化的格式参数。 |
serviceId() |
返回消息对应的serviceId,不支持填参数。 |
clientId() |
获取当前topic的clientId, 不支持填参数。 |
arrayFilter(arrayName, compareName, compareExpr, compareValue) |
通过指定的运算符(=,like,>,<,>=,<=,<>)过滤指定数组中符合条件的内容,返回过滤后数组。该函数支持在字段中使用,不支持在条件中使用。 不支持对数组嵌套数组的内层json内容进行筛选或处理。 arrayName:待过滤数组名称,例如:数据为{"Nodes":[{"BrowseName":"test1", "CollectTime":"2022-12-12 10:10:10"}]},则数组名称为Nodes;数据为{"Datas":{"name":"opcua_data", "Nodes":[{"BrowseName":"test1", "CollectTime":"2022-12-12 10:10:10"}]}},则数组名称为Datas.Nodes。 compareName:数组中所包含的JSON对象的属性名称,根据该属性的值进行过滤,该参数需为字符串类型,例如:'BrowseName'。 compareExpr:运算符,需为字符串类型,支持'=','like','>','<','>=','<=','<>',其中'<>'代表'不等于'。 compareValue:过滤值,支持数字和字符串类型。其中运算符为like时,过滤值支持以%开头和结尾进行模糊匹配,例如'%ab',匹配以ab结尾的字符串,'ab%'匹配以ab开头的字符串,'%ab%',匹配包含ab的字符串。 arrayFilter()和arrayOperation()可作为arrayName参数在这两个方法中嵌套使用。例如 对以下json进行解析: { "Datas": { "name": "opcua_data", "Nodes": [{ "BrowseName": "test1", "CollectTime": "2022-12-12 10:10:10" }, { "BrowseName": "test2", "CollectTime": "2022-12-12 10:10:10" } ] } } 解析指令 : arrayOperation(arrayFilter(Datas.Nodes,'BrowseName','=',"test1"),'BrowseName','upper') as Nodes 解析结果: { "Nodes": [{ "BrowseName": "TEST1", "CollectTime": "2022-12-12 10:10:10" }] } |
arrayOperation(arrayName, operationElement, operationFunctionName, operationFunctionParams) |
遍历数组,结合运算符和参数进行运算,返回运算后数组。该函数支持在字段中使用,不支持在条件中使用。 不支持对数组嵌套数组的内层json内容进行筛选或处理。 arrayName:待过滤数组名称,例如:数据为{"Nodes":[{"BrowseName":"test1", "CollectTime":"2022-12-12 10:10:10"}]},则数组名称为Nodes;数据为{"Datas":{"name":"opcua_data", "Nodes":[{"BrowseName":"test1", "CollectTime":"2022-12-12 10:10:10"}]}},则数组名称为Datas.Nodes。 operationElement:数组中所包含的需要做运算的JSON对象的属性名称,该参数需为字符串类型,例如:'BrowseName'。 operationFunctionName:运算符,需为字符串类型,支持基本运算符:'+','-','*','/','%',数字类型函数:'abs'(绝对值),'mod'(取余),'floor'(向下取整),字符串类型函数:'lower'(大写转小写),'upper'(小写转大写),'concat'(在元素后拼接字符串),'replace'(替换字符串)。 operationFunctionParams:双目运算符需要两个参数,单目运算符需要一个参数,根据不同的运算符,需要不同的参数。例如:'+' 运算符需要一个数字类型参数,'mod'运算符需要一个数字类型参数,'lower' 运算符不需要额外参数,'concat' 运算符需要一个字符串或数字类型参数,'replace' 需要两个字符串或数字类型参数。 |