Flink CEP em SQL
Flink CEP em SQL
O Flink permite que os usuários representem resultados de consultas de processamento de eventos complexos (CEP) em SQL para correspondência de padrões e avaliem fluxos de eventos nos mecanismos do Flink.
Sintaxe da consulta SQL
CEP SQL é implementado através da sintaxe SQL MATCH_RECOGNIZE. A cláusula MATCH_RECOGNIZE é suportada pelo Oracle SQL desde o Oracle Database 12c e é usada para indicar a correspondência de padrões de eventos em SQL. O Apache Calcite também suporta a cláusula MATCH_RECOGNIZE.
O Flink usa Calcite para analisar os resultados da consulta SQL. Portanto, esta operação está em conformidade com a sintaxe Apache Calcite.
MATCH_RECOGNIZE (
[ PARTITION BY expression [, expression ]* ]
[ ORDER BY orderItem [, orderItem ]* ]
[ MEASURES measureColumn [, measureColumn ]* ]
[ ONE ROW PER MATCH | ALL ROWS PER MATCH ]
[ AFTER MATCH
( SKIP TO NEXT ROW
| SKIP PAST LAST ROW
| SKIP TO FIRST variable
| SKIP TO LAST variable
| SKIP TO variable )
]
PATTERN ( pattern )
[ WITHIN intervalLiteral ]
[ SUBSET subsetItem [, subsetItem ]* ]
DEFINE variable AS condition [, variable AS condition ]*
)
Os elementos de sintaxe da cláusula MATCH_RECOGNIZE são definidos da seguinte forma:
(Opcional) -PARTITION BY: define colunas de partição. Esta cláusula é facultativa. Se este parâmetro não for definido, o paralelismo 1 é usado.
(Opcional) -ORDER BY: define a sequência de eventos em um fluxo de dados. A cláusula ORDER BY é opcional. Se for ignorado, a ordenação não determinística é usada. Como a ordem dos eventos é importante na correspondência de padrões, essa cláusula deve ser especificada na maioria dos casos.
(Opcional) -MEASURES: especifica o valor do atributo do evento correspondido com sucesso.
(Opcional) -ONE ROW PER MATCH | ALL ROWS PER MATCH: define como produzir o resultado. ONE ROW PER MATCH indica que apenas uma linha é saída para cada correspondência. ALL ROWS PER MATCH indica que uma linha é saída para cada evento correspondente.
(Opcional) -AFTER MATCH: especifica a posição inicial para processamento após o próximo padrão ser correspondido com sucesso.
-PATTERN: define o padrão de correspondência como uma expressão regular. Os seguintes operadores podem ser usados na cláusula PATTERN: operadores de junção, operadores quantificadores (*, +, ?, {n}, {n,}, {n,m} e {,m}), operadores de ramificação (barra vertical |) e operadores diferenciais ('{- -}').
(Opcional) -WITHIN: produz uma correspondência de cláusula padrão somente quando a correspondência ocorre dentro do tempo especificado.
(Opcional) -SUBSET: combina uma ou mais variáveis associadas definidas na cláusula DEFINE.
-DEFINE: especifica a condição booleana, que define as variáveis usadas na cláusula PATTERN.
Além disso, a cláusula MATCH_RECOGNIZE suporta as seguintes funções:
-MATCH_NUMBER(): usado na cláusula MEASURES para alocar o mesmo número para cada linha que é correspondida com êxito.
-CLASSIFIER(): usado na cláusula MEASURES para indicar o mapeamento entre linhas e variáveis correspondentes.
-FIRST() e LAST(): usadas na cláusula MEASURES para retornar o valor da expressão avaliada na primeira ou na última linha do conjunto de linhas mapeado para a variável de esquema.
-NEXT() e PREV(): usadas na cláusula DEFINE para avaliar uma expressão usando a linha anterior ou a próxima em uma partição.
Palavras-chave -RUNNING e FINAL: usadas para determinar a semântica necessária para a agregação. RUNNING pode ser usado nas cláusulas MEASURES e DEFINE, enquanto FINAL pode ser usado apenas na cláusula MEASURES.
- Funções agregadas (COUNT, SUM, AVG, MAX, MIN): usadas nas cláusulas MEASURES e DEFINE.
Exemplo de consulta
A consulta a seguir encontra o padrão em forma de V no fluxo de dados do preço das ações.
SELECT *
FROM MyTable
MATCH_RECOGNIZE (
ORDER BY rowtime
MEASURES
STRT.name as s_name,
LAST(DOWN.name) as down_name,
LAST(UP.name) as up_name
ONE ROW PER MATCH
PATTERN (STRT DOWN+ UP+)
DEFINE
DOWN AS DOWN.v < PREV(DOWN.v),
UP AS UP.v > PREV(UP.v)
)
Na consulta a seguir, a função agregada AVG é usada na cláusula MEASURES do SUBSET E que consiste em variáveis relacionadas a A e C.
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
MEASURES
AVG(E.price) AS avgPrice
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A B+ C)
SUBSET E = (A,C)
DEFINE
A AS A.price < 30,
B AS B.price < 20,
C AS C.price < 30
)