更新时间:2026-06-29 GMT+08:00
分享

FlinkSQL Lookup算子复用

使用场景

在FlinkSQL作业中,当Lookup Join的结果需要写入多个sink端时,默认情况下每个sink端都会复制一份完整的Lookup Join算子,导致相同的维表查询逻辑被重复执行,既浪费计算资源,又降低作业执行效率。开启Lookup算子复用功能后,Lookup Join的结果写入多个sink端时,无需为每个sink端复制一个Lookup Join算子,从而提高作业的执行效率。

约束与限制

本章节仅适用于MRS 3.5.0-LTS及之后版本。

使用方法

配置Flink作业时,可在FlinkServer WebUI的Flink作业开发界面添加自定义参数“table.optimizer.graph-merge-enabled”,并将其值设置为“true”,以开启Lookup算子复用功能。作业的创建方法可参考集群连接模式创建Flink SQL作业。

SQL示例:
-- Hudi MERGE_ON_READ table stored at hdfs:///tmp/hudimor.
-- It is the lookup side of the FOR SYSTEM_TIME AS OF join in view1.
-- NOTE(review): the 'lookup.*' options are connector-specific; presumably they
-- control lookup caching (mode, TTL, partitioning) and lookup parallelism --
-- confirm exact semantics and units against the connector documentation.
CREATE TABLE hudimor (
    uuid VARCHAR(20),
    name VARCHAR(10),
    age  INT,
    ts   TIMESTAMP
)
WITH (
    'connector'                = 'hudi',
    'table.type'               = 'MERGE_ON_READ',
    'path'                     = 'hdfs:///tmp/hudimor',
    'lookup.cache'             = 'ALL',
    'lookup.cache.ttl'         = '60000',
    'lookup.cache.partitioned' = 'true',
    'lookup.parallelism'       = '3'
);
-- Generated source table (datagen connector, 'rows-per-second' = '5').
-- The computed column proctime supplies the processing-time attribute
-- that view1 uses in its FOR SYSTEM_TIME AS OF clause.
CREATE TABLE datagen1 (
    uuid     VARCHAR(20),
    name     VARCHAR(10),
    age      INT,
    ts       TIMESTAMP(6),
    proctime AS PROCTIME()
)
WITH (
    'connector'       = 'datagen',
    'rows-per-second' = '5'
);
-- Lookup join shared by both sinks: each datagen1 row is left-joined to
-- hudimor on uuid as of the row's processing time. Only columns from the
-- datagen1 side are projected.
CREATE VIEW view1 AS
SELECT
    src.uuid AS uuid,
    src.name AS name,
    src.age  AS age,
    src.ts   AS ts
FROM datagen1 AS src
LEFT JOIN hudimor FOR SYSTEM_TIME AS OF src.proctime AS dim
    ON src.uuid = dim.uuid;
-- Two blackhole sinks; each receives a different projection of view1.
CREATE TABLE blackhole1 (
    uuid VARCHAR(20),
    name VARCHAR(10)
)
WITH (
    'connector' = 'blackhole'
);

CREATE TABLE blackhole2 (
    uuid VARCHAR(20),
    age  INT
)
WITH (
    'connector' = 'blackhole'
);
-- Sink 1: write (uuid, name) from the lookup-join view into blackhole1.
INSERT INTO blackhole1
SELECT
    uuid,
    name
FROM view1;
-- Sink 2: write (uuid, age) from the same lookup-join view into blackhole2.
INSERT INTO blackhole2
SELECT
    uuid,
    age
FROM view1;

相关文档