Updated: 2024-11-29 GMT+08:00

Configuring Table-Level TTL for Flink Multi-Stream Joins

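In a Flink two-stream join, the data in one input table (left or right) may change quickly and need only a short expiration time, while the data in the other changes slowly and needs a longer one. Flink currently provides only a single table-level TTL (time to live). To keep join results correct, that TTL must be set to the longer of the two expiration times. As a result, the state backend retains a large amount of data from the fast-changing table that should already have expired, which puts considerable pressure on the state backend. To reduce this pressure, you can set separate expiration times for the left and right tables. WHERE clauses are not supported.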
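For comparison, the single TTL described above can be sketched as follows. This assumes that the table-level TTL corresponds to open-source Flink's job-wide option `table.exec.state.ttl`, set from the SQL client. Because only one value is available, both join inputs have to use the longer expiration time (120 seconds here).

```sql
-- Assumption: the single TTL described above is open-source Flink's
-- option table.exec.state.ttl (default 0 ms, meaning state never expires).
-- One value applies to all stateful operators in the job, so both the
-- left and the right side of the join keep their rows for 120 seconds.
SET 'table.exec.state.ttl' = '120 s';
```

With this setting, rows from the fast-changing table stay in state for the full 120 seconds even if 60 seconds would be enough. That extra retention is what the per-side hint avoids.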

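You can use a hint to set separate expiration times for the left and right tables. For example, set the TTL of the left table (state.ttl.left) to 60 seconds and the TTL of the right table (state.ttl.right) to 120 seconds: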

  • Hint format:
    /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */
  • Examples of configuring the hint in SQL statements:
    • Example 1:
      CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
        'connector' = 'kafka',
        'topic' = 'user_info_001',
        'properties.bootstrap.servers' = '192.168.64.138:21005',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'latest-offset',
        'value.format' = 'csv'
      );
      CREATE TABLE print (
        `user_id` VARCHAR,
        `user_name` VARCHAR,
        `score` INT
      ) WITH ('connector' = 'print');
      CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
        'connector' = 'kafka',
        'topic' = 'user_score_001',
        'properties.bootstrap.servers' = '192.168.64.138:21005',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'latest-offset',
        'value.format' = 'csv'
      );
      INSERT INTO
        print
      SELECT
        t.user_id,
        t.user_name,
        d.score
      FROM
        user_info as t
        JOIN
        -- Set different TTLs for the left and right tables
        /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */
        user_score as d ON t.user_id = d.user_id;
    • Example 2:
      INSERT INTO
        print
      SELECT
        t1.user_id,
        t1.user_name,
        t3.score
      FROM
        t1
        JOIN
        -- Set different TTLs for the left and right tables
        /*+ OPTIONS('state.ttl.left' = '60S', 'state.ttl.right' = '120S') */
        (
          SELECT
            UPPER(t2.user_id) AS user_id,
            t2.score
          FROM
            t2
        ) as t3 ON t1.user_id = t3.user_id;
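Example 2 reads from tables t1 and t2, whose definitions are not shown. The following is a minimal sketch of hypothetical definitions. It assumes that t1 and t2 use the same schemas and Kafka settings as user_info and user_score in Example 1; reusing the topics from Example 1 is also an assumption. It further assumes that the print sink table from Example 1 already exists.

```sql
-- Hypothetical left table for Example 2; schema and Kafka settings copied
-- from user_info in Example 1 (the topic name is an assumption).
CREATE TABLE t1 (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
  'connector' = 'kafka',
  'topic' = 'user_info_001',
  'properties.bootstrap.servers' = '192.168.64.138:21005',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv'
);

-- Hypothetical right table for Example 2; schema and Kafka settings copied
-- from user_score in Example 1 (the topic name is an assumption).
CREATE TABLE t2 (`user_id` VARCHAR, `score` INT) WITH (
  'connector' = 'kafka',
  'topic' = 'user_score_001',
  'properties.bootstrap.servers' = '192.168.64.138:21005',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv'
);
```

In Example 2, the right side of the join is the subquery t3, so state.ttl.right applies to the rows that the subquery produces from t2, and state.ttl.left applies to t1.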