更新时间:2024-11-26 GMT+08:00
分享

FlinkSQL Lookup算子复用

本章节适用于MRS 3.5.0及以后版本。

使用场景

将Lookup Join的结果写入多个sink端时,开启Lookup算子复用后无需为每个sink复制一个Lookup Join算子,从而提高作业的执行效率。

使用方法

配置Flink作业时,可在FlinkServer WebUI的Flink作业开发界面添加自定义参数“table.optimizer.graph-merge-enabled”,并将其值设置为“true”,以开启Lookup算子复用功能。创建作业的具体步骤可参考“如何创建FlinkServer作业”。

SQL示例:
-- Lookup-side table: a Hudi MERGE_ON_READ table located at hdfs:///tmp/hudimor.
-- The 'lookup.*' options configure the connector's lookup cache and parallelism.
-- NOTE(review): the exact meaning and units of the 'lookup.*' values are
-- connector-specific -- confirm against the Hudi connector documentation in use.
CREATE TABLE hudimor (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age  INT,
  ts   TIMESTAMP
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'path' = 'hdfs:///tmp/hudimor',
  'lookup.cache' = 'ALL',
  'lookup.cache.ttl' = '60000',
  'lookup.cache.partitioned' = 'true',
  'lookup.parallelism' = '3'
);
-- Probe-side source: synthetic rows produced by the datagen connector
-- ('rows-per-second' = '5'). The computed column `proctime` exposes
-- processing time, which a lookup join needs for FOR SYSTEM_TIME AS OF.
CREATE TABLE datagen1 (
  uuid     VARCHAR(20),
  name     VARCHAR(10),
  age      INT,
  ts       TIMESTAMP(6),
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5'
);
-- Shared view: left lookup join of datagen1 against hudimor on uuid, evaluated
-- at the probe row's processing time. Only datagen1 columns are projected.
-- Both INSERT statements in this example read from this view.
CREATE VIEW view1 AS
SELECT
  src.uuid AS uuid,
  src.name AS name,
  src.age  AS age,
  src.ts   AS ts
FROM datagen1 AS src
LEFT JOIN hudimor FOR SYSTEM_TIME AS OF src.proctime AS dim
  ON src.uuid = dim.uuid;
-- Two sink tables backed by the blackhole connector, one per INSERT below.
CREATE TABLE blackhole1 (
  uuid VARCHAR(20),
  name VARCHAR(10)
) WITH (
  'connector' = 'blackhole'
);
CREATE TABLE blackhole2 (
  uuid VARCHAR(20),
  age  INT
) WITH (
  'connector' = 'blackhole'
);
-- First sink: write (uuid, name) from the lookup-join view into blackhole1.
INSERT INTO blackhole1
SELECT
  uuid,
  name
FROM view1;
-- Second sink: write (uuid, age) from the same lookup-join view into blackhole2.
INSERT INTO blackhole2
SELECT
  uuid,
  age
FROM view1;

相关文档