更新时间:2024-07-11 GMT+08:00
分享

如何配置事件流

使用说明

当需要对事件A进行订阅,分析处理后转换成事件B进行发送时,可以通过配置“事件流”,来实现这整个事件流程。目前该复杂场景只应用于开启事件后,spark streaming从kafka中,提取事件并按照定义规则对事件的数据流进行分析处理,然后发送新事件给kafka。为了节省资源,一个租户只能创建一个事件流。

背景信息

首先,您需要理解以下概念:

  • kafka:是由Apache开发的一个开源流处理平台,是分布式发布、订阅消息系统。kafka主要用于处理活跃的流数据,一般kafka部署为集群模式。
  • Topic:用来区分kafka集群中,不同类型消息的主题。例如,应用程序A订阅了Topic1,应用程序B订阅了Topic2而没有订阅Topic1,那么发送到Topic1中的数据将只能被应用程序A读到,而不会被应用程序B读到。
  • spark streaming:大规模流式数据处理工具,第三方工具。

场景描述

某仓储公司需要对该公司各个区域设置温度监测,当温度大于60度时,需要触发高温报警。

将温度监测和触发高温报警定义为两个事件,可通过配置“事件流”定义整个事件流程及事件数据筛选规则,开启流程后,spark streaming从kafka中提取温度监测事件,按照筛选规则分析处理后提取温度大于60度的数据,发送高温报警事件给kafka。

图1 流程示意图

前提条件

  • 请联系运维,开启事件流License权限。开启后,才可新建、查看并管理事件流。
  • 已创建并启用温度监测事件“temperature”,该事件中自定义参数为文本类型“position”和数字类型“temperature”,分别表示位置和温度,具体操作步骤请参考如何自定义事件
  • 已创建并启用高温报警事件“fire_alarm”,该事件中自定义参数为文本类型“positionalarm”和数字类型“temperaturealarm”,分别表示高温报警位置和温度,具体操作步骤请参考如何自定义事件

操作步骤

  1. 参考登录经典应用设计器中操作,登录经典版应用设计器。
  2. 将鼠标放在应用的某个文件夹上(如Logic),单击“+”,选择“事件流”。

    开启事件流License权限后,才会有新建事件流的入口。

  3. 单击“新建”,进入“事件流”页面。
  4. 在“事件流”页面右侧,单击“设置”,进行数据配置。

    图2 设置页面

    窗口长度:事件流处理的数据采集时间间隔,单位为分钟。建议时间间隔不要配置过小,否则系统频繁的建表和执行SQL语句,会影响系统性能。例如:配置为“1”表示spark streaming每隔1分钟去kafka获取事件数据。

  5. 在“事件流”页面右侧,单击“图元”,拖拽“输入源”中Kafka图元至左侧画布区域,并配置基本信息,单击“保存”

    输入源中Kafka图标表示数据来自Kafka。
    图3 数据来源Kafka基本信息页面
    • 标签:新建数据源Kafka的标签名,用于在界面展示。
    • 名称:新建数据源Kafka的名称,系统自动生成。
    • 事件:需要订阅的事件。例如,配置为温度监测事件“temperature”。
    • 从外部接收:是否从外部Kafka接收事件。
      • 关闭该开关,从与AstroZero相连的Kafka接收事件。
      • 打开该开关,从外部Kafka接收事件。本示例中关闭该开关,使用与AstroZero相连的Kafka。
        • Kafka集群地址:数据源Kafka的集群地址。多个节点服务器地址可用“,”间隔,格式为“Kafka节点1ip:节点1端口号,Kafka节点2ip:节点2端口号,...”。

          获取Kafka的IP和端口号方法:登录每台Kafka节点服务器,查看“${KAFKA_HOME}/config”目录下,“server.properties”中“listeners”的取值。

        • 订阅主题:获取的事件数据来源,即来自于kafka集群的哪个Topic。

  6. 从“图元”中,拖拽“投影”图元至左侧画布区域数据源Kafka下方,并在弹出的“添加投影器”页面配置基本信息,单击“保存”。

    图4 “添加投影器”页面
    • 标签:新建投影的标签名,用于在界面展示。
    • 名称:新建投影的名称,系统自动生成。
    • 输入字段:订阅事件的所有事件参数。选择所需的参数,单击,选中的参数会出现在“输出字段”中。
    • 输出字段:从订阅事件的事件参数进行筛选后,用到的事件参数。

  7. 从“图元”中,拖拽“过滤”图元至左侧画布区域投影下方,并在弹出的“添加过滤器”页面配置基本信息和过滤条件,单击“保存”。

    图5 设置过滤条件
    • 标签:新建过滤的标签名,用于在界面展示。
    • 名称:新建过滤的名称,系统自动生成。
    • 条件设置:设置过滤条件。例如,图中的配置表示提取温度大于60度的事件。
      • 字段:上一步筛选后的事件参数。
      • 比较符:操作符,直接在下拉框中选择。
      • 值:字段值。

  8. (可选)从“图元”页面中,拖拽“分组”图元至左侧画布区域过滤器下方,并在弹出的“添加分组器”页面配置基本信息和分组条件,单击“保存”。

    图6 “添加分组器”页面配置
    • 标签:新建分组的标签名,用于在界面展示。
    • 名称:新建分组的名称,系统自动生成。
    • 分组设置:设置分组条件。例如,图中配置表示按照相同位置的大于60度的温度取温度平均值。

  9. 在“图元”页面右侧,拖拽输出源中的Kafka图元至左侧画布区域最下方,设置spark streaming输出事件和输出事件数据的接收方,单击“保存”。

    可以拖拽多个Sink中的Kafka图元,设置多个事件接收方。
    图7 数据接收方配置
    • 标签:数据接收方Kafka的标签名,用于在界面展示。
    • 名称:数据接收方Kafka的名称,系统自动生成。
    • 事件:spark streaming输出的事件。本示例此处配置为高温报警事件“fire_alarm”。温度监测事件“temperature”中的“position”,对应高温报警事件“fire_alarm”中的“positionalarm”。温度监测事件的“temperature”,对应高温报警事件“fire_alarm”中的“temperaturealarm”。
    • 发送到外部:是否发送到外部Kafka。若关闭该开关,表示发送到与AstroZero相连的Kafka。若打开该开关,表示发送到外部Kafka,需要配置“Kafka集群地址”“订阅主题”。本示例中关闭该开关,使用与AstroZero相连的Kafka。
      • Kafka集群地址:接收事件的Kafka集群地址,例如10.136.14.56:9092。

        多个节点服务器地址可用“,”间隔,格式为“Kafka节点1ip:节点1端口号,Kafka节点2ip:节点2端口号,...”。

        获取Kafka的IP和端口号方法:登录每台Kafka节点服务器,查看“${KAFKA_HOME}/config”目录下,“server.properties”中“listeners”的取值。

        接收事件的kafka集群,不需要一定配置为和AstroZero相连。

      • 订阅主题:接收事件数据的kafka Topic,例如__BINGO_PROD_SYS_TOPIC。
    • 事件属性/属性、值:配置输出事件与接收事件中,已定义参数的对应关系。

  10. 单击页面右上方保存按钮,设置“事件流”基本参数。

    图8 该“事件流”基本参数页面
    • 标签:新建事件流的标签名,用于在界面展示。
    • 名称:新建事件流的名称,系统自动生成。
    • 描述:新建事件流的描述信息。

  11. 单击页面上方的,启用事件流。

结果验证

  1. 创建一个服务编排“fire_alarm”,创建后启用该服务编排,实现当kafka中出现高温报警事件时,AstroZero需要接收该事件,再向第三方系统发送该告警邮件。

    1. 鼠标放在应用下的Logic文件夹上,单击“+”,选择“服务编排”。
    2. 选择“创建一个新的服务编排”,设置标签和名称为“fire_alarm”,单击“添加”。
    3. 在服务编排设计页面左侧,拖拽“逻辑”下的“等待”图元至画布中。
    4. 单击,在“可编辑的事件”中,单击“新增”,新增“event0”分支。
      配置该分支事件为高温报警事件,当系统中出现该事件时,会执行该分支。
      图9 配置等待图元
    5. 拖拽“基本”下的“发送邮件”图元至画布中。
    6. 单击,设置发送邮件参数。
      配置高温报警邮件的主题和内容,地址填入第三方系统的邮件接收地址。
      图10 配置发送邮件图元
      • 直接编辑:手动设置邮件信息。
      • 基于模板:基于同一账号或基线中已有的邮件模板,设置邮件主题和内容,方便用户增加效率。
      • 模板:选择“基于模板”时,需要选择邮件模板。
      • 主题:设置邮件标题。
        • 选中“直接编辑”时,需要配置邮件标题。
        • 选中“基于模板”时,该参数配置区域置灰。
      • 内容:设置邮件内容。
        • 选中“直接编辑”时,需要配置邮件内容。
        • 选中“基于模板”时,该参数配置区域置灰。
      • 自定义变量:选中“基于模板”时,需要配置模板中的参数。在“值”中,直接输入参数取值或者从全局上下文拖拽变量。
      • 地址:接收人的邮箱地址。
        • 字符串:直接输入分号分隔的字符串或从全局上下文拖拽变量。
        • 集合:从全局上下文拖拽集合变量。
      • 抄送:抄送人员的邮箱地址,直接输入分号分隔的字符串或者从全局上下文拖拽变量。
      • 密送:密送发送人员的邮箱地址,直接输入分号分隔的字符串或者从全局上下文拖拽变量。
    7. 按照下图连接所有图元,等待图元和发送邮件图元之间选择“event0”分支。
      图11 连接所有图元
    8. 单击页面上方的,保存服务编排。
    9. 保存成功后,单击,启用服务编排。

  2. 创建一个服务编排“send_fire_event”,将温度监测事件“temperature”中位置参数“position”赋值为仓储公司某区域,温度“temperature”赋值为70度,启用该Flow,用于模拟事件发送。

    1. 鼠标放在应用下的Logic文件夹上,单击“+”,选择“服务编排”。
    2. 选择“创建一个新的服务编排”,设置标签和名称为“send_fire_event”,单击“添加”。
    3. 在服务编排编辑器页面左侧,拖拽“基本”下的“发送事件”图元至画布中。
    4. 单击,设置发送事件参数。
      图12 配置发送事件图元
      • 事件:选择待发送的事件。
      • 分区字段:从事件中选择一个自定义参数用作Kafka的分区字段,则根据该字段值进行路由,相同的值将路由到同一个Kafka分区。如果不指定,则默认随机路由到不同Kafka分区,从而提升并发处理性能。
      • 发送到外部:系统内部有配置与AstroZero相连Kafka的固定Topic“__BINGO_SYS_TOPIC”。若不勾选“发送到外部”,表示将事件数据发到默认的Topic上。若勾选“发送到外部”,则表示将事件数据发送到与AstroZero相连Kafka的其他Topic上。
      • 延迟到事务结束(提交或回滚)后才发送:是否延迟到该服务编排事务结束后,才发送事件。
      • 主题:勾选“发送到外部”时,才显示该参数,表示非系统配置的默认Topic。
      • 目标/源:配置事件数据。

        本示例中,“目标”请从下拉框中选择事件的自定义参数“temperature”和“position”,在“源”中给目标赋值。

    5. 按照下图连接所有图元。
      图13 连接图元
    6. 单击页面上方的,保存服务编排。
    7. 保存成功后,单击,启用服务编排。

  3. 在“事件流”列表页面,单击该事件流程所在行的,运行该事件流程。
  4. 单击服务编排“send_fire_event”编辑器上方的,执行服务编排。

    不用输入任何输入参数,直接单击“运行”。

  5. 参考上一步,执行服务编排“fire_alarm”。

    不用输入任何输入参数,单击“运行”。

  6. 打开第三方系统邮箱,成功收到高温警报邮件,验证成功。

相关文档