如何配置数据接入
使用说明
低代码平台支持从外部数据源获取数据,并按照需求对数据进行一系列处理后,转化成内部事件,输出给Kafka或ROMA Connect。此时,可通过配置数据接入,来实现整个流程。
场景描述
从物联网OneNET中获取数据源数据,经过数据处理后转化成内部事件,然后输出给Kafka。
前提条件
- 已创建数据接收后转化的内部事件parking,具体操作请参见如何自定义事件。
- 在第三方外部数据源侧,配置AstroZero提供的鉴权信息,具体操作请参考第三方文档。
操作步骤
在开发环境修改数据接入的任意配置数据(包括所有图元的配置信息),在打包时,可配置是否覆盖新环境中的同名数据接入配置,具体配置可参考应用打包发布。
- 参考登录经典应用设计器中操作,登录经典版应用设计器。
- 将鼠标放在应用的某个文件夹上(如Logic),单击“+”,选择“数据接入”。
图1 选择数据接入
- 直接单击“新建”,进入新建数据接入页面。
- 在右侧“图元”页签中,拖拽OneNET图元至左侧画布中,并配置外部数据源基本信息,单击“保存”。
输入源中,不同的图元表示不同的第三方数据源,当前支持以下几种数据源。
- OneNET
图2 OneNET配置信息页面
图3 OneNET元数据定义页面
表1 数据来源OneNET基本信息页面参数说明 参数
参数说明
标签
数据源OneNET的标签名,用于在界面展示。
名称
数据源OneNET的名称,单击参数值后系统可根据“标签”取值自动生成。
协议
通信协议。
Token
数据签名,即令牌,由OneNET提供。
报文加密
对传输过来的报文内容是否进行加密,勾选表示加密。如果设置为加密的,则需配置“Aeskey”。
Aeskey
解密密钥,由OneNET提供。
元数据
源数据的元数据定义。如果为空的“{}”,只能传送原始数据,无法使用拦截器对数据进行处理。
定义好的元数据,将会显示在右侧区域中。单击“格式化Json”,可对元数据进行Json格式化处理。
元数据定义示例如下:
{ "notifyType": "STRING", "requestId": "STRING", "timestamp": "STRING", "eventTime": "datetime", "deviceId": "STRING", "gatewayId": "STRING", "deviceService": { "name": "STRING" }, "service": { "serviceType": "STRING", "serviceId": "STRING", "body": "OBJECT", "data": { "battery_low": "number", "light": "STRING" } } }
数据接入对接中移物联OneNET平台的认证,是按照中移物联给出的对接标准,需根据配置的Token(令牌)、报文中的随机数、报文内容拼接后获取MD5摘要,并和报文中的摘要做比对,如果一致则认为报文是正常的报文。目前中移物联只支持MD5算法获取数据摘要。
同时,对于报文内容来说,OneNET平台支持对报文内容进行加密后上报。AstroZero要使用跟OneNET平台一致的加密算法进行解密,获取的加密向量要跟OneNET平台一致。因此,使用了Aeskey(解密密钥)的前16位作为加密向量。
- 设备接入 IoTDA
图4 设备接入 IoTDA配置信息页面
表2 数据来源设备接入IoTDA基本信息页面参数说明 参数
参数说明
标签
数据源设备接入IoTDA的标签名,用于在界面展示。
名称
数据源设备接入IoTDA的名称,单击参数值后系统可根据“标签”取值自动生成。
数据格式
设备上报数据的格式。
数据传输的协议支持http或者https,由IoTDA决定。
Access Key
单击“生成AK/SK”,该值会自动生成。请将生成的Access Key保存到本地,开发人员通过该值在设备侧生成鉴权属性“authorization”。
Secret Key
单击“生成AK/SK”,该值会自动生成。请将生成的Secret Key保存到本地,开发人员通过该值在设备侧生成鉴权属性“authorization”。
元数据
源数据的元数据定义。如果为空的“{}”,只能传送原始数据,无法使用拦截器对数据进行处理。
定义好的元数据,将会显示在右侧区域中。单击“格式化Json”,可对元数据进行Json格式化处理。
元数据定义示例如下:
{ "notifyType": "STRING", "requestId": "STRING", "timestamp": "STRING", "eventTime": "datetime", "deviceId": "STRING", "gatewayId": "STRING", "deviceService": { "name": "STRING" }, "service": { "serviceType": "STRING", "serviceId": "STRING", "body": "OBJECT", "data": { "battery_low": "number", "light": "STRING" } } }
上报的合法数据示例如下:
{ "notifyType": "deviceDataChanged", "requestId": "45678", "timestamp":"1900012929922992", "eventTime":"20151212T121212Z", "deviceId":"8b3979fc-b072-433b-b3f6-673072e1bc04", "gatewayId":"XXX", "deviceService":{"name":"buttery"}, "service": { "serviceType": "", "serviceId": "", "data":{ "battery_low":444, "light":"99", "authorization":"Bingo bDTKFSGi:ZwGnN+bb*****************************TjFRMA=" } } }
其中,“authorization”值根据AK/SK、上报URL和Body数据生成。
- ROMA
图5 ROMA配置信息页面
表3 数据来源ROMA Connect基本信息页面参数说明 参数
参数说明
标签
数据源的标签名,用于在界面展示。
名称
数据源的名称,单击参数值后系统可根据“标签”取值自动生成。
APP ID
MQS平台应用ID。应用ID能够引用一个本平台定义的系统参数,引用位置不限制。
示例:{!roma_app_prefix}other_roma_app_id。
密钥
MQS平台应用请求密钥。
主题
MQS主题。主题能够引用一个本平台定义的系统参数,引用位置不限制。
示例:{!roma_topic_prefix}other_roma_topic。
标签
设置订阅消息的标签,可以指定消费某一类型的消息。
默认“*”,表示消费所有类型的消息,可以写多个,用“||”隔开,例如“tag1 || tag2 || tag3”。
通道加密
数据是否加密。若勾选,表示MQS数据传输采用TLS协议。
消费位置
若勾选,停止数据接入任务,后续重新运行数据接入时,停止和重新运行期间ROMA Connect发来的消息会被完全丢弃。若停止与运行数据接入之间时间的间隔在十分钟之内,消息可能不会丢失。
存在数据丢失风险,请谨慎勾选,默认不勾选。
元数据
源数据的元数据定义。如果为空的“{}”,只能传送原始数据,无法使用拦截器对数据进行处理。
定义好的元数据,将会显示在右侧区域中。单击“格式化Json”,可对元数据进行Json格式化处理。
元数据定义示例如下:
{ "notifyType": "STRING", "requestId": "STRING", "timestamp": "STRING", "eventTime": "datetime", "deviceId": "STRING", "gatewayId": "STRING", "deviceService": { "name": "STRING" }, "service": { "serviceType": "STRING", "serviceId": "STRING", "body": "OBJECT", "data": { "battery_low": "number", "light": "STRING" } } }
- Kafka
图6 添加Kafka
表4 数据来源Kafka基本信息页面参数说明 参数
参数说明
标签
数据源Kafka的标签名,用于在界面展示。
名称
数据源Kafka的名称,单击参数值后系统可根据“标签”取值自动生成。
连接外部Kafka
AstroZero支持从平台内部Kafka或者外部Kafka采集数据,从内部Kafka采集的数据不能再输出到内部Kafka中。勾选该项,表示从外部Kafka采集数据,则需要配置“Kafka集群地址”和“订阅主题”。
默认不勾选,表示从平台内部Kafka采集数据。
Kafka集群地址
当勾选“连接外部Kafka”时,该参数才会出现。表示Kafka集群地址,多个节点服务器地址可用“,”间隔,格式为“Kafka节点1的ip:节点1端口号,Kafka节点2的ip:节点2端口号,...”。
获取Kafka的IP和端口号方法:登录每台Kafka节点服务器,查看“${KAFKA_HOME}/config”目录下“server.properties”中“listeners”的取值。
例如:10.10.10.1:9091,10.10.10.2:9092
订阅主题
当勾选“连接外部Kafka”时,该参数才会出现,表示采集数据所属的Kafka主题。
主题是消息存储和发布的类名,通过主题可以分类传输和处理事件。
开启SASL_SSL
当勾选“连接外部Kafka”时,该参数才会出现。开启SASL_SSL后,访问外部Kafka需要提供用户名和密码进行认证,且数据会加密传输。
请根据外部Kafka服务端的实际情况配置,默认不勾选。
用户名
当勾选“开启SASL_SSL”时,该参数才会出现,表示访问外部Kafka需要的用户名。
密码
当勾选“开启SASL_SSL”时,该参数才会出现,表示访问外部Kafka需要的密码。
元数据
当勾选“连接外部Kafka”时,会出现“元数据定义”页签,选择该页签,配置源数据的元数据定义。如果为空的“{}”,只能传送原始数据,无法使用拦截器对数据进行处理。
定义好的元数据,将会显示在右侧区域中。单击“格式化Json”,可对元数据进行Json格式化处理。
元数据定义示例如下:
{ "notifyType": "STRING", "requestId": "STRING", "timestamp": "STRING", "eventTime": "datetime", "deviceId": "STRING", "gatewayId": "STRING", "deviceService": { "name": "STRING" }, "service": { "serviceType": "STRING", "serviceId": "STRING", "body": "OBJECT", "data": { "battery_low": "number", "light": "STRING" } } }
事件
当不勾选“连接外部Kafka”时,该参数才会出现。表示从平台内部Kafka采集数据时,数据所关联的具体事件。
- OneNET
- 在
中,拖拽所需的拦截器到左侧区域,对源数据进行数据拦截,配置后单击“保存”。当前支持如下几种类型的拦截器:
- 投影
- 赋值
表6 赋值器配置参数说明表 参数
参数说明
标签
赋值器的标签名,用于在界面展示。
名称
赋值器的名称,单击参数值后系统可根据“标签”取值自动生成。
新增字段
被赋值的参数。
源
源数据,支持以下几种类型。
- Cache:缓存中的对象数据,需要提前在设置中单击“新增缓存”进行添加。
- reference:传入的外部数据源元数据。
- formula:数据取表达式值,格式为“XXX(数据源参数)”。
- constant:某常量,格式为“常量值”。
在设置中,单击“新增缓存”,添加缓存对象数据时,设置如下图所示。
图9 “新增缓存”配置
表7 “新增缓存”参数说明 参数
参数说明
标签
该Cache的标签名,用于在界面展示。
名称
该Cache的名称,单击参数值后系统可根据“标签”取值自动生成。
对象
映射的对象名。
高级配置
高级配置,开关打开后需要配置如下参数。
- 缓存上限:最大缓存记录数。
- 过期时间:缓存超时时间。
默认不打开,采用系统默认配置。
字段选择器
左侧为该对象的全部字段,勾选所需数据字段,单击,将所选字段移到右侧输出字段中。
键
与外部元数据关联的对象字段。
选择源字段
传入的外部数据源元数据中与该对象关联的数据字段。
- 过滤
- 在
中,拖拽通道中“内存通道”至左侧画布区域Onenet下方,并配置数据通道的基本信息,单击“保存”。通道中“内存通道”表示普通的数据通道,当数据较多时可选择“文件通道”图标进行拖拽。图11 内存通道配置
表9 内存通道配置参数说明 参数
参数说明
标签
内存通道的标签名,用于在界面展示。
名称
内存通道的名称,单击参数值后系统可根据“标签”取值自动生成。
高级配置
高级配置,开关打开后需要配置如下参数:
- 通道容量:通道数据最大条数。
- 最大传输量:发到单个输出源的数据最大条数。
默认不打开,采用系统默认配置。
- 在
中,拖拽所需的输出源至左侧画布区域最下方,设置输出事件和输出事件数据的接收方。
- 当接收方是Kafka时,配置如下。
支持拖拽多个输出源中的Kafka图标,设置多个事件接收方。图12 数据接收方配置页签
表10 数据接收方配置参数说明 参数
参数说明
标签
数据接收方Kafka的标签名,用于在界面展示。
名称
数据接收方Kafka的名称,单击参数值后系统可根据“标签”取值自动生成。
事件
输出的事件。
发送到外部
是否发送到外部Kafka。若打开该开关,需要配置“Kafka集群地址”和“订阅主题”。
- Kafka集群地址:接收事件的kafka集群地址,多个节点服务器地址可用“,”间隔,格式为“Kafka节点1的ip:节点1端口号,Kafka节点2的ip:节点2端口号,...”。
获取Kafka的IP和端口号方法:登录每台Kafka节点服务器,查看“${KAFKA_HOME}/config”目录下“server.properties”中“listeners”的取值。
例如:10.136.14.56:9092
说明:接收事件的kafka集群,不需要一定配置为和AstroZero相连。
- 订阅主题:接收事件数据的kafka Topic。
- 是否开启SASL_SSL:当发送到外部Kafka时,该参数才会出现。表示是否开启SASL_SSL认证,开启后,访问外部Kafka需要提供用户名和密码进行认证,且数据会加密传输。请根据外部Kafka服务端的实际情况配置,默认不勾选,即输出到与AstroZero相连的Kafka。
事件属性/作为分区Key
是否将事件中的参数字段作为Kafka的消息头,用于事件分发时发送到不同的Kafka分区,建议选择有区分度的关键字段。
事件属性/属性、值
配置传输的数据与事件中已定义参数的对应关系,单击“新增行”,可添加多行。
- Kafka集群地址:接收事件的kafka集群地址,多个节点服务器地址可用“,”间隔,格式为“Kafka节点1的ip:节点1端口号,Kafka节点2的ip:节点2端口号,...”。
- 当接收方是ROMA,配置如下。
图13 数据接收方ROMA配置页签
表11 ROMA Connect基本信息页面参数说明 参数
参数说明
标签
接收方的显示标签名,用于在界面展示。
名称
接收方的名称,单击参数值后系统可根据“标签”取值自动生成。
APP ID
MQS平台应用ID,应用ID能够引用一个本平台定义的系统参数,引用位置不限制。
示例:{!roma_app_prefix}other_roma_app_id。
密钥
MQS平台应用请求密钥。
主题
MQS主题,主题能够引用一个本平台定义的系统参数,引用位置不限制。
示例:{!roma_topic_prefix}other_roma_topic。
标签
设置订阅消息的标签,可以指定消费某一类型的消息。
默认“*”,表示消费所有类型的消息,可以写多个,用“||”隔开,例如“tag1 || tag2 || tag3”。
加密传输
数据是否加密。
- 当接收方是Kafka时,配置如下。
- 单击页面上方的,设置数据接入基本参数,单击“保存”。
表12 数据接入参数说明 参数
参数说明
标签
数据接入的标签名,用于在界面展示。
名称
数据接入的名称,单击参数值后系统可根据“标签”取值自动生成。
描述
数据接入的描述信息。
- 单击页面上方的,启用数据接入。
- 启用成功后,单击,即可运行该数据接入。
Kafka接收到数据,表示设置成功。
在数据接入编辑页面上方,单击,可停止数据接入的运行。
- 在运行成功的提示框中,复制数据推送地址,并进行保存。
在该数据接入运行后,单击页面上方的,可复制数据推送地址。
图14 复制数据推送地址
- 在第三方外部数据源侧,配置AstroZero提供的数据推送地址,具体操作请参考第三方文档。
- (可选)可执行该步骤进行调试或故障定位。
在数据接入编辑页面上方,单击,下载日志,查看日志进行定位。当数据源或输出源配置信息有误时,可通过下载日志进行定位。
- (可选)进行消息跟踪。
在数据接入编辑页面右侧,选择“消息跟踪”页签,单击“启动”,可启动消息跟踪。消息跟踪在一个周期(默认30分钟)后,自动关闭。
图15 启动消息跟踪
启动消息跟踪后,可实时查看上报的报文处理情况。
图16 消息跟踪
异常处理
当数据源类型选择的是ROMA时,配置完数据接入后,MQS平台收不到消息,业务设置的EventTrigger没有拉起服务编排。现象、原因分析以及解决方法如下:
- 问题现象:ROMA消息轨迹是灰色的,可能是MQS服务地址配置错误或者没有配置。
解决方法:检查iotgateway容器的“roma-server”参数是否配置。若没有配置,请配置该参数或联系运维添加正确的“roma-server”地址(端口为“9776”),确认网络正常以及MQS服务器正常,兼容客户端版本。
- 问题现象:ROMA消息轨迹是红色的,则可能是MQS的source定义错误。
解决方法:在ROMA Connect平台查看应用ID和对应的密钥是否一致,查看该应用是否订阅了主题。配置错误时会导致鉴权失败。
- 问题现象:ROMA消息轨迹是绿色的, 表示MQS接收成功,并成功发送到Kafka,AstroZero也收到了对应的消息,但是可能没成功拉起服务编排。
解决方法:查看iotgateway的“flume.log”日志,确认MQS是否接收成功,通过“kafka-console-cumer”确认消息是否转换成功,通过AstroZero的日志跟踪功能或者AstroZero后台日志确认(日志关键字为“OnEvent”)平台是否收到事件。如果没有对应的服务编排或者脚本日志,可以确定服务编排未拉起,联系华为工程师解决。