Flink CEP in SQL
SQL中的Flink CEP
CloudStream扩展为允许用户在SQL中表示CEP查询结果以用于模式匹配,并在Flink引擎上对事件流进行评估。
SQL查询语法
通过MATCH_RECOGNIZE的SQL语法实现。MATCH_RECOGNIZE子句自Oracle Database 12c起由Oracle SQL支持,用于在SQL中表示事件模式匹配。Apache Calcite同样支持MATCH_RECOGNIZE子句。
由于Flink通过Calcite分析SQL查询结果,本操作遵循Apache Calcite语法。
MATCH_RECOGNIZE ( [ PARTITION BY expression [, expression ]* ] [ ORDER BY orderItem [, orderItem ]* ] [ MEASURES measureColumn [, measureColumn ]* ] [ ONE ROW PER MATCH | ALL ROWS PER MATCH ] [ AFTER MATCH ( SKIP TO NEXT ROW | SKIP PAST LAST ROW | SKIP TO FIRST variable | SKIP TO LAST variable | SKIP TO variable ) ] PATTERN ( pattern ) [ WITHIN intervalLiteral ] [ SUBSET subsetItem [, subsetItem ]* ] DEFINE variable AS condition [, variable AS condition ]* )
MATCH_RECOGNIZE子句的语法元素定义如下:
-PARTITION BY [可选]:定义分区列。该子句为可选子句。如果未定义,则使用并行度1。
-ORDER BY [可选]:定义数据流中事件的顺序。ORDER BY子句为可选子句,如果忽略则使用非确定性排序。由于事件顺序在模式匹配中很重要,因此大多数情况下应指定该子句。
-MEASURES [可选]:指定匹配成功的事件的属性值。
-ONE ROW PER MATCH | ALL ROWS PER MATCH [可选]:定义如何输出结果。ONE ROW PER MATCH表示每次匹配只输出一行,ALL ROWS PER MATCH表示每次匹配的每一个事件输出一行。
-AFTER MATCH [可选]:指定从何处开始对下一个模式匹配进行匹配成功后的处理。
-PATTERN: 将匹配模式定义为正则表达式格式。PATTERN子句中可使用以下运算符: 连接运算符,量词运算符(*,+,?,{n},{n,}, {n,m}, {,m}),分支运算符(使用竖线‘|’),以及异运算符(‘{- -}’)。
-WITHIN [可选]:当且仅当匹配发生在指定时间内,则输出模式子句匹配。
-SUBSET [可选]:将DEFINE子句中定义的一个或多个关联变量组合在一起。
-DEFINE:指定boolean条件,该条件定义了PATTERN子句中使用的变量。
此外,还支持以下函数:
-MATCH_NUMBER():可用于MEASURES子句中,为同一成功匹配的每一行分配相同编号。
-CLASSIFIER():可用于MEASURES子句中,以指示匹配的行与变量之间的映射关系。
-FIRST()和LAST():可用于MEASURES子句中,返回在映射到模式变量的行集的第一行或最后一行中评估的表达式的值。
-NEXT()和PREV():可用于DEFINE子句中,通过分区中的前一行或下一行来评估表达式。
-RUNNING和FINAL关键字:可用于确定聚合的所需语义。RUNNING可用于MEASURES和DEFINE子句中,而FINAL只能用于MEASURES子句中。
-聚合函数(COUNT,SUM,AVG,MAX,MIN):这些聚合函数可用于MEASURES子句和DEFINE子句中。
查询示例
以下查询发现股票价格数据流中的V型模式。
SELECT * FROM MyTable MATCH_RECOGNIZE ( ORDER BY rowtime MEASURES STRT.name as s_name, LAST(DOWN.name) as down_name, LAST(UP.name) as up_name ONE ROW PER MATCH PATTERN (STRT DOWN+ UP+) DEFINE DOWN AS DOWN.v < PREV(DOWN.v), UP AS UP.v > PREV(UP.v) )
在以下查询中,聚合函数AVG应用于A和C相关变量组成的SUBSET E的MEASURES子句中。
SELECT * FROM Ticker MATCH_RECOGNIZE ( MEASURES AVG(E.price) AS avgPrice ONE ROW PER MATCH AFTER MATCH SKIP PAST LAST ROW PATTERN (A B+ C) SUBSET E = (A,C) DEFINE A AS A.price < 30, B AS B.price < 20, C AS C.price < 30 )