Updated on 2024-05-29 GMT+08:00

Deduplicating Data When Joining Big and Small Tables

This section applies to MRS 3.3.0 or later.

When two streams are joined, one of them may send the join operator a large amount of duplicate data. Downstream operators then have to process all of these duplicates, which degrades job performance.

For example, table A has fields P1, A1, and A2, and table B has fields P1, B1, B2, and B3. The two tables are joined on P1 to generate C with the following query. Table B receives a large number of updates, but these updates leave the fields read by the query unchanged: the query uses only B1 and B2 from table B, and the updates change only B3. Each such update still produces a join result identical to the previous one, so the deduplication function should discard these updates of table B.

SELECT A.A1, B.B1, B.B2 FROM A JOIN B ON A.P1 = B.P1;
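The filtering described in the example above can be modelled in a few lines of Python. This is a conceptual sketch only, not the MRS implementation or a Flink API. It assumes that the operator compares each incoming row of table B with the last row it kept for the same join key (P1), looking only at the fields the query uses (B1 and B2). The function name `dedup_updates` and the dict-based row format are hypothetical.

```python
from typing import Dict, Iterable, Iterator, Tuple

# Hypothetical row format: one record of table B as a dict,
# e.g. {"P1": "k1", "B1": "x", "B2": "y", "B3": 1}.
Row = Dict[str, object]


def dedup_updates(
    rows: Iterable[Row],
    key: str = "P1",
    used_fields: Tuple[str, ...] = ("B1", "B2"),
) -> Iterator[Row]:
    """Yield only rows whose query-relevant fields changed for their key.

    Assumption (conceptual model): each row is compared with the last row
    kept for the same join key, using only `used_fields`. Rows that differ
    only in other columns (here B3) are dropped as duplicates.
    """
    last_kept: Dict[object, Tuple[object, ...]] = {}
    for row in rows:
        projected = tuple(row[f] for f in used_fields)
        k = row[key]
        if last_kept.get(k) == projected:
            continue  # only unused columns changed, so skip this update
        last_kept[k] = projected
        yield row


updates = [
    {"P1": "k1", "B1": "x", "B2": "y", "B3": 1},
    {"P1": "k1", "B1": "x", "B2": "y", "B3": 2},  # only B3 changed: dropped
    {"P1": "k1", "B1": "z", "B2": "y", "B3": 3},  # B1 changed: kept
    {"P1": "k2", "B1": "x", "B2": "y", "B3": 1},  # new key: kept
]
kept = list(dedup_updates(updates))
print([r["B3"] for r in kept])  # [1, 3, 1]
```

In a real Flink job, the "last kept" values would live in keyed operator state rather than in a Python dict.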

To deduplicate such updates, use a query hint to enable deduplication for the left table (duplicate.left), the right table (duplicate.right), or both.

  • Format
    • Set deduplication for the left table.
       /*+ OPTIONS('duplicate.left'='true')*/
    • Set deduplication for the right table.
       /*+ OPTIONS('duplicate.right'='true')*/
    • Set deduplication for both the left and right tables.
       /*+ OPTIONS('duplicate.left'='true','duplicate.right'='true')*/
  • Example SQL statements
    The following statements enable deduplication for both the left table user_info and the right table user_score.
    CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
      'connector' = 'kafka',
      'topic' = 'user_info_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    CREATE TABLE print (
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `score` INT
    ) WITH ('connector' = 'print');
    CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
      'connector' = 'kafka',
      'topic' = 'user_score_001',
      'properties.bootstrap.servers' = '192.168.64.138:21005',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv'
    );
    INSERT INTO
      print
    SELECT
      t.user_id,
      t.user_name,
      d.score
    FROM
      user_info as t
      JOIN 
      -- Set deduplication for left and right tables.
      user_score /*+ OPTIONS('duplicate.left'='true','duplicate.right'='true')*/ as d ON t.user_id = d.user_id;