- 最新动态
- 功能总览
- 产品介绍
- 计费说明
- 快速入门
-
用户指南
- 开始使用ROMA Connect
- 实例管理
- 集成应用管理
-
数据源管理
- ROMA Connect支持的数据源
- 接入API数据源
- 接入ActiveMQ数据源
- 接入ArtemisMQ数据源
- 接入DB2数据源
- 接入DIS数据源
- 接入DWS数据源
- 接入DM数据源
- 接入Gauss100数据源
- 接入FTP数据源
- 接入HL7数据源
- 接入HANA数据源
- 接入HIVE数据源
- 接入LDAP数据源
- 接入IBM MQ数据源
- 接入Kafka数据源
- 接入MySQL数据源
- 接入MongoDB数据源
- 接入MQS数据源
- 接入MRS Hive数据源
- 接入MRS HDFS数据源
- 接入MRS HBase数据源
- 接入MRS Kafka数据源
- 接入OBS数据源
- 接入Oracle数据源
- 接入PostgreSQL数据源
- 接入Redis数据源
- 接入RabbitMQ数据源
- 接入RocketMQ数据源
- 接入SAP数据源
- 接入SNMP数据源
- 接入SQL Server数据源
- 接入GaussDB(for MySQL)数据源
- 接入WebSocket数据源
- 接入自定义数据源
- 数据集成指导
- 服务集成指导
- 服务集成指导(旧版界面)
- 消息集成指导
- 设备集成指导
- 应用业务模型使用指导
- 扩大资源配额
- 查看审计日志
- 查看监控指标
- 权限管理
- 用户指南(新版)
- 最佳实践
-
开发指南
- 数据集成开发指导
-
服务集成开发指导
- 开发说明
- API调用认证开发(APP认证)
- API调用认证开发(IAM认证)
-
自定义后端开发(函数后端)
- 函数后端脚本开发说明
- AesUtils类说明
- APIConnectResponse类说明
- Base64Utils类说明
- CacheUtils类说明
- CipherUtils类说明
- ConnectionConfig类说明
- DataSourceClient类说明
- DataSourceConfig类说明
- ExchangeConfig类说明
- HttpClient类说明
- HttpConfig类说明
- JedisConfig类说明
- JSON2XMLHelper类说明
- JSONHelper类说明
- JsonUtils类说明
- JWTUtils类说明
- KafkaConsumer类说明
- KafkaProducer类说明
- KafkaConfig类说明
- MD5Encoder类说明
- Md5Utils类说明
- QueueConfig类说明
- RabbitMqConfig类说明
- RabbitMqProducer类说明
- RedisClient类说明
- RomaWebConfig类说明
- RSAUtils类说明
- SapRfcClient类说明
- SapRfcConfig类说明
- SoapClient类说明
- SoapConfig类说明
- StringUtils类说明
- TextUtils类说明
- XmlUtils类说明
- 自定义后端开发(数据后端)
- 后端服务签名校验开发
- 消息集成开发指导
- 设备集成开发指导
-
API参考
- 使用前必读
- API概览
- 如何调用API
- 公共资源API
- 数据集成API
- 服务集成API
- 消息集成API
- 设备集成API
- 应用示例
- 权限和授权项
- 附录
- 历史API
- 修订记录
- SDK参考
-
常见问题
- 实例管理
-
数据集成
-
数据集成普通任务
- FDI各类数据库支持哪些数据类型?
- 跟踪号是什么,能跟踪到数据吗?
- FDI任务是否支持清空目标表?
- FDI任务只能采集单张表到单张表吗?
- 用户创建的FDI任务,同一账号的其他用户可见吗?
- FDI通过公网对接其他租户的MRS HIVE如何配置?
- 从OBS解析文件到RDS数据库,采集过一次后,后面采集会进行更新吗?
- OBS源端的CSV文件解析到关系型数据库时,列的值不对怎么办?
- MRS Hive目标字段和源端字段数据类型不匹配时,数据是否能集成到目标端?
- MRS Hive、MRS HBase和MongoDB的Mapping映射手动输入时,是否区分大小写?
- MRS Hive是否支持分区?
- 源端API类型数据源自定义周期如何设置?
- SAP是否支持分页读取视图?
- 数据集成组合任务
-
数据集成普通任务
- 服务集成
- 消息集成
- 设备集成
-
故障排除
-
数据集成任务
- MRS Hive目标端写入时出现数据乱码
- MRS Hive写入时数据全部写在第一个字段里
- 目标端任务报任务运行超时
- MySQL到MRS Hive时目标端报“could only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) running and 2 node(s) are excluded in this operation”错误
- Mysql到Mysql时源端报“Illegal mix of collations for operation 'UNION'”错误
- 源端Mysql增量采集每小时执行一次时部分数据丢失
- API到MySQL时源端报“401 unauthorized”错误
- Kafka集到Mysql目标端报“cannot find record mapping field”错误
- API到MySQL的定时任务时会出现源端报“connect timeout”错误
- Kafka到Mysql的实时任务时,MQS中的Topic下有数据,但是FDI任务没有采集到数据。
- Mysql到Mysql的定时任务,源端有类型为tinyint(1),值为2的字段,但是采集到目标端值就变成了1
- 目标端数据源为公网Kafka时,定时任务目标端报“The task executes failed.Writer data to kafka failed”错误
- 数据集成组合任务
- 数据源
- 服务集成
- 设备集成
-
数据集成任务
- 视频帮助
- 文档下载
- 通用参考
链接复制成功!
配置数据转发规则
概述
规则引擎可以订阅设备Topic,获取设备上报的数据,然后将解析过的数据发送到其他云服务供其使用。例如,用户可制定规则,命令设备每小时上报一次设备温度,如果设备温度超出正常范围,则关闭该设备,给用户发送告警信息;LINK将收集到的数据传输到大数据分析平台,评估其他设备发生故障的风险。
前提条件
- 每个数据转发规则要归属到某个集成应用下,在创建规则前您需要有可用的集成应用,否则请提前创建集成应用。
- 使用规则引擎转发DIS时,用户需要具备DIS Administrator角色权限。
创建规则
- 登录ROMA Connect控制台,在“实例”页面单击实例上的“查看控制台”,进入实例控制台。
- 在左侧的导航栏选择“设备集成 LINK > 规则引擎”,单击页面右上角的“创建规则”。
- 在创建规则弹窗中配置规则相关信息,完成后单击“确认”。
表1 规则信息配置 参数
配置说明
规则名称
填写规则的名称,根据规划自定义。建议您按照一定的命名规则填写规则名称,方便您快速识别和查找。
集成应用
选择规则所属的集成应用名称。
规则描述
填写规则的描述信息。
状态
选择是否启用规则,默认启用。只有启用后,规则才生效。
- 规则创建完成后,在规则列表中单击已创建规则的名称,进入规则详情页面。
- 配置规则的数据源端。
- 单击“数据源端”下的“创建数据源端”,增加一条数据源端配置。
- 配置数据源端相关信息,完成后单击“保存”。
表2 数据源端配置 参数
配置说明
产品名称
选择设备所属的产品。
设备名称
选择要转发数据的设备,可以选择指定设备或全部设备。
Topic名称
选择设备上报消息使用的Topic。
Topic级别
选择Topic的级别,根据“设备名称”的选择自动适配,“设备名称”不做选择,默认为“产品级别”;“设备名称”选择具体设备则默认匹配为“设备级别”。
base64编码
是否对转发的设备数据进行base64编码。
包含设备信息
转发的设备数据是否包含设备信息。
- (可选)配置对转发数据的数据解析,对转发数据进行过滤筛选。
- 配置规则的数据目的端。
说明:
Site实例中,数据源目的端仅支持选择ROMA MQS和设备的订阅Topic。
- 单击“数据目的端”下的“创建数据目的端”,增加一条数据目的端配置。
- 配置数据目的端相关信息,完成后单击“保存”。
表3 数据目的端配置 数据源目的端
参数
配置说明
ROMA MQS
连接地址
选择ROMA MQS的连接地址。
Topic名称
选择数据要转发到的Topic名称。
用户名
仅当ROMA Connect实例的“MQS SASL_SSL”已开启时需要配置。
填写“Topic名称”中Topic所属集成应用的Key。
密码
仅当ROMA Connect实例的“MQS SASL_SSL”已开启时需要配置。
填写“Topic名称”中Topic所属集成应用的Secret。
分布式消息服务Kafka
连接地址
选择分布式消息服务 Kafka的连接地址。
Topic名称
选择数据要转发到的Topic名称。
用户名
仅当ROMA Connect实例的“MQS SASL_SSL”已开启时需要配置。
填写“Topic名称”中Topic所属集成应用的Key。
密码
仅当ROMA Connect实例的“MQS SASL_SSL”已开启时需要配置。
填写“Topic名称”中Topic所属集成应用的Secret。
数据接入服务 DIS
通道列表
选择数据要转发到的DIS通道。DIS通道是租户创建的逻辑单位,用以区分不同租户实时数据的集合,用户使用DIS发送或接收数据时,需要指定通道名称。
委托服务
选择一个委托任务。 委托服务是指用户在IAM处创建委托,授予ROMA Connect访问DIS的权限,委托策略选择“DIS User”。
设备的订阅Topic
产品
选择设备所属的产品。
设备
选择数据要转发到的设备。
Topic
选择数据要转发到的设备Topic。
SQL解析
概念:
设备接入ROMA Connect以后,会把数据封装成JSON格式的消息,发送给ROMA Connect。JSON中包含了Key值和Value值,为了方便理解,可以把规则看作一条SQL语句;把JSON看作一张表, Key值的是这张表的列,Value值是表的列值。通过SQL语句过滤设备消息,并将其发送给其他服务。
例如,有一个温度传感器,用于控制设备温度。它可以采集设备种类、环境温度、环境湿度、当前时间,上报的格式和内容如样例所示:
{ "device":"camera", "temperature":30, "humidity":65, "time":"xxx,xxx" }
如果您想制定一条规则,当温度高于20摄氏度或低于15摄氏度时,发送告警消息,那么您可以输入以下SQL语句。执行这条语句,满足上述条件时,LINK会上报设备种类、设备温度的绝对值、设备湿度和时间,用于进一步处理。
SELECT device,abs(temperature), humidity , TIME FROM mcxeSR187154/OUT/test WHERE temperature > 20 OR temperature < 15
FROM语句中的“mcxeSR187154/out/test”,说明规则引擎只接受来自设备名叫做test的设备。您的设备可能与样例中的不同,请按照实际情况修改设备信息。
当上报的数据中,温度大于20摄氏度或者小于15摄氏度时,会触发该规则,并且解析数据中的温度、设备名称、位置,用于进一步处理。结果如图1所示。
使用方式:
SQL语句由三个部分组成:Select语句、From语句和Where语句。JSON格式的数据分为两种:带有单引号或双引号的是常量数据,不带任何引号的是变量数据。
SELECT语句中的字段是JSON消息Key里面的值,支持SQL内置的函数。您可以参考表5,获取其他SQL函数的使用方法。同时,SELECT语句不仅支持“*”和函数的组合,也支持数组和嵌套取值的JSON。例如,{"a":{"temperature":29, "color":"red"}},可以通过a.color获取到值"color":"red"。使用变量时,需要注意不带引号的字段是变量,带单引号和双引号的字段是常量。
由于温度可以是正数、0或负数,为了方便管理,上文例子中的“abs(temperature)”使用了“abs()”函数,输出温度的绝对值。
FROM语句中包含的是设备名称。您可以指定单一设备或某一个产品下的所有设备,进行消息上报。“产品名称_out_设备名称”表示指定单一设备,执行之后,该规则只对这一个设备有约束力;“产品名称/out/+”,“+”符号表示本级所有类目,可以匹配到该产品下所有的设备,因此该名称可以指定某一产品下的所有设备,执行之后,该规则对该产品下的所有设备都有约束力。
WHERE包含了条件表达式,负责筛选符合条件的字段和消息。例如上述例子中,“WHERE temperature > 20 or temperature < 15”是筛选条件,只有温度大于20摄氏度或者小于15摄氏度时,消息才会被过滤出来。WHERE语句支持的具体条件表达式请见表4。
函数执行结果支持使用as指定别名,例如使用SELECT upper(Datas.name) as name FROM TOPIC WHERE 规则查询语句解析{"Datas":{"name":"opcua_data"}},可以得到{"name":"OPCUA_DATA"}。别名支持生成嵌套的JSON,例如将as name改为as Datas.name,使用SELECT upper(Datas.name) as Datas.name FROM TOPIC WHERE规则查询语句解析{"Datas":{"name":"opcua_data"}},可以得到{"Datas":{"name":"OPCUA_DATA"}}。
操作符 |
描述 |
举例 |
---|---|---|
= |
相等 |
color = 'red' |
<> |
不等于 |
color <> 'red' |
and |
逻辑与 |
color = 'red' and switch = 'on' |
or |
逻辑或 |
color = 'red' or switch = 'on' |
( ) |
括号代表一个整体 |
a>1 and (b<1 or b>5),此时后面是一个整体,先执行逻辑或的判定,再执行逻辑和的判定。 |
in |
仅支持枚举,不支持子查询 |
where a in(1,2,3),不支持: where a in(select xxx) |
+ |
算术加法 |
a in (3,2,3+8) |
- |
算术减 |
13 – 2 |
/ |
除 |
25 / 5 |
* |
乘 |
2 * 8 |
% |
取余数 |
10 % 2 |
< |
小于 |
1 < 3 |
<= |
小于或等于 |
1 <= 3 |
> |
大于 |
8 > 3 |
>= |
大于或等于 |
8 >= 3 |
CASE … WHEN … THEN … ELSE …END |
Case 表达式(不支持嵌套) |
case a when 3 then 'hello' when 4 then 'bye' end FROM item WHERE a >= b+c" |
规则引擎还提供多种函数,您可以在编写SQL时使用它们,从而实现多样化数据处理。您可以在SQL语句中,使用函数获取数据或者对数据做处理。例如,SELECT service,abs(temperature),用到了abs(number)函数,具体条件表达式请见表5。
函数名 |
函数说明 |
---|---|
abs(number) |
返回绝对值。 |
sin(n) |
返回n值的正弦。 |
cos(number) |
返回number值的余弦。 |
asin(number) |
返回number值的反正弦。 |
sinh(n) |
返回n值的双曲正弦(hyperbolic sine)。 |
cosh(number) |
返回number值的双曲余弦(hyperbolic cosine)。 |
tan(n) |
返回n值的正切。 |
tanh(n) |
返回n值的双曲正切(hyperbolic tangent)。 |
lower(string) |
返回小写字符串。 |
upper(string) |
返回大写字符。 |
power(n,m) |
返回n的m次幂。 |
rand() |
返回[0~1)之间随机数。 |
mod(n, m) |
n%m余数。 |
log(n, m) |
返回自然对数。如果不传m值,则返回log(n)。 |
exp(number) |
返回指定数字的指定次幂。 |
floor(number) |
返回一个最接近它的整数,它的值小于或等于这个浮点数。 |
concat(string1, string2) |
字符串连接。示例:concat(field,a),输出“fielda”。 |
replace(source, substring, replacement) |
对某个目标列值进行替换。示例:replace(field,'iel','oo'),输出“food”。 |
topic() |
返回整个topic信息。例如,Topic: /abcdef/ghi。使用函数topic(),返回“ /abcdef/ghi”。 |
endswith(input, suffix) |
判断input值是否以suffix结尾。 |
timestamp(format) |
不带参数返回默认时间戳,带参数返回指定格式的时间戳,例如,timestamp() = 1553572557420 |
timestampUtc() |
获取系统Utc时间。如果不带参数,返回当前系统时间毫秒数,如果带1个参数,那么这个参数作为时间格式化的格式参数。 |
serviceId() |
返回消息对应的serviceId,不支持填参数。 |
clientId() |
获取当前topic的clientId, 不支持填参数。 |
arrayFilter(arrayName, compareName, compareExpr, compareValue) |
通过指定的运算符(=,like,>,<,>=,<=,<>)过滤指定数组中符合条件的内容,返回过滤后数组。该函数支持在字段中使用,不支持在条件中使用。 不支持对数组嵌套数组的内层json内容进行筛选或处理。 arrayName:待过滤数组名称,例如:数据为{"Nodes":[{"BrowseName":"test1", "CollectTime":"2022-12-12 10:10:10"}]},则数组名称为Nodes;数据为{"Datas":{"name":"opcua_data", "Nodes":[{"BrowseName":"test1", "CollectTime":"2022-12-12 10:10:10"}]}},则数组名称为Datas.Nodes。 compareName:数组中所包含的JSON对象的属性名称,根据该属性的值进行过滤,该参数需为字符串类型,例如:'BrowseName'。 compareExpr:运算符,需为字符串类型,支持'=','like','>','<','>=','<=','<>',其中'<>'代表'不等于'。 compareValue:过滤值,支持数字和字符串类型。其中运算符为like时,过滤值支持以%开头和结尾进行模糊匹配,例如'%ab',匹配以ab结尾的字符串,'ab%'匹配以ab开头的字符串,'%ab%',匹配包含ab的字符串。 arrayFilter()和arrayOperation()可作为arrayName参数在这两个方法中嵌套使用。例如 对以下json进行解析: { "Datas": { "name": "opcua_data", "Nodes": [{ "BrowseName": "test1", "CollectTime": "2022-12-12 10:10:10" }, { "BrowseName": "test2", "CollectTime": "2022-12-12 10:10:10" } ] } } 解析指令 : arrayOperation(arrayFilter(Datas.Nodes,'BrowseName','=',"test1"),'BrowseName','upper') as Nodes 解析结果: { "Nodes": [{ "BrowseName": "TEST1", "CollectTime": "2022-12-12 10:10:10" }] } |
arrayOperation(arrayName, operationElement, operationFunctionName, operationFunctionParams) |
遍历数组,结合运算符和参数进行运算,返回运算后数组。该函数支持在字段中使用,不支持在条件中使用。 不支持对数组嵌套数组的内层json内容进行筛选或处理。 arrayName:待过滤数组名称,例如:数据为{"Nodes":[{"BrowseName":"test1", "CollectTime":"2022-12-12 10:10:10"}]},则数组名称为Nodes;数据为{"Datas":{"name":"opcua_data", "Nodes":[{"BrowseName":"test1", "CollectTime":"2022-12-12 10:10:10"}]}},则数组名称为Datas.Nodes。 operationElement:数组中所包含的需要做运算的JSON对象的属性名称,该参数需为字符串类型,例如:'BrowseName'。 operationFunctionName:运算符,需为字符串类型,支持基本运算符:'+','-','*','/','%',数字类型函数:'abs'(绝对值),'mod'(取余),'floor'(向下取整),字符串类型函数:'lower'(大写转小写),'upper'(小写转大写),'concat'(在元素后拼接字符串),'replace'(替换字符串)。 operationFunctionParams:双目运算符需要两个参数,单目运算符需要一个参数,根据不同的运算符,需要不同的参数。例如:'+' 运算符需要一个数字类型参数,'mod'运算符需要一个数字类型参数,'lower' 运算符不需要额外参数,'concat' 运算符需要一个字符串或数字类型参数,'replace' 需要两个字符串或数字类型参数。 |