FlinkSQL Lookup算子复用
本章节适用于MRS 3.5.0及以后版本。
使用场景
将Lookup Join的结果写入多个sink端时,开启Lookup算子复用后无需为每个sink复制一个Lookup Join算子,从而提高作业的执行效率。
使用方法
配置Flink作业时,可在FlinkServer WebUI的Flink作业开发界面添加自定义参数“table.optimizer.graph-merge-enabled”并将其值设置为“true”,以开启Lookup算子复用功能。作业创建方法可参考“创建FlinkServer作业”。SQL示例如下:
-- Hudi MERGE_ON_READ table stored under hdfs:///tmp/hudimor.
-- It is the right-hand (lookup) side of the join in view1.
CREATE TABLE hudimor (
    uuid VARCHAR(20),
    name VARCHAR(10),
    age  INT,
    ts   TIMESTAMP
) WITH (
    'connector' = 'hudi',
    'table.type' = 'MERGE_ON_READ',
    'path' = 'hdfs:///tmp/hudimor',
    -- Lookup-side tuning (cache mode, cache TTL, partitioned cache, lookup
    -- parallelism). NOTE(review): exact semantics and units are defined by
    -- the connector -- confirm against the MRS Hudi lookup documentation.
    'lookup.cache' = 'ALL',
    'lookup.cache.ttl' = '60000',
    'lookup.cache.partitioned' = 'true',
    'lookup.parallelism' = '3'
);
-- Generated source stream (datagen connector, 5 rows per second).
-- It is the left-hand (probe) side of the lookup join in view1.
CREATE TABLE datagen1 (
    uuid     VARCHAR(20),
    name     VARCHAR(10),
    age      INT,
    ts       TIMESTAMP(6),
    -- Processing-time attribute; view1 uses it in FOR SYSTEM_TIME AS OF.
    proctime AS PROCTIME()
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5'
);
-- view1: processing-time lookup join of datagen1 against hudimor on uuid.
-- Only datagen1 columns are projected. Both INSERT statements below read
-- from this view, so they share this single lookup join definition.
CREATE VIEW view1 AS
SELECT
    src.uuid AS uuid,
    src.name AS name,
    src.age AS age,
    src.ts AS ts
FROM datagen1 AS src
LEFT JOIN hudimor FOR SYSTEM_TIME AS OF src.proctime AS lkp
    ON src.uuid = lkp.uuid;
-- First sink (blackhole connector): receives uuid and name.
CREATE TABLE blackhole1 (
    uuid VARCHAR(20),
    name VARCHAR(10)
) WITH (
    'connector' = 'blackhole'
);
-- Second sink (blackhole connector): receives uuid and age.
CREATE TABLE blackhole2 (
    uuid VARCHAR(20),
    age  INT
) WITH (
    'connector' = 'blackhole'
);
-- Write uuid and name from view1 into the first sink.
INSERT INTO blackhole1 (uuid, name)
SELECT
    uuid,
    name
FROM view1;
-- Write uuid and age from view1 into the second sink.
INSERT INTO blackhole2 (uuid, age)
SELECT
    uuid,
    age
FROM view1;