更新时间:2024-08-21 GMT+08:00
分享

通过数据接入将消息类数据接入AstroZero

AstroZero低代码平台支持从外部数据源获取数据,并按照需求对数据进行一系列处理后,转化成内部事件,输出给Kafka或ROMA Connect。例如,从物联网OneNET中获取数据源数据,经过数据处理后转化成内部事件,然后输出给Kafka。

前提条件

  • 已创建数据接收后转化的内部事件parking,具体操作请参见创建AstroZero自定义事件
  • 在第三方外部数据源侧,配置AstroZero提供的鉴权信息,具体操作请参见第三方文档。
  • 如果对接的是ROMA Connect数据源,请参考设置应用中对接的ROMA Connect信息中操作,在AstroZero中设置ROMA Connect实例信息。

数据接入

在开发环境修改数据接入的任意配置数据(包括所有图元的配置信息),在打包时,可配置是否覆盖新环境中的同名数据接入配置。

  1. 参考登录AstroZero新版应用设计器中操作,登录应用设计器。
  2. 在左侧导航栏中,选择“集成”。
  3. 单击消息接入后的“+”,进入新建消息接入页面。
  4. 直接单击“新建”,进入新建数据接入页面。
  5. 在页面右侧“图元 > 输入源”中,拖拽所需的图元到左侧画布中,并配置外部数据源基本信息,单击“保存”

    • OneNET
      通过OneNet Source,可以快速配置HTTP消息、鉴权设置,并接收OneNet发送的消息。
      图1 OneNET配置信息页面
      图2 定义OneNET元数据
      表1 添加OneNET参数说明

      类别

      参数

      说明

      配置信息

      标签

      数据源OneNET的标签名,用于在页面显示。

      取值范围:1~64个字符。

      名称

      数据源OneNET在系统中的唯一标识,创建后不可修改。

      命名要求:必须以英文字母开头,只能由英文字母、数字或单下划线组成,且不能以下划线结尾。

      Token

      数据签名,即令牌,由OneNET提供。

      报文加密

      对传输过来的报文内容是否进行加密,勾选表示加密。如果设置为加密,需要配置“Aeskey”

      Aeskey

      解密密钥,由OneNET提供。

      元数据定义

      元数据

      定义OneNET元数据,如果为空的{},只能传送原始数据,无法使用拦截器对数据进行处理。

      例如,输入如下元数据,单击“格式化Json”,可对元数据进行Json格式化处理。

      {
          "notifyType": "STRING",
          "requestId": "STRING",
          "timestamp": "STRING",
          "eventTime": "datetime",
          "deviceId": "STRING",
          "gatewayId": "STRING",
          "deviceService": {
              "name": "STRING"
          },
          "service": {
              "serviceType": "STRING",
              "serviceId": "STRING",
              "body": "OBJECT",
              "data": {
                  "battery_low": "number",
                  "light": "STRING"
              }
          }
      }
      注意:
      • 数据对接中,移物联OneNET平台的认证是按照中移物联给出的对接标准,需根据配置的Token(令牌)、报文中的随机数、报文内容拼接后获取MD5摘要,并和报文中的摘要做比对,如果一致则认为报文是正常的报文。目前中移物联,只支持MD5算法获取数据摘要。
      • 对于报文内容来说,OneNET平台支持对报文内容进行加密后上报。AstroZero要使用跟OneNET平台一致的加密算法进行解密,获取的加密向量要和OneNET平台一致。因此,使用了Aeskey(解密密钥)的前16位,作为加密向量。
    • 设备接入IoTDA
      通过设备接入IoTDA Source,可快速配置HTTP消息、鉴权设置,并接收设备接入IoTDA发送的消息。
      图3 IoTDA配置信息
      图4 定义IoTDA元数据
      表2 接入IoTDA设备参数说明

      类别

      参数

      说明

      配置信息

      标签

      数据源设备接入IoTDA的标签名,用于在页面显示。

      取值范围:1~64个字符。

      名称

      数据源设备接入IoTDA在系统中的唯一标识,创建后不可修改。

      命名要求:必须以英文字母开头,只能由英文字母、数字或单下划线组成,且不能以下划线结尾。

      数据格式

      设备上报数据的格式,数据传输协议支持http或https,由IoTDA决定。

      认证方式

      设置认证方式,支持证书认证、口令认证和无认证。

      Access Key

      单击“生成AK/SK”,该值会自动生成。请将生成的Access Key保存到本地,开发人员需要通过该值,在设备侧生成鉴权属性“authorization”。“认证方式”设置为“口令认证”时,才会显示该参数。

      Secret Key

      单击“生成AK/SK”,该值会自动生成。请将生成的Secret Key保存到本地,开发人员需要通过该值,在设备侧生成鉴权属性“authorization”。“认证方式”设置为“口令认证”时,才会显示该参数。

      元数据定义

      元数据

      定义IoTDA元数据,如果为空的{},只能传送原始数据,无法使用拦截器对数据进行处理。

      例如,定义如下元数据,单击“格式化Json”,可对元数据进行Json格式化处理。

      {
          "notifyType": "STRING",
          "requestId": "STRING",
          "timestamp": "STRING",
          "eventTime": "datetime",
          "deviceId": "STRING",
          "gatewayId": "STRING",
          "deviceService": {
              "name": "STRING"
          },
          "service": {
              "serviceType": "STRING",
              "serviceId": "STRING",
              "body": "OBJECT",
              "data": {
                  "battery_low": "number",
                  "light": "STRING"
              }
          }
      }

      上报的合法数据示例如下:

      {
          "notifyType": "deviceDataChanged",
          "requestId": "45678",
          "timestamp":"1900012929922992",
          "eventTime":"20151212T121212Z",
          "deviceId":"8b3979fc-b072-433b-b3f6-673072e1bc04",
          "gatewayId":"XXX",
          "deviceService":{"name":"buttery"},
          "service": {
           "serviceType": "",
           "serviceId": "",
           "data":{
                           "battery_low":444,
                           "light":"99",
                           "authorization":"Bingo bDTKFSGi:ZwGnN+bb*****************************TjFRMA="
                  }
          }
      }

      其中,“authorization”需要根据AK/SK、上报URL和Body数据生成。

    • ROMA Connect
      通过MQS Source,可快速配置MQS订阅参数,并接收MQS发布的消息。
      图5 ROMA Connect配置信息页面
      图6 定义MQS元数据
      表3 添加MQS参数说明

      类别

      参数

      说明

      配置信息

      标签

      数据源的标签名,用于在页面显示。

      取值范围:1~64个字符。

      名称

      数据源在系统中的唯一标识,创建后不可修改。

      命名要求:必须以英文字母开头,只能由英文字母、数字或单下划线组成,且不能以下划线结尾。

      App ID

      MQS平台的应用ID。应用ID能够引用一个本平台定义的系统参数,引用位置不限制。例如,“!roma_app_prefix}other_roma_app_id”

      密钥

      MQS平台应用的请求密钥。

      主题

      MQS主题。主题能够引用一个本平台定义的系统参数,引用位置不限制。例如,“{!roma_topic_prefix}other_roma_topic”

      MQS配置 > 标签

      设置订阅消息的标签,可以指定消费某一类型的消息。默认“*”,表示消费所有类型的消息,可以写多个,用“||”隔开,例如“tag1 || tag2 || tag3”。

      通道加密

      数据是否加密。若勾选,表示MQS数据传输采用TLS协议。

      默认为勾选。

      消费位置

      若勾选,停止数据接入任务,后续重新运行数据接入时,停止和重新运行期间ROMA Connect发来的消息会被完全丢弃。若停止与运行数据接入之间时间间隔在十分钟内,消息可能不会丢失。

      默认为不勾选。

      元数据定义

      元数据

      定义MQS元数据,如果为空的{},只能传送原始数据,无法使用拦截器对数据进行处理。

      定义好元数据后,单击“格式化Json”,可对元数据进行Json格式化处理。

      {
          "notifyType": "STRING",
          "requestId": "STRING",
          "timestamp": "STRING",
          "eventTime": "datetime",
          "deviceId": "STRING",
          "gatewayId": "STRING",
          "deviceService": {
              "name": "STRING"
          },
          "service": {
              "serviceType": "STRING",
              "serviceId": "STRING",
              "body": "OBJECT",
              "data": {
                  "battery_low": "number",
                  "light": "STRING"
              }
          }
      }
    • Kafka
      从平台内部kafka或外部kafka采集数据,从内部kafka采集的数据不能再回写到内部kafka中。
      图7 添加Kafka
      表4 添加Kafka参数说明

      参数

      说明

      标签

      数据源的标签名,用于在页面显示。

      取值范围:1~64个字符。

      名称

      数据源在系统中的唯一标识,创建后不可修改。

      命名要求:必须以英文字母开头,只能由英文字母、数字或单下划线组成,且不能以下划线结尾。

      连接外部Kafka

      AstroZero支持从平台内部Kafka或外部Kafka采集数据,从内部Kafka采集的数据不能再输出到内部Kafka中。
      • 勾选:表示从外部Kafka采集数据。
      • 不勾选:表示从平台内部Kafka采集数据。

      Kafka集群地址

      勾选“连接外部Kafka”时,才会显示该参数。表示Kafka集群地址,多个节点服务器地址可用“,”间隔,格式为“Kafka节点1ip:节点1端口号,Kafka节点2ip:节点2端口号,...”。

      获取Kafka的IP和端口号方法:登录每台Kafka节点服务器,查看“${KAFKA_HOME}/config”目录下“server.properties”中“listeners”的值。例如,10.10.10.1:9091,10.10.10.2:9092。

      主题

      勾选“连接外部Kafka”时,才会显示该参数。表示采集数据所属的Kafka主题。主题是消息存储和发布的类名,通过主题可以分类传输和处理事件。

      安全协议

      勾选“连接外部Kafka”时,才会显示该参数。设置为SASL_SSL时,访问外部Kafka需要提供用户名和密码进行认证,且数据会加密传输。

      用户名

      访问外部Kafka的用户名。勾选“连接外部Kafka”时,才会显示该参数。

      密码

      访问外部Kafka的用户密码。勾选“连接外部Kafka”时,才会显示该参数。

      元数据定义

      勾选“连接外部Kafka”时,才会显示该页签。定义MQS的元数据,如果为空的{},只能传送原始数据,无法使用拦截器对数据进行处理。

      定义好元数据后,单击“格式化Json”,可对元数据进行Json格式化处理。

      {
          "notifyType": "STRING",
          "requestId": "STRING",
          "timestamp": "STRING",
          "eventTime": "datetime",
          "deviceId": "STRING",
          "gatewayId": "STRING",
          "deviceService": {
              "name": "STRING"
          },
          "service": {
              "serviceType": "STRING",
              "serviceId": "STRING",
              "body": "OBJECT",
              "data": {
                  "battery_low": "number",
                  "light": "STRING"
              }
          }
      }

      事件

      从平台内部Kafka采集数据,数据所关联的具体事件。不勾选“连接外部Kafka”时,才会显示该参数。

  6. 图元 > 拦截器中,拖拽所需的拦截器到左侧页面,对源数据进行数据拦截,并单击“保存”。

    • 投影:数据选择器,可从源数据中选择部分数据进行发送。
      图8 投影配置
      表5 添加投影参数说明

      参数

      说明

      标签

      投影的标签名,用于在页面显示。

      取值范围:1~64个字符。

      名称

      投影在系统中的唯一标识,创建后不可修改。

      命名要求:必须以英文字母开头,只能由英文字母、数字或单下划线组成,且不能以下划线结尾。

      字段选择器

      左侧为外部数据源传入的全部元数据,勾选所需数据字段,单击将所选数据移到右侧输出字段中。

    • 赋值:数据赋值器,可将缓存中的对象数据、传入的外部数据源数据、某个常量数据或者数据取表达式的值,赋值给某参数。
      图9 赋值器配置
      表6 添加赋值器参数说明

      参数

      说明

      标签

      赋值器的标签名,用于在页面显示。

      取值范围:1~64个字符。

      名称

      赋值器在系统中的唯一标识。

      命名要求:必须以英文字母开头,只能由英文字母、数字或单下划线组成,且不能以下划线结尾。

      赋值

      单击“新增”,为赋值器进行赋值。

      • 新增字段:被赋值的参数。
      • 源:设置源数据。
        • Cache:缓存中的对象数据,需要提前在设置中单击“新增缓存”进行添加。
        • reference:传入的外部数据源元数据。
        • formula:数据取表达式值,格式为“XXX(数据源参数)”。
        • constant:某常量,格式为“常量值”。
    • 过滤:数据过滤器,通过配置一定的过滤条件,对外部元数据进行数据过滤。
      图10 过滤器配置
      表7 添加过滤器参数说明

      参数

      说明

      标签

      过滤器的标签名,用于在页面显示。

      取值范围:1~64个字符。

      名称

      过滤器在系统中的唯一标识,创建后不可修改。

      命名要求:必须以英文字母开头,只能由英文字母、数字或单下划线组成,且不能以下划线结尾。

      条件设置

      设置过滤条件,单击“新增”,可添加多个过滤条件。

  7. 图元 > 通道中,拖拽所需的通道到左侧画布区域Onenet下方,并配置数据通道的基本信息,单击“保存”。

    “内存通道”表示普通的数据通道,当数据较多时可选择“文件通道”。
    图11 内存通道配置
    表8 添加内存通道参数说明

    参数

    说明

    标签

    内存通道的标签名,用于在页面显示。

    取值范围:1~64个字符。

    名称

    内存通道在系统中的唯一标识,创建后不可修改。

    命名要求:必须以英文字母开头,只能由英文字母、数字或单下划线组成,且不能以下划线结尾。

    高级设置

    开启后,可自定义通道容量和最大传输量。默认不打开,采用系统默认配置。
    • 通道容量:通道数据最大条数。
    • 最大传输量:发到单个输出源的数据最大条数。

  8. 图元 > 输出源中,拖拽输出源图元到左侧画布区域最下方,设置输出事件和输出事件数据的接收方,并单击“保存”

    • 当接收方是Kafka时,配置如下。
      可拖拽多个输出源中的Kafka图标,设置多个事件接收方。
      图12 数据接收方配置页签
      表9 数据接收方配置页签参数说明

      区域

      参数

      说明

      基本信息

      标签

      数据接收方Kafka的标签名,用于在页面显示。

      取值范围:1~64个字符。

      名称

      数据接收方Kafka在系统中的唯一标识,创建后不可修改。

      命名要求:必须以英文字母开头,只能由英文字母、数字或单下划线组成,且不能以下划线结尾。

      事件

      设置输出的事件。

      发送到外部

      是否发送到外部Kafka。默认不打开,表示输出到与AstroZero相连的Kafka。
      • Kafka集群地址:接收事件的kafka集群地址,多个节点服务器地址可用“,”间隔,格式为“Kafka节点1ip:节点1端口号,Kafka节点2ip:节点2端口号,...”。

        获取Kafka的IP和端口号方法:登录每台Kafka节点服务器,查看“${KAFKA_HOME}/config”目录下“server.properties”中“listeners”的值。例如,10.136.14.56:9092

        说明:

        接收事件的kafka集群不需要一定配置为和AstroZero相连。

      • 订阅主题:接收事件数据的kafka Topic。例如,__BINGO_PROD_SYS_TOPIC。
      • 是否开启SASL_SSL:当发送到外部Kafka时,才会显示该参数。表示是否开启SASL_SSL认证,开启后,访问外部Kafka需要提供用户名和密码进行认证,且数据会加密传输。

      配置

      Kafka集群地址

      输入Kafka集群的地址,不同IP之间以逗号分隔(如ip1:port1,ip2:port2,ip3:port3)。不允许填写平台的Kafka集群地址。

      主题

      主题是消息存储和发布的类名,通过主题您可以分类传输和处理事件。不允许填写平台内部的主题。

      安全协议

      设置为SASL_SSL时,访问外部Kafka需要提供用户名和密码进行认证,且数据会加密传输。

      kafka用户名

      Kafka进行Sasl认证的用户名。

      kafka密码

      Kafka进行Sasl认证的密码。

      事件属性

      作为分区Key

      是否将事件中的参数字段作为Kafka的消息头,用于事件分发时发送到不同的Kafka分区。

      属性/值

      配置传输的数据与事件中,已定义参数的对应关系。

    • 当接收方是ROMA Connect,配置如下。
      图13 数据接收方ROMA Connect配置页签
      表10 数据接收方ROMA Connect配置页签参数说明

      说明

      参数

      标签

      接收方的标签名,用于在页面显示。

      取值范围:1~64个字符。

      名称

      接收方在系统中的唯一标识,创建后不可修改。

      命名要求:必须以英文字母开头,只能由英文字母、数字或单下划线组成,且不能以下划线结尾。

      APP ID

      MQS平台的应用ID,应用ID能够引用一个本平台定义的系统参数,引用位置不限制,例如“!roma_app_prefix}other_roma_app_id”

      密钥

      MQS平台应用请求密钥。

      主题

      MQS主题,主题能够引用一个本平台定义的系统参数,引用位置不限制,例如“{!roma_topic_prefix}other_roma_topic”

      标签

      设置订阅消息的标签,可以指定消费某一类型的消息。默认“*”表示消费所有类型的消息,可以写多个,多个之间用“||”隔开,例如“tag1 || tag2 || tag3”。

      加密传输

      数据是否加密传输。

  9. 单击数据接入页面上方的,设置数据接入标签和名称后,单击“保存”。

    • 标签:数据接入的标签名,用于在页面显示。
    • 名称:数据接入在系统中的唯一标识,创建后不可修改。

      命名要求:长度为1~59个字符,必须以英文字母开头,只能由英文字母、数字或单下划线组成,且不能以下划线结尾。

    • 描述:根据实际需求,输入数据接入的描述信息,长度不能超过255个字符。

  10. 单击数据接入页面上方的,启用数据接入。
  11. 启用成功后,单击,运行数据接入。

    Kafka接收到数据,表示数据接入设置成功。

    在数据接入页面上方,单击,可停止正在运行的数据接入。

  12. 在运行成功的提示框中,复制数据推送地址,并保存该地址。

    图14 复制数据推送地址

  13. 在第三方外部数据源侧,参考第三方文档,配置AstroZero提供的数据推送地址。
  14. (可选)调试或故障定位。

    在数据接入页面上方,单击,下载日志,查看日志进行定位。当数据源或输出源配置信息有误时,可通过下载日志进行定位。

  15. (可选)消息跟踪。

    在数据接入页面右侧,选择“消息跟踪”页签,单击“启动”,可启动消息跟踪。

    消息跟踪在一个周期(默认30分钟)后,会自动关闭。

    图15 启动消息跟踪

    启动消息跟踪后,可实时查看上报的报文处理情况。

    图16 查看报文处理情况

异常处理

当数据源类型选择ROMA Connect时,配置完数据接入后,MQS平台收不到消息,业务设置的EventTrigger没有拉起服务编排。

  • 问题现象:ROMA Connect消息轨迹是灰色的,可能是MQS服务地址配置错误或没有配置。

    解决方法:检查iotgateway容器的“roma-server”参数是否配置。若没有配置,请配置该参数或联系运维添加正确的“roma-server”地址(端口为“9776”),确认网络正常以及MQS服务器正常,兼容客户端版本。

  • 问题现象:ROMA Connect消息轨迹是红色的,可能是MQS的source定义错误。

    解决方法:在ROMA Connect平台,查看应用ID和对应的密钥是否一致,查看该应用是否订阅了主题。配置错误时,会导致鉴权失败。

  • 问题现象:ROMA Connect消息轨迹是绿色的, 表示MQS接收成功,并成功发送到Kafka,AstroZero也收到了对应的消息,但是可能没成功拉起服务编排。

    解决方法:查看iotgateway的“flume.log”日志,确认MQS是否接收成功。通过“kafka-console-cumer”确认消息是否转换成功,通过AstroZero的日志跟踪功能或AstroZero后台日志确认(日志关键字为“OnEvent”)平台是否收到事件。如果没有对应的服务编排或事件日志,可以确定服务编排未拉起,请联系华为工程师解决。

相关文档