Updated on 2025-08-22 GMT+08:00

Table-Level TTL for Stream Joins

This section applies to MRS 3.3.0 or later.

When you join two Flink streams, data in one table may change rapidly (so a short TTL is enough) while data in the other table changes slowly (so a long TTL is needed). Without per-table settings, a single TTL applies to both tables of the join, so to ensure join accuracy you must set it to the longer value. As a result, a large amount of expired data from the fast-changing table is kept in the state backend, which puts heavy pressure on it. To reduce the pressure, you can use hints to set different expiration times for the left and right tables. The WHERE clause is not supported.

For example, set the TTL of the left table (state.ttl.left) to 60 seconds and that of the right table (state.ttl.right) to 120 seconds.
/*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */
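For comparison, the single-TTL approach that the hint replaces can be sketched as follows. This is a minimal sketch that assumes the job-wide TTL is configured through the open-source Flink option table.exec.state.ttl; check how your cluster sets it before relying on this form.

```sql
-- Assumption: the job-wide state TTL is set with the open-source Flink option table.exec.state.ttl.
-- Both sides of the join then keep state for 120 seconds,
-- including the fast-changing left table that only needs 60 seconds.
SET 'table.exec.state.ttl' = '120 s';
```

With the hint, only the right table keeps state for 120 seconds, and state for the left table expires after 60 seconds.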
The following are configuration examples with SQL statements:
  • Example 1: Set different expiration times for the left and right tables.
    CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
      'connector' = 'kafka',
      'topic' = 'user_info_001',
      'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    CREATE TABLE print (
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `score` INT
    ) WITH ('connector' = 'print');
    CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
      'connector' = 'kafka',
      'topic' = 'user_score_001',
      'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    INSERT INTO
      print
    SELECT
      t.user_id,
      t.user_name,
      d.score
    FROM
      user_info as t
      JOIN 
      --  Set different TTLs for left and right tables.
      /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */
      user_score as d ON t.user_id = d.user_id;
    Obtain the IP address and port number of the Kafka Broker instance as follows:
    • To obtain the instance IP address, log in to FusionInsight Manager, choose Cluster > Services > Kafka, click Instances, and view the instance IP address on the instance list page.
    • If Kerberos authentication is enabled for the cluster (the cluster is in security mode), the Broker port number is the value of sasl.port. The default value is 21007.
    • If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), the Broker port number is the value of port. The default value is 9092. If the port number is set to 9092, set allow.everyone.if.no.acl.found to true. The procedure is as follows:

      Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the displayed page, click Configurations and then All Configurations. On the displayed page, search for allow.everyone.if.no.acl.found, set it to true, and click Save.
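
    As an illustration of the rules above, the 'properties.bootstrap.servers' value might look like the following once it is filled in. The IP addresses 192.0.2.10 and 192.0.2.11 are hypothetical placeholders, and the ports are the default values stated above. In security mode, Kafka typically also needs Kerberos-related properties, which are not shown here.

    ```sql
    -- Hypothetical Broker addresses; replace them with the instance IP addresses from FusionInsight Manager.
    -- Security mode (Kerberos enabled): default sasl.port value 21007.
    'properties.bootstrap.servers' = '192.0.2.10:21007,192.0.2.11:21007',
    -- Normal mode (Kerberos disabled): default port value 9092.
    'properties.bootstrap.servers' = '192.0.2.10:9092,192.0.2.11:9092',
    ```

    Use only the line that matches the cluster mode in each CREATE TABLE statement.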

  • Example 2: Set different expiration times for the left and right tables when the right table is a subquery.
    INSERT INTO
      print
    SELECT
      t1.user_id,
      t1.user_name,
      t3.score
    FROM
      t1
      JOIN
      --  Set different TTLs for left and right tables.
      /*+ OPTIONS('state.ttl.left' = '60S', 'state.ttl.right' = '120S') */
      (
        select
          UPPER(t2.user_id) as user_id,
          t2.score
        from
          t2
      ) as t3 ON t1.user_id = t3.user_id;
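
    Example 2 refers to tables t1 and t2 without defining them. The following is a minimal sketch of matching source tables, assuming they have the same columns as user_info and user_score in Example 1. The topic names t1_topic and t2_topic are hypothetical placeholders, and the print sink table is the one created in Example 1.

    ```sql
    -- Assumption: t1 (left table) has the same columns as user_info; the topic name is hypothetical.
    CREATE TABLE t1 (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
      'connector' = 'kafka',
      'topic' = 't1_topic',
      'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    -- Assumption: t2 (read by the right-side subquery) has the same columns as user_score.
    CREATE TABLE t2 (`user_id` VARCHAR, `score` INT) WITH (
      'connector' = 'kafka',
      'topic' = 't2_topic',
      'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    ```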