更新时间:2022-04-26 GMT+08:00
分享

如何配置事件流

当需要对事件A进行订阅,分析处理后转换成事件B进行发送,这时您可通过配置“事件流”来实现这整个事件流程。目前该复杂场景只应用于:开启事件后,spark streaming从kafka中提取事件并按照定义规则对事件的数据流进行分析处理,然后发送新事件给kafka。为了节省资源,一个租户只能创建一个事件流。

背景信息

首先,您需要理解以下概念:

  • kafka:是由Apache开发的一个开源流处理平台,是分布式发布、订阅消息系统。它主要用于处理活跃的流数据。一般kafka部署为集群模式。
  • Topic:用来区分kafka集群中不同类型消息的主题。例如应用程序A订阅了Topic1,应用程序B订阅了Topic2而没有订阅Topic1,那么发送到Topic1中的数据将只能被应用程序A读到,而不会被应用程序B读到。
  • spark streaming:大规模流式数据处理工具,第三方工具。

场景描述

某仓储公司需要对该公司各个区域设置温度监测,当温度大于60度时,需要触发高温报警。

将温度监测和触发高温报警定义为两个事件,可通过配置“事件流”定义整个事件流程及事件数据筛选规则,开启流程后,spark streaming从kafka中提取温度监测事件,按照筛选规则分析处理后提取温度大于60度的数据,发送高温报警事件给kafka。

图1 流程示意图

前提条件

  • 您需要联系运维开启事件流License权限。开启后,才可新建、查看并管理事件流。
  • 已创建并启用温度监测事件“temperature”,该事件中自定义参数为文本类型“position”和数字类型“temperature”,分别表示位置和温度。具体操作步骤请参考如何自定义事件
  • 已创建并启用高温报警事件“fire_alarm”,该事件中自定义参数为文本类型“positionalarm”和数字类型“temperaturealarm”,分别表示高温报警位置和温度。具体操作步骤请参考如何自定义事件
  • 用户要执行或者停止运行事件流时,需要具有管理事件流权限。添加管理事件流的权限方法:在“管理 > 用户管理 > 权限配置”中设置,勾选用户对应的权限配置文件中系统权限“运行和停止数据接入与事件流任务”。

操作步骤

  1. 鼠标放在App下的Logic文件夹旁会出现加号,单击加号,选择“事件流”。

    您需要联系运维开启事件流License权限。开启后,才会有新建事件流的入口。

  2. 单击“新建”,进入“事件流”页面。
  3. 在“事件流”页面右侧单击“设置”,进行数据配置。

    图2 设置页面
    表1 参数说明

    参数名称

    参数说明

    窗口长度

    流处理的数据采集时间间隔,单位为分钟。建议时间间隔不要配置过小,否则系统频繁的建表和执行SQL语句,会影响系统性能。例如:配置为“1”表示spark streaming每隔1分钟去kafka获取事件数据。

  4. 在“事件流”页面右侧单击“图元”,拖拽“输入源”中Kafka图标至左侧画布区域,并配置基本信息,单击“保存”

    输入源中Kafka图标表示数据来自Kafka。
    图3 数据来源Kafka基本信息页面
    表2 数据来源Kafka基本信息页面参数说明

    参数名称

    参数说明

    标签

    该数据源Kafka的显示标签名。长度不超过64个字节。

    名称

    该数据源Kafka的名称,系统自动生成。

    事件

    需要订阅的事件。

    例如:配置为温度监测事件“temperature”。

    从外部接收

    是否从外部Kafka接收事件。

    若关闭该开关,表示从与AppCube相连的Kafka接收事件;若打开该开关,表示从外部Kafka接收事件,需要配置“Kafka集群地址”“订阅主题”

    • Kafka集群地址:数据源Kafka集群地址。

      多个节点服务器地址可用“,”间隔,格式为“Kafka节点1ip:节点1端口号,Kafka节点2ip:节点2端口号,...”。

      获取Kafka的IP和端口号方法:可以通过登录每台Kafka节点服务器,查看“${KAFKA_HOME}/config”目录下“server.properties”中“listeners”取值获取。例如:10.136.104.56:9092

    • 订阅主题:获取的事件数据来源,即来自于kafka集群的哪个Topic。

      例如:__BINGO_PROD_SYS_TOPIC

    该章节示例中关闭该开关,使用与AppCube相连的Kafka。

  5. 从“图元”中拖拽“投影”图标至左侧画布区域数据源Kafka下方,并在弹出的“添加投影器”页面配置基本信息。单击“保存”。

    图4 “添加投影器”页面
    表3 “添加投影器”页面参数说明

    参数名称

    参数说明

    标签

    该投影的显示标签名。长度不超过64个字节。

    名称

    该投影的名称,系统自动生成。

    输入字段

    订阅事件的所有事件参数。

    请从中选择需要的参数单击,选中的参数会出现在“输出字段”中。

    输出字段

    从订阅事件中的事件参数进行筛选后,用到的事件参数。

  6. 从“图元”中拖拽“过滤”图标至左侧画布区域投影下方,并在弹出的“添加过滤器”页面配置基本信息和过滤条件。单击“保存”。

    图5 设置过滤条件
    表4 过滤条件页面参数说明

    参数名称

    参数说明

    标签

    该Filter的显示标签名。长度不超过64个字节。

    名称

    该Filter的名称,系统自动生成。

    条件设置

    过滤条件。

    • 字段:上一步筛选后的事件参数。
    • 比较符:操作符,下拉选择。
    • 值:字段值。

    例如:上图中的配置表示提取温度大于60度的事件。

    单击“新增行”可以新增条件。

  7. (可选)从“图元”页面中拖拽“分组”图标至左侧画布区域过滤器下方,并在弹出的“添加分组器”页面配置基本信息和分组条件。单击“保存”。

    图6 “添加分组器”页面配置
    表5 分组条件页面参数说明

    参数名称

    参数说明

    标签

    该GroupBy的显示标签名。长度不超过64个字节。

    名称

    该GroupBy的名称,系统自动生成。

    分组设置

    分组条件。

    例如上图中配置表示按照相同位置的大于60度的温度取温度平均值。

  8. 在“图元”页面右侧拖拽输出源中的Kafka图标至左侧画布区域最下方,设置spark streaming输出事件和输出事件数据的接收方。单击“保存”。

    您可拖拽多个Sink中的Kafka图标,设置多个事件接收方。
    图7 数据接收方配置
    表6 数据接收方配置参数说明

    参数名称

    参数说明

    标签

    数据接收方Kafka的显示标签名。长度不超过64个字节。

    名称

    数据接收方Kafka的名称,系统自动生成。

    事件

    spark streaming输出的事件。并设置订阅事件和输出事件的参数对应关系。

    这里配置为高温报警事件“fire_alarm”。温度监测事件“temperature”的“position”对应高温报警事件“fire_alarm”中的“positionalarm”,温度监测事件的“temperature”对应高温报警事件“fire_alarm”中的“temperaturealarm”。

    发送到外部

    是否发送到外部Kafka。

    若关闭该开关,表示发送到与AppCube相连的Kafka;若打开该开关,表示发送到外部Kafka,需要配置“Kafka集群地址”“订阅主题”

    • Kafka集群地址:接收事件的Kafka集群地址。

      多个节点服务器地址可用“,”间隔,格式为“Kafka节点1ip:节点1端口号,Kafka节点2ip:节点2端口号,...”。

      获取Kafka的IP和端口号方法:可以通过登录每台Kafka节点服务器,查看“${KAFKA_HOME}/config”目录下“server.properties”中“listeners”取值获取。

      例如:10.136.14.56:9092

      说明:接收事件的kafka集群不需要一定配置为和AppCube相连。

    • 订阅主题:接收事件数据的kafka Topic。

      例如:__BINGO_PROD_SYS_TOPIC

    该章节示例中关闭该开关,使用与AppCube相连的Kafka。

    事件属性/属性、值

    配置输出事件与接收事件中已定义参数的对应关系。

    单击“新增”可添加多行。

  9. 单击页面右上方保存按钮。设置该“事件流”基本参数。

    图8 该“事件流”基本参数页面
    表7 “事件流”基本参数

    参数名称

    参数说明

    标签

    该“事件流”的显示标签名。长度不超过64个字节。

    名称

    该“事件流”的名称,系统自动生成。

    描述

    描述信息。

  10. 单击页面上方图标启用“事件流”。

结果验证

  1. 创建一个服务编排“fire_alarm”,创建后启用该服务编排,实现当kafka中出现高温报警事件时,AppCube需要接收该事件,再向第三方系统发送该告警邮件。

    1. 鼠标放在App下的Logic文件夹旁会出现加号,单击加号,选择“服务编排”。
    2. 系统弹出“添加服务编排”的对话框,选择“创建一个新的服务编排”,标签和名称填入“fire_alarm”,单击“添加”。
    3. 在服务编排编辑器页面左侧,拖拽“逻辑”下“等待”图标至画布中,松开鼠标左键。
    4. 单击,在“可编辑的事件”区域单击“新增”,新增“event0”分支,配置该分支事件为高温报警事件,当系统中出现该事件时,则会执行该分支。
      图9 配置等待图元
    5. 拖拽“基本”下“发送邮件”图标至画布中,松开鼠标左键。
    6. 单击,设置发送邮件参数。配置高温报警邮件的主题和内容,地址填入第三方系统的邮件接收地址。
      图10 配置发送邮件图元
      表8 邮件发送参数说明

      参数名称

      参数说明

      如何配置

      直接编辑

      手动设置邮件信息。

      选中单选按钮。

      “直接编辑”和“基于模板”两者择一。

      默认选中“直接编辑”。

      基于模板

      基于同一租户或基线里已有的邮件模板设置邮件主题和内容。

      方便用户增加效率。

      模板

      当选中“基于模板”时,您需要选择邮件模板。

      在下拉框里选择。

      主题

      邮件标题。

      • 当选中“直接编辑”时,您需要配置邮件标题。
      • 当选中“基于模板”时,该参数配置区域置灰。
      • 当选中“直接编辑”时,您需要直接输入邮件标题。
      • 当选中“基于模板”时,不用配置。

      内容

      邮件内容。

      • 当选中“直接编辑”时,您需要配置邮件内容。
      • 当选中“基于模板”时,该参数配置区域置灰。
      • 当选中“直接编辑”时,您需要直接输入邮件内容。
      • 当选中“基于模板”时,不用配置。

      自定义变量

      当选中“基于模板”时,您需要配置模板中的参数,在“值”中进行赋值。

      在“值”中直接输入参数取值或者从全局上下文拖拽变量。

      地址

      接收人的邮箱地址。

      • 字符串:表示直接输入分号分隔的字符串或者从全局上下文拖拽变量。
      • 集合:表示可以从全局上下文拖拽集合变量。

      抄送

      抄送人员的邮箱地址。

      直接输入分号分隔的字符串或者从全局上下文拖拽变量。

      密送

      秘密发送人员的邮箱地址。

      直接输入分号分隔的字符串或者从全局上下文拖拽变量。

    7. 按照下图连接所有图元,等待图元和发送邮件图元之间选择“event0”分支。
      图11 连接所有图元
    8. 单击,保存服务编排。
    9. 单击,启用服务编排。

  2. 创建一个服务编排“send_fire_event”,将温度监测事件“temperature”中位置参数“position”赋值为仓储公司某区域,温度“temperature”赋值为70度,启用该Flow,用于模拟事件发送。

    1. 鼠标放在App下的Logic文件夹旁会出现加号,单击加号,选择“服务编排”。
    2. 系统弹出“添加服务编排”的对话框,选择“创建一个新的服务编排”,标签和名称填入“send_fire_event”,单击“添加”。
    3. 在服务编排编辑器页面左侧,拖拽“基本”下“发送事件”图标至画布中,松开鼠标左键。
    4. 单击,设置发送事件参数。
      图12 配置发送事件图元
      表9 发送事件参数说明

      参数名称

      参数说明

      如何配置

      事件

      请选择待发送的事件。

      从下拉框选择“temperature__e”。

      分区字段

      可选配置。从事件中选择一个自定义参数用作Kafka的分区字段,则根据该字段值进行路由,相同的值将路由到同一个Kafka分区;如果不指定,则默认随机路由到不同Kafka分区,从而提升并发处理性能。

      从下拉框选择。

      该示例不用配置。

      发送到外部

      系统内部有配置与AppCube相连Kafka的固定Topic“__BINGO_SYS_TOPIC”。

      若不勾选“发送到外部”,表示将事件数据发到默认的Topic上;若勾选“发送到外部”,则表示将事件数据发送到与AppCube相连Kafka的其他Topic上,您需要配置其他的Topic。

      默认不勾选。

      去掉勾选。

      延迟到事务结束(提交或回滚)后才发送

      是否延迟到该服务编排事务结束后才发送事件。

      默认不勾选。表示流程执行到该图元时立即发送事件。

      勾选或者去掉勾选。

      主题

      当勾选“发送到外部”时,该参数才会显示,表示非系统配置的默认Topic。

      不用配置。

      目标/源

      配置事件数据。为该事件自定义参数赋值,将“源”取值赋值到“目标”中。

      请单击“新增行”添加并进行赋值。

      “目标”请从下拉框中选择事件的自定义参数“temperature”和“position”,在“源”中给目标赋值。

    5. 按照下图连接所有图元。
      图13 连接图元
    6. 单击,保存服务编排。
    7. 单击,启用服务编排。

  3. 在“事件流”列表页面该事件流程所在行单击,运行该事件流程。
  4. 单击服务编排“send_fire_event”编辑器上方的,执行服务编排。不用输入任何输入参数,单击“运行”。
  5. 参考上一步,执行服务编排“fire_alarm”。不用输入任何输入参数,单击“运行”。
  6. 打开第三方系统邮箱,成功收到高温警报邮件,验证成功。
分享:

    相关文档

    相关产品

close