FlinkSQL Lookup算子复用
本章节适用于MRS 3.5.0及以后版本。
使用场景
将Lookup Join的结果写入多个sink端时，开启Lookup算子复用功能后无需为每个sink复制一个Lookup Join算子，从而提高作业的执行效率。
使用方法
配置Flink作业时，可在FlinkServer WebUI的Flink作业开发界面添加自定义参数“table.optimizer.graph-merge-enabled”，并将其值设置为“true”，以开启Lookup算子复用功能。创建作业的具体操作可参考“如何创建FlinkServer作业”。
-- Example job: one lookup join (view1) feeding two sinks. With
-- 'table.optimizer.graph-merge-enabled' set to 'true' the lookup join operator
-- is meant to be shared by both INSERT statements instead of being duplicated.

-- Hudi MERGE_ON_READ table used as the lookup side of the join in view1.
-- NOTE(review): the 'lookup.*' options are connector-specific; confirm their
-- exact semantics (e.g. the unit of 'lookup.cache.ttl') against the connector docs.
CREATE TABLE hudimor (
    uuid VARCHAR(20),
    name VARCHAR(10),
    age  INT,
    ts   TIMESTAMP
) WITH (
    'connector'                = 'hudi',
    'table.type'               = 'MERGE_ON_READ',
    'path'                     = 'hdfs:///tmp/hudimor',
    'lookup.cache'             = 'ALL',
    'lookup.cache.ttl'         = '60000',
    'lookup.cache.partitioned' = 'true',
    'lookup.parallelism'       = '3'
);

-- Generated input stream; proctime is a computed processing-time attribute
-- that the lookup join below uses in its FOR SYSTEM_TIME AS OF clause.
CREATE TABLE datagen1 (
    uuid     VARCHAR(20),
    name     VARCHAR(10),
    age      INT,
    ts       TIMESTAMP(6),
    proctime AS PROCTIME()
) WITH (
    'connector'       = 'datagen',
    'rows-per-second' = '5'
);

-- Lookup join of the generated stream against hudimor on uuid.
-- NOTE(review): only columns from the stream side are projected; none of the
-- hudimor columns are selected -- confirm this is intended for the example.
CREATE VIEW view1 AS
SELECT
    src.uuid AS uuid,
    src.name AS name,
    src.age  AS age,
    src.ts   AS ts
FROM datagen1 AS src
LEFT JOIN hudimor FOR SYSTEM_TIME AS OF src.proctime AS dim
    ON src.uuid = dim.uuid;

-- Two independent sinks, each consuming a different projection of view1.
CREATE TABLE blackhole1 (
    uuid VARCHAR(20),
    name VARCHAR(10)
) WITH (
    'connector' = 'blackhole'
);

CREATE TABLE blackhole2 (
    uuid VARCHAR(20),
    age  INT
) WITH (
    'connector' = 'blackhole'
);

-- Both statements read from view1, so they share the same lookup join input.
INSERT INTO blackhole1
SELECT uuid, name
FROM view1;

INSERT INTO blackhole2
SELECT uuid, age
FROM view1;