Updated on 2024-10-09 GMT+08:00

Enhancing the Joins of Large and Small Tables in Flink Jobs

This topic is available for MRS 3.3.0 or later only.

Joining Big and Small Tables

When you join two Flink streams, one table is often much larger than the other. With this feature, the small table's data is broadcast to every join task, and the large table's data is rebalanced (distributed in a round-robin fashion) across the join tasks. This improves Flink SQL usability and job stability.

Figure 1 Joining big and small tables

You can use Flink SQL Hints to specify the left or right table in a join as the broadcasted table and the other table as the rebalanced table. In the following SQL statement examples, table A and table C are small tables:

  • Use table A as the broadcasted table.
    • Join
      SELECT /*+ BROADCAST(A) */ a2, b2 FROM A JOIN B ON a1 = b1
    • Where
      SELECT /*+ BROADCAST(A) */ a2, b2 FROM A, B WHERE a1 = b1
  • Use table A and table C as broadcasted tables.
    SELECT /*+ BROADCAST(A, C) */ a2, b2, c2 FROM A JOIN B ON a1 = b1 JOIN C ON a1 = c1
  • This feature is enabled through the /*+ BROADCAST(smallTable1, smallTable2) */ hint, which keeps it compatible with open-source two-stream joins.
  • Switching between open-source joins and this feature is not supported because this feature broadcasts data to each join task.
  • A small table cannot be used as the left table of a LEFT JOIN or as the right table of a RIGHT JOIN.
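
The outer-join restriction in the last note can be illustrated with the same tables A (small) and B (large) used above. The following is a minimal sketch, not a verified statement: it assumes that placing the broadcast table on the opposite side of the outer join is accepted, which the notes imply but do not state explicitly.

```sql
-- Not supported per the note above: small table A is the left table of a LEFT JOIN.
-- SELECT /*+ BROADCAST(A) */ a2, b2 FROM A LEFT JOIN B ON a1 = b1

-- Assumed to be accepted: small table A is the right table of a LEFT JOIN.
SELECT /*+ BROADCAST(A) */ a2, b2 FROM B LEFT JOIN A ON b1 = a1;

-- Assumed to be accepted: small table A is the left table of a RIGHT JOIN.
SELECT /*+ BROADCAST(A) */ a2, b2 FROM A RIGHT JOIN B ON a1 = b1;
```

In both assumed-valid forms, the large table B is the side whose rows are all preserved. A likely reason for the restriction is that each B row reaches exactly one join task, which holds a full copy of A and can therefore decide locally whether the row is unmatched. The reverse does not hold for A rows, because B is spread across all tasks.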

Deduplicating Data When Joining Big and Small Tables

When you join two streams, the join operator may receive a large amount of duplicate data from one of them. Downstream operators then have to process this duplicate data, which degrades job performance.

For example, join table A (fields P1, A1, and A2) with table B (fields P1, B1, B2, and B3) to generate C. A large amount of data in table B is updated, but the fields that the join uses remain unchanged. Assume that only the B1 and B2 fields are used in the join and only the B3 field is updated. In this case, updates to table B that leave B1 and B2 unchanged produce duplicate join input and can be ignored by using the deduplication function.

SELECT A.A1, B.B1, B.B2 FROM A JOIN B ON A.P1 = B.P1

To deduplicate table B updates, you can use Hints to set deduplication for the left table (duplicate.left) or right table (duplicate.right).

  • Format
    • Set deduplication for the left table.
       /*+ OPTIONS('duplicate.left'='true')*/
    • Set deduplication for the right table.
       /*+ OPTIONS('duplicate.right'='true')*/
    • Set deduplication for both tables.
       /*+ OPTIONS('duplicate.left'='true','duplicate.right'='true')*/
  • Example
    Set deduplication for both the left table user_info and the right table user_score.
    CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
      'connector' = 'kafka',
      'topic' = 'user_info_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    CREATE TABLE print (
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `score` INT
    ) WITH ('connector' = 'print');
    CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
      'connector' = 'kafka',
      'topic' = 'user_score_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    INSERT INTO
      print
    SELECT
      t.user_id,
      t.user_name,
      d.score
    FROM
      user_info as t
      JOIN 
      -- Set deduplication for both left and right tables.
      user_score /*+ OPTIONS('duplicate.left'='true','duplicate.right'='true')*/ as d ON t.user_id = d.user_id;
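
Applied to the A and B example earlier in this section, where only table B is described as sending redundant updates, deduplication would be set for the right table only. The following is a minimal sketch that assumes the hint is attached to the right table in the same position as in the statement above. It has not been run against a cluster.

```sql
-- Assumption: as in the user_score example, the OPTIONS hint follows the
-- right table name. Only 'duplicate.right' is set because only table B
-- is described as sending redundant updates.
SELECT
  A.A1,
  B.B1,
  B.B2
FROM
  A
  JOIN B /*+ OPTIONS('duplicate.right'='true') */ ON A.P1 = B.P1;
```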