Updated on 2024-05-29 GMT+08:00

Configuring Table-Level Time To Live (TTL) for Joining Multiple Flink Streams

This section applies to MRS 3.3.0 or later.

When you join two Flink streams, data in one table may change rapidly (so it needs only a short TTL) while data in the other table changes slowly (so it needs a long TTL). By default, Flink applies a single state TTL to both tables in the join. To keep the join results accurate, you must set this TTL to the longer expiration time. As a result, a large amount of data from the fast-changing table that is no longer needed stays in the state backend, which puts heavy pressure on it. To reduce this pressure, you can use a hint to set different expiration times for the left and right tables. WHERE clauses are not supported.
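For comparison, in open-source Flink SQL the TTL that applies to both sides of a join is usually set with the `table.exec.state.ttl` option, which covers all stateful operators in the job. A minimal sketch, assuming the statement is run in the Flink SQL Client and that the longer of the two TTLs used in the examples in this section (120 seconds) is chosen:

```sql
-- Job-wide state TTL (open-source Flink option). State of every stateful operator,
-- including both inputs of a regular join, becomes eligible for cleanup after it
-- has not been updated for 120 seconds.
SET 'table.exec.state.ttl' = '120 s';
```

With this setting, rows from the fast-changing table are also retained for 120 seconds. That extra state is what the per-table hint is meant to avoid.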

For example, set the TTL of the left table (state.ttl.left) to 60 seconds and that of the right table (state.ttl.right) to 120 seconds.

  • Place the hint between the JOIN keyword and the right table, using the following format:
    /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */
  • The following are configuration examples in SQL:
    • Example 1:
      -- Source table: user information read from Kafka topic user_info_001 (CSV values).
      CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
        'connector' = 'kafka',
        'topic' = 'user_info_001',
        'properties.bootstrap.servers' = '192.168.64.138:21005',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'latest-offset',
        'value.format' = 'csv'
      );
      -- Sink table: the print connector writes each joined row to standard output.
      CREATE TABLE print (
        `user_id` VARCHAR,
        `user_name` VARCHAR,
        `score` INT
      ) WITH ('connector' = 'print');
      -- Source table: user scores read from Kafka topic user_score_001 (CSV values).
      CREATE TABLE user_score (`user_id` VARCHAR, `score` INT) WITH (
        'connector' = 'kafka',
        'topic' = 'user_score_001',
        'properties.bootstrap.servers' = '192.168.64.138:21005',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'latest-offset',
        'value.format' = 'csv'
      );
      -- Join the two streams on user_id and write the result to the print sink.
      INSERT INTO
        print
      SELECT
        t.user_id,
        t.user_name,
        d.score
      FROM
        user_info AS t
        JOIN
        -- Set different TTLs for the left and right tables.
        /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */
        user_score as d ON t.user_id = d.user_id;
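    • Example 2 (the right input is a subquery; t1 and t2 are source tables that must be created beforehand, and print is the sink table from Example 1):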
      INSERT INTO
        print
      SELECT
        t1.user_id,
        t1.user_name,
        t3.score
      FROM
        t1
        JOIN
        -- Set different TTLs for the left and right tables.
        /*+ OPTIONS('state.ttl.left' = '60S', 'state.ttl.right' = '120S') */
        (
          SELECT
            UPPER(t2.user_id) AS user_id,
            t2.score
          FROM
            t2
        ) AS t3 ON t1.user_id = t3.user_id;
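The DDL for t1 and t2 is not given in Example 2. The following is a minimal sketch, assuming t1 and t2 are Kafka source tables with the same columns as user_info and user_score in Example 1. The topic names t1_topic and t2_topic are hypothetical placeholders, and the broker address and group ID are copied from Example 1.

```sql
-- Hypothetical source tables for Example 2 (not part of the original example).
-- t1: user ID and user name, read as CSV values from Kafka.
CREATE TABLE t1 (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
  'connector' = 'kafka',
  'topic' = 't1_topic',  -- hypothetical topic name
  'properties.bootstrap.servers' = '192.168.64.138:21005',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv'
);
-- t2: user ID and score, read as CSV values from Kafka.
CREATE TABLE t2 (`user_id` VARCHAR, `score` INT) WITH (
  'connector' = 'kafka',
  'topic' = 't2_topic',  -- hypothetical topic name
  'properties.bootstrap.servers' = '192.168.64.138:21005',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv'
);
```

Because the subquery in Example 2 converts t2.user_id to upper case before the join, a row from t1 matches only if its user_id is already in upper case.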