Enhancing the Joins of Large and Small Tables in Flink Jobs
This section applies to MRS 3.3.0 or later.
Joining Big and Small Tables
When you join two Flink streams, one table is often much larger than the other. With this feature, the small table's data is broadcast to every join task, and the large table's data is rebalanced (distributed in a round-robin fashion) across the join tasks. This improves Flink SQL usability and job stability.

You can use Flink SQL Hints to specify the left or right table in a join as a broadcasted table and the other table as a rebalanced table. The following SQL statement examples use table A and table C as small tables:
- Use table A as the broadcasted table.
  - Join
    SELECT /*+ BROADCAST(A) */ a2, b2 FROM A JOIN B ON a1 = b1
  - Where
    SELECT /*+ BROADCAST(A) */ a2, b2 FROM A, B WHERE a1 = b1
- Use table A and table C as broadcasted tables.
    SELECT /*+ BROADCAST(A, C) */ a2, b2, c2 FROM A JOIN B ON a1 = b1 JOIN C ON a1 = c1
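
As a hedged end-to-end sketch, the statements below wrap the first query in a complete job. The column types, the sink name join_result, and the use of the datagen and print connectors are assumptions made for illustration. Only the BROADCAST(A) hint and the join itself come from the examples above.

```sql
-- Hypothetical small table A (schema and connector are assumptions).
CREATE TABLE A (
  a1 INT,
  a2 STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.a1.min' = '1',
  'fields.a1.max' = '10'
);

-- Hypothetical large table B (schema and connector are assumptions).
CREATE TABLE B (
  b1 INT,
  b2 STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '100',
  'fields.b1.min' = '1',
  'fields.b1.max' = '10'
);

-- Hypothetical sink that prints the join result.
CREATE TABLE join_result (
  a2 STRING,
  b2 STRING
) WITH ('connector' = 'print');

-- Table A is broadcast to every join task; table B is rebalanced.
INSERT INTO join_result
SELECT /*+ BROADCAST(A) */ a2, b2
FROM A JOIN B ON a1 = b1;
```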

- This feature can be used with /*+ BROADCAST(smallTable1, smallTable2) */ to be compatible with the open-source joins of two streams.
- Switching between open-source joins and this feature is not supported because this feature broadcasts data to each join task.
- Using a small table as the left table of a LEFT JOIN is not supported. Using a small table as the right table of a RIGHT JOIN is not supported.
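
The outer-join restriction above can be illustrated with the following sketch, which reuses tables A (small) and B (large) from the earlier examples. That these two orientations are accepted is inferred from the restriction rather than stated by the source, so verify them on your cluster.

```sql
-- Small table A is the RIGHT table of a LEFT JOIN (assumed to be accepted).
SELECT /*+ BROADCAST(A) */ a2, b2 FROM B LEFT JOIN A ON b1 = a1;

-- Small table A is the LEFT table of a RIGHT JOIN (assumed to be accepted).
SELECT /*+ BROADCAST(A) */ a2, b2 FROM A RIGHT JOIN B ON a1 = b1;

-- Not supported according to the note above: small table A as the left
-- table of a LEFT JOIN.
-- SELECT /*+ BROADCAST(A) */ a2, b2 FROM A LEFT JOIN B ON a1 = b1;
```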
Deduplicating Data When Joining Big and Small Tables
When you join two streams, the join operator may receive a large amount of duplicate data from one of the streams. Downstream operators then have to process this duplicate data, which degrades job performance.
For example, join table A (fields P1, A1, and A2) with table B (fields P1, B1, B2, and B3) to generate field C. A large amount of data in table B is updated, but the fields used in the join remain unchanged. Assume that only the B1 and B2 fields are used in the join and only the B3 field is updated. In this case, updates to table B that leave B1 and B2 unchanged can be ignored by using the deduplication function.
select A.A1,B.B1,B.B2 from A join B on A.P1=B.P1
To deduplicate table B updates, you can use Hints to set deduplication for the left table (duplicate.left) or right table (duplicate.right).
- Format
  - Set deduplication for the left table.
    /*+ OPTIONS('duplicate.left'='true')*/
  - Set deduplication for the right table.
    /*+ OPTIONS('duplicate.right'='true')*/
  - Set deduplication for both tables.
    /*+ OPTIONS('duplicate.left'='true','duplicate.right'='true')*/
- Example
  Set deduplication for both the left table user_info and the right table user_score:
CREATE TABLE user_info (
  `user_id` VARCHAR,
  `user_name` VARCHAR
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_info_001',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv'
);

CREATE TABLE print (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `score` INT
) WITH ('connector' = 'print');

CREATE TABLE user_score (
  user_id VARCHAR,
  score INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_score_001',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv'
);

INSERT INTO print
SELECT t.user_id, t.user_name, d.score
FROM user_info as t
JOIN
  -- Set deduplication for both left and right tables.
  user_score /*+ OPTIONS('duplicate.left'='true','duplicate.right'='true')*/ as d
ON t.user_id = d.user_id;
The IP address and port number of the Kafka Broker instance are as follows:
- To obtain the instance IP address, log in to FusionInsight Manager, choose Cluster > Services > Kafka, click Instances, and query the instance IP address on the instance list page.
- If Kerberos authentication is enabled for the cluster (the cluster is in security mode), the Broker port number is the value of sasl.port. The default value is 21007.
- If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), the Broker port number is the value of port. The default value is 9092. If the port number is set to 9092, set allow.everyone.if.no.acl.found to true. The procedure is as follows:
Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the displayed page, click Configurations and then All Configurations. On the displayed page, search for allow.everyone.if.no.acl.found, set it to true, and click Save.
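
Returning to the earlier example of tables A and B, a minimal sketch of deduplicating only the updates from table B (the right table) might look as follows. Placing the hint directly after the right table name mirrors the user_score example above. Applying it to this particular query is an illustration, not a statement taken from the product documentation.

```sql
-- Only B1 and B2 are selected, so updates to table B that change only B3
-- produce duplicate join input; duplicate.right asks the join to drop them.
select A.A1, B.B1, B.B2
from A
join B /*+ OPTIONS('duplicate.right'='true') */
on A.P1 = B.P1;
```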