Flink CEP en SQL
Flink CEP en SQL
Flink permite a los usuarios representar resultados de consultas de procesamiento de eventos complejos (CEP) en SQL para la coincidencia de patrones y evaluar secuencias de eventos en motores de Flink.
Sintaxis de consulta SQL
CEP SQL se implementa a través de la sintaxis de SQL MATCH_RECOGNIZE. La cláusula MATCH_RECOGNIZE es compatible con Oracle SQL desde Oracle Database 12c y se utiliza para indicar la coincidencia de patrones de eventos en SQL. Apache Calcite también soporta la cláusula MATCH_RECOGNIZE.
Flink utiliza Calcite para analizar los resultados de las consultas SQL. Por lo tanto, esta operación cumple con la sintaxis de Apache Calcite.
MATCH_RECOGNIZE ( [ PARTITION BY expression [, expression ]* ] [ ORDER BY orderItem [, orderItem ]* ] [ MEASURES measureColumn [, measureColumn ]* ] [ ONE ROW PER MATCH | ALL ROWS PER MATCH ] [ AFTER MATCH ( SKIP TO NEXT ROW | SKIP PAST LAST ROW | SKIP TO FIRST variable | SKIP TO LAST variable | SKIP TO variable ) ] PATTERN ( pattern ) [ WITHIN intervalLiteral ] [ SUBSET subsetItem [, subsetItem ]* ] DEFINE variable AS condition [, variable AS condition ]* )
Los elementos sintácticos de la cláusula MATCH_RECOGNIZE se definen de la siguiente manera:
-PARTITION BY (Opcional): define columnas de partición. Esta cláusula es opcional. Si este parámetro no está definido, se usa el paralelismo 1.
-ORDER BY (Opcional): define la secuencia de eventos en un flujo de datos. La cláusula ORDER BY es opcional. Si se omite, se utiliza la ordenación no determinista. Dado que el orden de los eventos es importante en la coincidencia de patrones, esta cláusula debe especificarse en la mayoría de los casos.
-MEASURES (Opcional): especifica el valor de atributo del evento correctamente coincidente.
-ONE ROW PER MATCH | ALL ROWS PER MATCH (Opcional): define cómo generar el resultado. ONE ROW PER MATCH indica que solo se genera una fila para cada coincidencia. ALL ROWS PER MATCH indica que se genera una fila para cada evento coincidente.
-AFTER MATCH (Opcional): especifica la posición de inicio para el procesamiento después de que el siguiente patrón se coincida correctamente.
-PATTERN: define el patrón coincidente como una expresión regular. Los siguientes operadores se pueden utilizar en la cláusula PATTERN: operadores de unión, operadores cuantificadores (*, +, ?, {n}, {n,}, {n,m} y {,m}), operadores de rama (barra vertical |) y operadores diferenciales ('{- -}').
(Opcional): -WITHIN: genera una coincidencia de cláusula de patrón solo cuando la coincidencia se produce dentro del tiempo especificado.
-SUBSET (Opcional): combina una o más variables asociadas definidas en la cláusula DEFINE.
-DEFINE: especifica la condición de Boolean, que define las variables utilizadas en la cláusula PATTERN.
Además, la cláusula MATCH_RECOGNIZE admite las siguientes funciones:
-MATCH_NUMBER(): se utiliza en la cláusula MEASURES para asignar el mismo número a cada fila que coincida correctamente.
-CLASSIFIER(): se utiliza en la cláusula MEASURES para indicar la asignación entre filas y variables coincidentes.
-FIRST() y LAST(): se utiliza en la cláusula MEASURES para devolver el valor de la expresión evaluada en la primera o la última fila del conjunto de filas asignado a la variable de esquema.
-NEXT() y PREV(): se utilizan en la cláusula DEFINE para evaluar una expresión utilizando la fila anterior o siguiente de una partición.
Palabras clave -RUNNING y FINAL: se utilizan para determinar la semántica necesaria para la agregación. RUNNING se puede usar en las cláusulas MEASURES y DEFINE mientras que FINAL solo se puede usar en la cláusula MEASURES.
- Funciones agregadas (COUNT, SUM, AVG, MAX, MIN): se utiliza en las cláusulas MEASURES y DEFINE.
Ejemplo de consulta
La siguiente consulta encuentra el patrón en forma de V en el flujo de datos de precios de acciones.
SELECT * FROM MyTable MATCH_RECOGNIZE ( ORDER BY rowtime MEASURES STRT.name as s_name, LAST(DOWN.name) as down_name, LAST(UP.name) as up_name ONE ROW PER MATCH PATTERN (STRT DOWN+ UP+) DEFINE DOWN AS DOWN.v < PREV(DOWN.v), UP AS UP.v > PREV(UP.v) )
En la siguiente consulta, la función agregada AVG se utiliza en la cláusula MEASURES de SUBSET E que consiste en variables relacionadas con A y C.
SELECT * FROM Ticker MATCH_RECOGNIZE ( MEASURES AVG(E.price) AS avgPrice ONE ROW PER MATCH AFTER MATCH SKIP PAST LAST ROW PATTERN (A B+ C) SUBSET E = (A,C) DEFINE A AS A.price < 30, B AS B.price < 20, C AS C.price < 30 )