配置数据转发规则
概述
规则引擎可以订阅设备Topic,获取设备上报的数据,然后将解析过的数据发送到其他云服务供其使用。例如,用户可制定规则,命令设备每小时上报一次设备温度,如果设备温度超出正常范围,则关闭该设备,给用户发送告警信息;LINK将收集到的数据传输到大数据分析平台,评估其他设备发生故障的风险。
前提条件
- 每个数据转发规则要归属到某个集成应用下,在创建规则前您需要有可用的集成应用,否则请提前创建集成应用。
- 使用规则引擎转发DIS时,用户需要具备DIS Administrator角色权限。
创建规则
- 登录ROMA Connect控制台,在“实例”页面单击实例上的“查看控制台”,进入实例控制台。
- 在左侧的导航栏选择“设备集成 LINK > 规则引擎”,单击页面右上角的“创建规则”。
- 在创建规则弹窗中配置规则相关信息,完成后单击“确认”。 
    表1 规则信息配置 参数 配置说明 规则名称 填写规则的名称,根据规划自定义。建议您按照一定的命名规则填写规则名称,方便您快速识别和查找。 集成应用 选择规则所属的集成应用名称。 规则描述 填写规则的描述信息。 状态 选择是否启用规则,默认启用。只有启用后,规则才生效。 
- 规则创建完成后,在规则列表中单击已创建规则的名称,进入规则详情页面。
- 配置规则的数据源端。
    - 单击“数据源端”下的“创建数据源端”,增加一条数据源端配置。
- 配置数据源端相关信息,完成后单击“保存”。 
      表2 数据源端配置 参数 配置说明 产品名称 选择设备所属的产品。 设备名称 选择要转发数据的设备,可以选择指定设备或全部设备。 Topic名称 选择设备上报消息使用的Topic。 Topic级别 选择Topic的级别,根据“设备名称”的选择自动适配,“设备名称”不做选择,默认为“产品级别”;“设备名称”选择具体设备则默认匹配为“设备级别”。 base64编码 是否对转发的设备数据进行base64编码。 包含设备信息 转发的设备数据是否包含设备信息。 
 
- (可选)配置对转发数据的数据解析,对转发数据进行过滤筛选。
- 配置规则的数据目的端。
      Site实例中,数据源目的端仅支持选择ROMA MQS和设备的订阅Topic。 - 单击“数据目的端”下的“创建数据目的端”,增加一条数据目的端配置。
- 配置数据目的端相关信息,完成后单击“保存”。 
      表3 数据目的端配置 数据源目的端 参数 配置说明 ROMA MQS 连接地址 选择ROMA MQS的连接地址。 Topic名称 选择数据要转发到的Topic名称。 用户名 仅当ROMA Connect实例的“MQS SASL_SSL”已开启时需要配置。 填写“Topic名称”中Topic所属集成应用的Key。 密码 仅当ROMA Connect实例的“MQS SASL_SSL”已开启时需要配置。 填写“Topic名称”中Topic所属集成应用的Secret。 分布式消息服务Kafka 连接地址 选择分布式消息服务 Kafka的连接地址。 Topic名称 选择数据要转发到的Topic名称。 用户名 仅当ROMA Connect实例的“MQS SASL_SSL”已开启时需要配置。 填写“Topic名称”中Topic所属集成应用的Key。 密码 仅当ROMA Connect实例的“MQS SASL_SSL”已开启时需要配置。 填写“Topic名称”中Topic所属集成应用的Secret。 数据接入服务 DIS 通道列表 选择数据要转发到的DIS通道。DIS通道是租户创建的逻辑单位,用以区分不同租户实时数据的集合,用户使用DIS发送或接收数据时,需要指定通道名称。 委托服务 选择一个委托任务。 委托服务是指用户在IAM处创建委托,授予ROMA Connect访问DIS的权限,委托策略选择“DIS User”。 设备的订阅Topic 产品 选择设备所属的产品。 设备 选择数据要转发到的设备。 Topic 选择数据要转发到的设备Topic。 
 
SQL解析
概念:
设备接入ROMA Connect以后,会把数据封装成JSON格式的消息,发送给ROMA Connect。JSON中包含了Key值和Value值,为了方便理解,可以把规则看作一条SQL语句;把JSON看作一张表, Key值的是这张表的列,Value值是表的列值。通过SQL语句过滤设备消息,并将其发送给其他服务。
例如,有一个温度传感器,用于控制设备温度。它可以采集设备种类、环境温度、环境湿度、当前时间,上报的格式和内容如样例所示:
{
"device":"camera",
"temperature":30,
"humidity":65,
"time":"xxx,xxx"
}
  如果您想制定一条规则,当温度高于20摄氏度或低于15摄氏度时,发送告警消息,那么您可以输入以下SQL语句。执行这条语句,满足上述条件时,LINK会上报设备种类、设备温度的绝对值、设备湿度和时间,用于进一步处理。
SELECT
        device,abs(temperature),
        humidity               ,
        TIME
FROM
        mcxeSR187154/OUT/test
WHERE
        temperature > 20
OR      temperature < 15
   
 
   FROM语句中的“mcxeSR187154/out/test”,说明规则引擎只接受来自设备名叫做test的设备。您的设备可能与样例中的不同,请按照实际情况修改设备信息。
当上报的数据中,温度大于20摄氏度或者小于15摄氏度时,会触发该规则,并且解析数据中的温度、设备名称、位置,用于进一步处理。结果如图1所示。
使用方式:
SQL语句由三个部分组成:Select语句、From语句和Where语句。JSON格式的数据分为两种:带有单引号或双引号的是常量数据,不带任何引号的是变量数据。
SELECT语句中的字段是JSON消息Key里面的值,支持SQL内置的函数。您可以参考表5,获取其他SQL函数的使用方法。同时,SELECT语句不仅支持“*”和函数的组合,也支持数组和嵌套取值的JSON。例如,{"a":{"temperature":29, "color":"red"}},可以通过a.color获取到值"color":"red"。使用变量时,需要注意不带引号的字段是变量,带单引号和双引号的字段是常量。
由于温度可以是正数、0或负数,为了方便管理,上文例子中的“abs(temperature)”使用了“abs()”函数,输出温度的绝对值。
FROM语句中包含的是设备名称。您可以指定单一设备或某一个产品下的所有设备,进行消息上报。“产品名称_out_设备名称”表示指定单一设备,执行之后,该规则只对这一个设备有约束力;“产品名称/out/+”,“+”符号表示本级所有类目,可以匹配到该产品下所有的设备,因此该名称可以指定某一产品下的所有设备,执行之后,该规则对该产品下的所有设备都有约束力。
WHERE包含了条件表达式,负责筛选符合条件的字段和消息。例如上述例子中,“WHERE temperature > 20 or temperature < 15”是筛选条件,只有温度大于20摄氏度或者小于15摄氏度时,消息才会被过滤出来。WHERE语句支持的具体条件表达式请见表4。
函数执行结果支持使用as指定别名,例如使用SELECT upper(Datas.name) as name FROM TOPIC WHERE 规则查询语句解析{"Datas":{"name":"opcua_data"}},可以得到{"name":"OPCUA_DATA"}。别名支持生成嵌套的JSON,例如将as name改为as Datas.name,使用SELECT upper(Datas.name) as Datas.name FROM TOPIC WHERE规则查询语句解析{"Datas":{"name":"opcua_data"}},可以得到{"Datas":{"name":"OPCUA_DATA"}}。
| 操作符 | 描述 | 举例 | 
|---|---|---|
| = | 相等 | color = 'red' | 
| <> | 不等于 | color <> 'red' | 
| and | 逻辑与 | color = 'red' and switch = 'on' | 
| or | 逻辑或 | color = 'red' or switch = 'on' | 
| ( ) | 括号代表一个整体 | a>1 and (b<1 or b>5),此时后面是一个整体,先执行逻辑或的判定,再执行逻辑和的判定。 | 
| in | 仅支持枚举,不支持子查询 | where a in(1,2,3),不支持: where a in(select xxx) | 
| + | 算术加法 | a in (3,2,3+8) | 
| - | 算术减 | 13 – 2 | 
| / | 除 | 25 / 5 | 
| * | 乘 | 2 * 8 | 
| % | 取余数 | 10 % 2 | 
| < | 小于 | 1 < 3 | 
| <= | 小于或等于 | 1 <= 3 | 
| > | 大于 | 8 > 3 | 
| >= | 大于或等于 | 8 >= 3 | 
| CASE … WHEN … THEN … ELSE …END | Case 表达式(不支持嵌套) | case a when 3 then 'hello' when 4 then 'bye' end FROM item WHERE a >= b+c" | 
规则引擎还提供多种函数,您可以在编写SQL时使用它们,从而实现多样化数据处理。您可以在SQL语句中,使用函数获取数据或者对数据做处理。例如,SELECT service,abs(temperature),用到了abs(number)函数,具体条件表达式请见表5。
| 函数名 | 函数说明 | 
|---|---|
| abs(number) | 返回绝对值。 | 
| sin(n) | 返回n值的正弦。 | 
| cos(number) | 返回number值的余弦。 | 
| asin(number) | 返回number值的反正弦。 | 
| sinh(n) | 返回n值的双曲正弦(hyperbolic sine)。 | 
| cosh(number) | 返回number值的双曲余弦(hyperbolic cosine)。 | 
| tan(n) | 返回n值的正切。 | 
| tanh(n) | 返回n值的双曲正切(hyperbolic tangent)。 | 
| lower(string) | 返回小写字符串。 | 
| upper(string) | 返回大写字符。 | 
| power(n,m) | 返回n的m次幂。 | 
| rand() | 返回[0~1)之间随机数。 | 
| mod(n, m) | n%m余数。 | 
| log(n, m) | 返回自然对数。如果不传m值,则返回log(n)。 | 
| exp(number) | 返回指定数字的指定次幂。 | 
| floor(number) | 返回一个最接近它的整数,它的值小于或等于这个浮点数。 | 
| concat(string1, string2) | 字符串连接。示例:concat(field,a),输出“fielda”。 | 
| replace(source, substring, replacement) | 对某个目标列值进行替换。示例:replace(field,'iel','oo'),输出“food”。 | 
| topic() | 返回整个topic信息。例如,Topic: /abcdef/ghi。使用函数topic(),返回“ /abcdef/ghi”。 | 
| endswith(input, suffix) | 判断input值是否以suffix结尾。 | 
| timestamp(format) | 不带参数返回默认时间戳,带参数返回指定格式的时间戳,例如,timestamp() = 1553572557420 | 
| timestampUtc() | 获取系统Utc时间。如果不带参数,返回当前系统时间毫秒数,如果带1个参数,那么这个参数作为时间格式化的格式参数。 | 
| serviceId() | 返回消息对应的serviceId,不支持填参数。 | 
| clientId() | 获取当前topic的clientId, 不支持填参数。 | 
| arrayFilter(arrayName, compareName, compareExpr, compareValue) | 通过指定的运算符(=,like,>,<,>=,<=,<>)过滤指定数组中符合条件的内容,返回过滤后数组。该函数支持在字段中使用,不支持在条件中使用。 不支持对数组嵌套数组的内层json内容进行筛选或处理。 arrayName:待过滤数组名称,例如:数据为{"Nodes":[{"BrowseName":"test1", "CollectTime":"2022-12-12 10:10:10"}]},则数组名称为Nodes;数据为{"Datas":{"name":"opcua_data", "Nodes":[{"BrowseName":"test1", "CollectTime":"2022-12-12 10:10:10"}]}},则数组名称为Datas.Nodes。 compareName:数组中所包含的JSON对象的属性名称,根据该属性的值进行过滤,该参数需为字符串类型,例如:'BrowseName'。 compareExpr:运算符,需为字符串类型,支持'=','like','>','<','>=','<=','<>',其中'<>'代表'不等于'。 compareValue:过滤值,支持数字和字符串类型。其中运算符为like时,过滤值支持以%开头和结尾进行模糊匹配,例如'%ab',匹配以ab结尾的字符串,'ab%'匹配以ab开头的字符串,'%ab%',匹配包含ab的字符串。 arrayFilter()和arrayOperation()可作为arrayName参数在这两个方法中嵌套使用。例如 对以下json进行解析: {
	"Datas": {
		"name": "opcua_data",
		"Nodes": [{
				"BrowseName": "test1",
				"CollectTime": "2022-12-12 10:10:10"
			},
			{
				"BrowseName": "test2",
				"CollectTime": "2022-12-12 10:10:10"
			}
		]
	}
}解析指令 : arrayOperation(arrayFilter(Datas.Nodes,'BrowseName','=',"test1"),'BrowseName','upper') as Nodes 解析结果: {
	"Nodes": [{
		"BrowseName": "TEST1",
		"CollectTime": "2022-12-12 10:10:10"
	}]
} | 
| arrayOperation(arrayName, operationElement, operationFunctionName, operationFunctionParams) | 遍历数组,结合运算符和参数进行运算,返回运算后数组。该函数支持在字段中使用,不支持在条件中使用。 不支持对数组嵌套数组的内层json内容进行筛选或处理。 arrayName:待过滤数组名称,例如:数据为{"Nodes":[{"BrowseName":"test1", "CollectTime":"2022-12-12 10:10:10"}]},则数组名称为Nodes;数据为{"Datas":{"name":"opcua_data", "Nodes":[{"BrowseName":"test1", "CollectTime":"2022-12-12 10:10:10"}]}},则数组名称为Datas.Nodes。 operationElement:数组中所包含的需要做运算的JSON对象的属性名称,该参数需为字符串类型,例如:'BrowseName'。 operationFunctionName:运算符,需为字符串类型,支持基本运算符:'+','-','*','/','%',数字类型函数:'abs'(绝对值),'mod'(取余),'floor'(向下取整),字符串类型函数:'lower'(大写转小写),'upper'(小写转大写),'concat'(在元素后拼接字符串),'replace'(替换字符串)。 operationFunctionParams:双目运算符需要两个参数,单目运算符需要一个参数,根据不同的运算符,需要不同的参数。例如:'+' 运算符需要一个数字类型参数,'mod'运算符需要一个数字类型参数,'lower' 运算符不需要额外参数,'concat' 运算符需要一个字符串或数字类型参数,'replace' 需要两个字符串或数字类型参数。 | 
 
   
    