Table-Level TTL for Stream Joins
This topic is available for MRS 3.3.0 or later only.
When you join two Flink streams, data in one table may change rapidly (needing only a short TTL) while data in the other table changes slowly (needing a long TTL). Currently, Flink supports only a single TTL that applies to the whole job, not to each table separately. To ensure join accuracy, you must set this TTL long enough for the slowly changing table. As a result, a large amount of expired data from the rapidly changing table is kept in the state backend, putting heavy pressure on it. To reduce this pressure, you can use hints to set different expiration times for the left and right tables. The WHERE clause is not supported.
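As a rough back-of-the-envelope sketch of why one shared TTL is costly, assume rows arrive at a constant rate and each row stays in state for exactly its TTL; the rows buffered on one side are then about rate × TTL. The arrival rates and the helper `retained_rows` below are hypothetical and only illustrate the arithmetic, not how Flink measures state size.

```python
def retained_rows(rate_per_s: float, ttl_s: float) -> float:
    """Approximate steady-state rows buffered on one join side, assuming a
    constant arrival rate and that each row is kept for exactly ttl_s seconds."""
    return rate_per_s * ttl_s

# Hypothetical arrival rates: the left table changes rapidly, the right slowly.
LEFT_RATE, RIGHT_RATE = 1000.0, 10.0

# A single TTL must cover the slowly changing right table (120 s) on both sides.
single = retained_rows(LEFT_RATE, 120) + retained_rows(RIGHT_RATE, 120)
# Separate TTLs let the rapidly changing left side expire after 60 s.
per_side = retained_rows(LEFT_RATE, 60) + retained_rows(RIGHT_RATE, 120)

print(single, per_side)  # prints 121200.0 61200.0
```

Under these assumptions, shortening only the left-side TTL roughly halves the buffered rows, because the rapidly changing table dominates the state.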
For example, set the TTL of the left table (state.ttl.left) to 60 seconds and that of the right table (state.ttl.right) to 120 seconds.
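To make the effect of the two TTLs concrete, here is a minimal Python simulation of an inner stream-stream join in which each side keeps its own buffered rows for its own TTL. It is a conceptual sketch, not Flink's implementation: it assumes every buffered row expires a fixed number of seconds after it arrives and ignores how Flink actually tracks and cleans up state. `Row` and `ttl_join` are hypothetical names.

```python
from dataclasses import dataclass

@dataclass
class Row:
    side: str   # "left" or "right"
    key: str    # join key, for example user_id
    value: str  # payload, for example user_name or score
    ts: float   # arrival time in seconds

def ttl_join(events, ttl_left, ttl_right):
    """Inner join of two streams where each side's buffered rows expire
    ttl seconds after arrival (simplified model of per-table TTL)."""
    state = {"left": [], "right": []}
    ttl = {"left": ttl_left, "right": ttl_right}
    out = []
    for row in sorted(events, key=lambda r: r.ts):
        # Evict rows on each side whose TTL has elapsed at the current time.
        for side in state:
            state[side] = [r for r in state[side] if row.ts - r.ts < ttl[side]]
        # Probe the other side's live rows for matches on the join key.
        other = "right" if row.side == "left" else "left"
        for match in state[other]:
            if match.key == row.key:
                left, right = (row, match) if row.side == "left" else (match, row)
                out.append((left.key, left.value, right.value))
        # Buffer the new row so later rows from the other side can match it.
        state[row.side].append(row)
    return out

events = [
    Row("left", "u1", "Alice", 0),
    Row("right", "u2", "80", 0),
    Row("right", "u1", "90", 90),  # left u1 is older than 60 s: no match
    Row("left", "u2", "Bob", 90),  # right u2 is younger than 120 s: match
]
print(ttl_join(events, 60, 120))  # → [('u2', 'Bob', '80')]
```

With 'state.ttl.left'='60S' and 'state.ttl.right'='120S', a right row can still join a left row only if it arrives within 60 seconds of it, while a left row can join right rows up to 120 seconds old. Setting both TTLs to 120 seconds in this model would also produce the `u1` match, at the cost of buffering every left row twice as long.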
- Use Hints in the following format:
/*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */
- The following are configuration examples in SQL statements:
- Example 1:
CREATE TABLE user_info (
  `user_id` VARCHAR,
  `user_name` VARCHAR
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_info_001',
  'properties.bootstrap.servers' = '192.168.64.138:21005',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv'
);
CREATE TABLE print (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `score` INT
) WITH ('connector' = 'print');
CREATE TABLE user_score (
  user_id VARCHAR,
  score INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_score_001',
  'properties.bootstrap.servers' = '192.168.64.138:21005',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv'
);
INSERT INTO print
SELECT t.user_id, t.user_name, d.score
FROM user_info AS t
JOIN
  -- Set different TTLs for left and right tables.
  /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */
  user_score AS d
ON t.user_id = d.user_id;
- Example 2:
INSERT INTO print
SELECT t1.user_id, t1.user_name, t3.score
FROM t1
JOIN
  -- Set different TTLs for left and right tables.
  /*+ OPTIONS('state.ttl.left' = '60S', 'state.ttl.right' = '120S') */
  (
    SELECT UPPER(t2.user_id) AS user_id, t2.score
    FROM t2
  ) AS t3
ON t1.user_id = t3.user_id;