Updated on 2025-08-22 GMT+08:00

Reusing the Flink SQL Lookup Operator

This section applies only to MRS 3.5.0 or later.

Scenarios

When the result of a Lookup Join is written to multiple sinks, the Lookup Join operator can be shared by all of the sinks instead of being duplicated for each one. This improves job performance.

How to Use

To enable Lookup operator reuse, add the custom parameter table.optimizer.graph-merge-enabled and set it to true when configuring the Flink job on the job development page of the FlinkServer web UI. For details, see Creating a Job.

Example SQL statements:
-- Hudi MERGE_ON_READ table stored at hdfs:///tmp/hudimor.
-- It is the lookup (right-hand) side of the join defined in view1.
CREATE TABLE hudimor (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'path' = 'hdfs:///tmp/hudimor',
  -- Lookup-side settings (cache mode, TTL, partitioned cache, parallelism);
  -- refer to the connector documentation for their exact units and semantics.
  'lookup.cache' = 'ALL',
  'lookup.cache.ttl' = '60000',
  'lookup.cache.partitioned' = 'true',
  'lookup.parallelism' = '3'
);
-- Generated source table; 'rows-per-second' is set to 5.
CREATE TABLE datagen1 (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(6),
  -- Computed processing-time column, used as the time attribute of the lookup join in view1.
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5'
);
-- Lookup join of the datagen1 stream against hudimor as of processing time.
-- Only the stream-side columns are projected. Both INSERT statements below read
-- from this view, so it is the part of the plan that can be shared.
CREATE VIEW view1 AS
SELECT
  src.uuid AS uuid,
  src.name AS name,
  src.age AS age,
  src.ts AS ts
FROM datagen1 AS src
LEFT JOIN hudimor FOR SYSTEM_TIME AS OF src.proctime AS dim
  ON src.uuid = dim.uuid;
-- First sink: receives uuid and name.
CREATE TABLE blackhole1 (
  uuid VARCHAR(20),
  name VARCHAR(10)
) WITH (
  'connector' = 'blackhole'
);
-- Second sink: receives uuid and age.
CREATE TABLE blackhole2 (
  uuid VARCHAR(20),
  age INT
) WITH (
  'connector' = 'blackhole'
);
-- Write uuid and name from the lookup-join view to the first sink.
INSERT INTO blackhole1
SELECT
  uuid,
  name
FROM view1;
-- Write uuid and age from the same lookup-join view to the second sink.
INSERT INTO blackhole2
SELECT
  uuid,
  age
FROM view1;