FlinkSql支持双流join过滤join产生的回撤
本章节仅适用于MRS 3.6.0-LTS及之后版本。
使用场景
配置Flink作业时,双流join可以过滤join产生的回撤,source算子的-D +U -U不会过滤。
使用限制
- 只在双流join时使用。
- 不推荐在join后有其他算子,join之后直接输出到sink。
- 支持inner join、left join和right join。
使用方法
在join时添加hints参数 /*+ OPTIONS('allows.generated-retracts'='false')*/。
SQL示例:
CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
'connector' = 'kafka',
'topic' = 'user_info_001',
'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'csv'
);
CREATE table print(
`user_id` VARCHAR,
`user_name` VARCHAR,
`score` INT
) WITH ('connector' = 'print');
CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
'connector' = 'kafka',
'topic' = 'user_score_001',
'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'csv'
);
INSERT INTO
print
SELECT
t.user_id,
t.user_name,
d.score
FROM
user_info as t
LEFT JOIN
/*+ OPTIONS('allows.generated-retracts'='false')*/
user_score as d ON t.user_id = d.user_id;
数据示例:
user_info表先输入数据["1","zs"]
user_score表后输入数据["1",25]