Updated on 2024-10-25 GMT+08:00

Table-Level TTL for Stream Joins

This topic is available for MRS 3.3.0 or later only.

When you join two Flink streams, data in one table may change rapidly (needing only a short TTL) while data in the other table changes slowly (needing a long TTL). Natively, Flink applies the same state TTL to both sides of a join, so to ensure join accuracy you must set that TTL long enough for the slowly changing table. As a result, the state backend retains a large amount of data from the rapidly changing table that is no longer needed, which puts heavy pressure on it. To reduce this pressure, you can use hints to set different expiration times for the left and right tables. WHERE clauses are not supported with these hints.
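For comparison, open-source Flink SQL controls state retention for the whole job through the `table.exec.state.ttl` option, which therefore covers the state of both join inputs. The following is a minimal sketch for the Flink SQL client. The 120-second value is only an illustration of sizing the single TTL for the slower table.

```sql
-- Job-wide state TTL (open-source Flink option). State that has been idle
-- for 120 seconds becomes eligible for cleanup in every stateful operator,
-- so the left and right join states are both retained this long, even if
-- the rapidly changing table would need a much shorter TTL.
SET 'table.exec.state.ttl' = '120 s';
```

The per-side hints described in this topic replace this single value with separate TTLs for the left and right tables.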

For example, set the TTL of the left table (state.ttl.left) to 60 seconds and that of the right table (state.ttl.right) to 120 seconds.

  • Specify the hint in the following format:
    /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */
  • The following examples show the hint in complete SQL statements:
    • Example 1:
      CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
        'connector' = 'kafka',
        'topic' = 'user_info_001',
        'properties.bootstrap.servers' = '192.168.64.138:21005',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'latest-offset',
        'value.format' = 'csv'
      );
      CREATE TABLE print (
        `user_id` VARCHAR,
        `user_name` VARCHAR,
        `score` INT
      ) WITH ('connector' = 'print');
      CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
        'connector' = 'kafka',
        'topic' = 'user_score_001',
        'properties.bootstrap.servers' = '192.168.64.138:21005',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'latest-offset',
        'value.format' = 'csv'
      );
      INSERT INTO
        print
      SELECT
        t.user_id,
        t.user_name,
        d.score
      FROM
        user_info AS t
        JOIN
        -- Set different TTLs for the left and right tables.
        /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */
        user_score as d ON t.user_id = d.user_id;
    • Example 2:
      INSERT INTO
        print
      SELECT
        t1.user_id,
        t1.user_name,
        t3.score
      FROM
        t1
        JOIN
        -- Set different TTLs for the left and right tables.
        /*+ OPTIONS('state.ttl.left' = '60S', 'state.ttl.right' = '120S') */
        (
          SELECT
            UPPER(t2.user_id) AS user_id,
            t2.score
          FROM
            t2
        ) AS t3 ON t1.user_id = t3.user_id;
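Example 2 does not show how t1 and t2 are defined. To try it without a Kafka cluster, you could declare them with Flink's built-in datagen connector, as in the following sketch. Only the names t1 and t2 come from Example 2. Their schemas (assumed here to mirror user_info and user_score from Example 1) and all datagen settings are hypothetical choices for testing. The print sink table from Example 1 is still required.

```sql
-- Hypothetical test source for Example 2 (schema assumed to mirror user_info).
-- A one-character random user_id makes repeated keys across tables more likely.
CREATE TABLE t1 (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.user_id.length' = '1',
  'fields.user_name.length' = '8'
);

-- Hypothetical test source for Example 2 (schema assumed to mirror user_score).
-- Scores are random integers between 0 and 100.
CREATE TABLE t2 (`user_id` VARCHAR, `score` INT) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.user_id.length' = '1',
  'fields.score.min' = '0',
  'fields.score.max' = '100'
);
```

Because datagen produces random values, which rows actually join depends on the generated keys and on whether the matching row is still within the TTL of its side.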