Updated on 2024-12-13 GMT+08:00

Join-To-Live

A Flink dual-stream join must store the data of both streams in the state backend, and RocksDB is currently the most widely used state backend. If the time to live (TTL) is set too long, if a suitable TTL cannot be determined, or if data traffic grows, the volume of state data and the storage pressure increase. As a result, job stability decreases, or state that expires under the TTL causes inaccurate join results.

For services in which the number of joins is known in advance, the Join-To-Live (JTL) feature can be used to reduce the pressure on the state backend. JTL determines whether a piece of data has expired based on the number of times it has been joined. The following restrictions apply:

  • JTL is available only for inner joins in Flink regular joins.
  • JTL cannot be used together with job-level TTLs, table-level TTLs, or small table broadcasting.
  • A primary key must be specified for every table for which JTL is configured. Otherwise, the query result may be inaccurate.

JTL can be configured in either of the following ways:

  • Method 1: Using SQL hints

    eliminate-state.left.threshold: the threshold for the number of times a piece of data from the left table can be joined. If the number of joins exceeds the threshold, that piece of data expires.

    eliminate-state.right.threshold: the threshold for the number of times a piece of data from the right table can be joined. If the number of joins exceeds the threshold, that piece of data expires.

    Example 1:

    SELECT * FROM t1
       JOIN /*+ OPTIONS('eliminate-state.right.threshold'='1', 'eliminate-state.left.threshold'='2') */
       t2 ON a1 = a2

    Example 2:

    SELECT a1, a2, a3 FROM
      t1
      JOIN /*+ OPTIONS('eliminate-state.left.threshold'='1', 'eliminate-state.right.threshold'='2') */
      t2
      ON a1 = a2
      JOIN /*+ OPTIONS('eliminate-state.left.threshold'='3', 'eliminate-state.right.threshold'='4') */
      t3
      ON a2 = a3
  • Method 2: Configuring the following two parameters in Client installation path/Flink/flink/conf/flink-conf.yaml so that they take effect globally
    table.exec.join.eliminate-state.left.threshold
    table.exec.join.eliminate-state.right.threshold
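
As a rough illustration of Method 2, the fragment below shows how the two parameters might look in flink-conf.yaml. The values 2 and 1 are assumptions chosen only to mirror Example 1. They are not recommended defaults, and the thresholds should be chosen to match the number of joins the service actually expects.

```yaml
# Hypothetical values for illustration only; choose thresholds that match
# the number of joins expected for each side in your service.

# Data from the left table expires once its join count exceeds 2.
table.exec.join.eliminate-state.left.threshold: 2

# Data from the right table expires once its join count exceeds 1.
table.exec.join.eliminate-state.right.threshold: 1
```

Because this configuration is global, the restrictions listed earlier (inner regular joins only, no job-level or table-level TTLs, no small table broadcasting, and a primary key on each table) would presumably need to hold for the jobs that pick it up. When only some joins should use JTL, the SQL hints of Method 1 are likely the better fit.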