更新时间:2025-12-10 GMT+08:00
分享

FlinkSql支持双流join过滤join产生的回撤

本章节仅适用于MRS 3.6.0-LTS及之后版本。

使用场景

配置Flink作业时,双流join可以过滤join产生的回撤,source算子的-D +U -U不会过滤。

使用限制

  • 只在双流join时使用。
  • 不推荐在join后有其他算子,join之后直接输出到sink。
  • 支持inner join、left join和right join。

使用方法

在join时添加hints参数 /*+ OPTIONS('allows.generated-retracts'='false')*/。

SQL示例:

CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
  'connector' = 'kafka',
  'topic' = 'user_info_001',
  'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv'
);
CREATE table print(
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `score` INT
) WITH ('connector' = 'print');
CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
  'connector' = 'kafka',
  'topic' = 'user_score_001',
  'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv'
);
INSERT INTO
  print
SELECT
  t.user_id,
  t.user_name,
  d.score
FROM
  user_info as t
  LEFT JOIN 
  /*+ OPTIONS('allows.generated-retracts'='false')*/ 
  user_score as d ON t.user_id = d.user_id;

数据示例:

user_info表先输入数据["1","zs"]

user_score表后输入数据["1",25]

print sink打印结果:
  • 不使用特性时

    +I["1","zs",null]

    -D["1","zs",null]

    +I["1","zs",25]

  • 使用特性时

    +I["1","zs",null]

    +I["1","zs",25]

相关文档