Enhancing the Joins of Large and Small Tables in Flink Jobs
This topic is available for MRS 3.3.0 or later only.
Joining Big and Small Tables
When you join two Flink streams, one is often a big table and the other a small table. With this feature, small table data is broadcast to every join task, and big table data is rebalanced (distributed in a round-robin fashion) across the join tasks. This improves Flink SQL usability and job stability.
You can use Flink SQL hints to specify the left or right table in a join as the broadcast table; the other table is then the rebalanced table. In the following SQL statement examples, table A and table C are small tables:
- Use table A as the broadcast table.
  - Join
    SELECT /*+ BROADCAST(A) */ a2, b2 FROM A JOIN B ON a1 = b1
  - Where
    SELECT /*+ BROADCAST(A) */ a2, b2 FROM A, B WHERE a1 = b1
- Use table A and table C as broadcast tables.
  SELECT /*+ BROADCAST(A, C) */ a2, b2, c2 FROM A JOIN B ON a1 = b1 JOIN C ON a1 = c1
- This feature is enabled with the /*+ BROADCAST(smallTable1, smallTable2) */ hint and is compatible with open-source two-stream joins.
- Switching between open-source joins and this feature is not supported because this feature broadcasts data to each join task.
- Using a small table as the left table of a LEFT JOIN is not supported. Using a small table as the right table of a RIGHT JOIN is not supported.
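The two outer-join restrictions can be illustrated with a short sketch. It reuses tables A (small) and B (big) from the earlier examples. This topic states only the unsupported cases, so the first statement is an assumption about what remains allowed; verify it in your environment before relying on it.

```sql
-- Assumption: A is the small table and B is the big table, as in the examples above.

-- Expected to work (assumption): the small table A is the RIGHT table of a LEFT JOIN.
SELECT /*+ BROADCAST(A) */ a2, b2 FROM B LEFT JOIN A ON a1 = b1;

-- Not supported: the small table A is the LEFT table of a LEFT JOIN.
-- SELECT /*+ BROADCAST(A) */ a2, b2 FROM A LEFT JOIN B ON a1 = b1;

-- Not supported: the small table A is the RIGHT table of a RIGHT JOIN.
-- SELECT /*+ BROADCAST(A) */ a2, b2 FROM B RIGHT JOIN A ON a1 = b1;
```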
Deduplicating Data When Joining Big and Small Tables
When you join two streams, the join operator may receive a large amount of duplicate data from one of the streams. Downstream operators then have to process this duplicate data, which degrades job performance.
For example, join table A (fields P1, A1, and A2) with table B (fields P1, B1, B2, and B3) to generate table C. A large amount of data in table B is updated, but the fields used in the join result remain unchanged. Assume that only the B1 and B2 fields are used in the join and only the B3 field is updated. In this case, updates to table B that leave B1 and B2 unchanged can be ignored by using the deduplication function.
select A.A1,B.B1,B.B2 from A join B on A.P1=B.P1
To deduplicate table B updates, you can use Hints to set deduplication for the left table (duplicate.left) or right table (duplicate.right).
- Format
  - Set deduplication for the left table.
    /*+ OPTIONS('duplicate.left'='true')*/
  - Set deduplication for the right table.
    /*+ OPTIONS('duplicate.right'='true')*/
  - Set deduplication for both tables.
    /*+ OPTIONS('duplicate.left'='true','duplicate.right'='true')*/
- Example
  The following SQL statements set deduplication for both the left table user_info and the right table user_score.
CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH (
  'connector' = 'kafka',
  'topic' = 'user_info_001',
  'properties.bootstrap.servers' = '192.168.64.138:21005',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv'
);
CREATE TABLE print (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `score` INT
) WITH ('connector' = 'print');
CREATE TABLE user_score (user_id VARCHAR, score INT) WITH (
  'connector' = 'kafka',
  'topic' = 'user_score_001',
  'properties.bootstrap.servers' = '192.168.64.138:21005',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv'
);
INSERT INTO
  print
SELECT
  t.user_id,
  t.user_name,
  d.score
FROM
  user_info as t
  JOIN
  -- Set deduplication for both left and right tables.
  user_score /*+ OPTIONS('duplicate.left'='true','duplicate.right'='true')*/ as d ON t.user_id = d.user_id;
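For the earlier join of tables A and B, where only table B produces redundant updates, deduplicating the right table alone may be enough. The following is a minimal sketch, not a verified statement: it assumes the hint is written after the right table name, mirroring its position in the user_info and user_score example above.

```sql
-- Sketch: deduplicate only the right table B of the earlier A/B join.
-- Assumption: the hint follows the right table name, as in the
-- user_info/user_score example.
SELECT A.A1, B.B1, B.B2
FROM A
JOIN B /*+ OPTIONS('duplicate.right'='true') */
ON A.P1 = B.P1;
```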