Flink CEP em SQL
Flink CEP em SQL
O Flink permite que os usuários representem resultados de consultas de processamento de eventos complexos (CEP) em SQL para correspondência de padrões e avaliem fluxos de eventos nos mecanismos do Flink.
Sintaxe da consulta SQL
CEP SQL é implementado através da sintaxe SQL MATCH_RECOGNIZE. A cláusula MATCH_RECOGNIZE é suportada pelo Oracle SQL desde o Oracle Database 12c e é usada para indicar a correspondência de padrões de eventos em SQL. O Apache Calcite também suporta a cláusula MATCH_RECOGNIZE.
O Flink usa Calcite para analisar os resultados da consulta SQL. Portanto, esta operação está em conformidade com a sintaxe Apache Calcite.
MATCH_RECOGNIZE ( [ PARTITION BY expression [, expression ]* ] [ ORDER BY orderItem [, orderItem ]* ] [ MEASURES measureColumn [, measureColumn ]* ] [ ONE ROW PER MATCH | ALL ROWS PER MATCH ] [ AFTER MATCH ( SKIP TO NEXT ROW | SKIP PAST LAST ROW | SKIP TO FIRST variable | SKIP TO LAST variable | SKIP TO variable ) ] PATTERN ( pattern ) [ WITHIN intervalLiteral ] [ SUBSET subsetItem [, subsetItem ]* ] DEFINE variable AS condition [, variable AS condition ]* )
Os elementos de sintaxe da cláusula MATCH_RECOGNIZE são definidos da seguinte forma:
(Opcional) -PARTITION BY: define colunas de partição. Esta cláusula é facultativa. Se este parâmetro não for definido, o paralelismo 1 é usado.
(Opcional) -ORDER BY: define a sequência de eventos em um fluxo de dados. A cláusula ORDER BY é opcional. Se for ignorado, a ordenação não determinística é usada. Como a ordem dos eventos é importante na correspondência de padrões, essa cláusula deve ser especificada na maioria dos casos.
(Opcional) -MEASURES: especifica o valor do atributo do evento correspondido com sucesso.
(Opcional) -ONE ROW PER MATCH | ALL ROWS PER MATCH: define como produzir o resultado. ONE ROW PER MATCH indica que apenas uma linha é saída para cada correspondência. ALL ROWS PER MATCH indica que uma linha é saída para cada evento correspondente.
(Opcional) -AFTER MATCH: especifica a posição inicial para processamento após o próximo padrão ser correspondido com sucesso.
-PATTERN: define o padrão de correspondência como uma expressão regular. Os seguintes operadores podem ser usados na cláusula PATTERN: operadores de junção, operadores quantificadores (*, +, ?, {n}, {n,}, {n,m} e {,m}), operadores de ramificação (barra vertical |) e operadores diferenciais ('{- -}').
(Opcional) -WITHIN: produz uma correspondência de cláusula padrão somente quando a correspondência ocorre dentro do tempo especificado.
(Opcional) -SUBSET: combina uma ou mais variáveis associadas definidas na cláusula DEFINE.
-DEFINE: especifica a condição booleana, que define as variáveis usadas na cláusula PATTERN.
Além disso, a cláusula MATCH_RECOGNIZE suporta as seguintes funções:
-MATCH_NUMBER(): usado na cláusula MEASURES para alocar o mesmo número para cada linha que é correspondida com êxito.
-CLASSIFIER(): usado na cláusula MEASURES para indicar o mapeamento entre linhas e variáveis correspondentes.
-FIRST() e LAST(): usadas na cláusula MEASURES para retornar o valor da expressão avaliada na primeira ou na última linha do conjunto de linhas mapeado para a variável de esquema.
-NEXT() e PREV(): usadas na cláusula DEFINE para avaliar uma expressão usando a linha anterior ou a próxima em uma partição.
Palavras-chave -RUNNING e FINAL: usadas para determinar a semântica necessária para a agregação. RUNNING pode ser usado nas cláusulas MEASURES e DEFINE, enquanto FINAL pode ser usado apenas na cláusula MEASURES.
- Funções agregadas (COUNT, SUM, AVG, MAX, MIN): usadas nas cláusulas MEASURES e DEFINE.
Exemplo de consulta
A consulta a seguir encontra o padrão em forma de V no fluxo de dados do preço das ações.
SELECT * FROM MyTable MATCH_RECOGNIZE ( ORDER BY rowtime MEASURES STRT.name as s_name, LAST(DOWN.name) as down_name, LAST(UP.name) as up_name ONE ROW PER MATCH PATTERN (STRT DOWN+ UP+) DEFINE DOWN AS DOWN.v < PREV(DOWN.v), UP AS UP.v > PREV(UP.v) )
Na consulta a seguir, a função agregada AVG é usada na cláusula MEASURES do SUBSET E que consiste em variáveis relacionadas a A e C.
SELECT * FROM Ticker MATCH_RECOGNIZE ( MEASURES AVG(E.price) AS avgPrice ONE ROW PER MATCH AFTER MATCH SKIP PAST LAST ROW PATTERN (A B+ C) SUBSET E = (A,C) DEFINE A AS A.price < 30, B AS B.price < 20, C AS C.price < 30 )