Exiting the Flink SQL OVER Window Upon Expiration
This topic is available for MRS 3.5.0 or later only.
Flink SQL supports exiting an OVER window when data expires. When the existing data expires and no new data arrives, the OVER aggregation results are updated and the latest results are sent to the downstream operator. You can enable this function by configuring the following parameters:
| Parameter | Default Value | Description |
|---|---|---|
| over.window.interval | -1 | Interval between two adjacent data records, in milliseconds. If no new data arrives within this interval, the OVER window registers the expiration trigger. A value greater than or equal to 0 enables data expiration; a negative value (the default) disables it. |
| over.window.interval.last.rowdata | false | Whether to send only the last data record to the downstream operator. If set to true, only the final result is sent after expiration; if false, every updated result is sent. |
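For reference, the two parameters might be combined as follows (hypothetical values; on FlinkServer they are entered as custom key-value parameters on the job development page, as described under How to Use):

```
over.window.interval = 10000               # fire the expiration trigger 10 000 ms after the last record
over.window.interval.last.rowdata = true   # send only the final result to the downstream operator
```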
How to Use
When configuring a Flink job, you can add the custom parameter over.window.interval on the job development page of the FlinkServer web UI and set it to a value greater than or equal to 0 to enable data expiration for the window. For details about how to create a job, see Creating a FlinkServer Job. This setting takes effect for all OVER windows in a job, so you are advised to use this function only for jobs with a single OVER window.
- Example SQL statement
```sql
CREATE TABLE OverSource (
  `rowtime` TIMESTAMP_LTZ(3),
  `groupId` INT,
  `value` INT,
  `name` STRING,
  `additional_field` STRING,
  `proctime` AS PROCTIME(),
  WATERMARK FOR rowtime AS rowtime
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka broker instance:Kafka port',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv',
  'properties.sasl.kerberos.service.name' = 'kafka', -- Delete this parameter if the cluster where FlinkServer is deployed is in normal mode.
  'properties.security.protocol' = 'SASL_PLAINTEXT', -- Delete this parameter if the cluster where FlinkServer is deployed is in normal mode.
  'properties.kerberos.domain.name' = 'hadoop.System domain name' -- Delete this parameter if the cluster where FlinkServer is deployed is in normal mode.
);

CREATE TABLE LD_SINK (
  `name` STRING,
  `groupId` INT,
  `rowtime` TIMESTAMP_LTZ(3),
  `count_zw` BIGINT,
  `sum_zw` BIGINT
) WITH (
  'connector' = 'print'
);

SELECT
  `name`,
  `groupId`,
  COUNT(`value`) OVER (
    PARTITION BY groupId ORDER BY proctime
    RANGE BETWEEN INTERVAL '10' second PRECEDING AND CURRENT ROW
  ) AS count_zw,
  SUM(`value`) OVER (
    PARTITION BY groupId ORDER BY proctime
    RANGE BETWEEN INTERVAL '10' second PRECEDING AND CURRENT ROW
  ) AS sum_zw
FROM OverSource;
```
- SQL statement output
For example, when proctime is 10:00:00, groupId is 1, and value ranges from 1 to 5, five consecutive data records are imported to Flink. The output (name, groupId, count_zw, sum_zw) is:
["zw", 1, 1, 1] ["zw", 1, 2, 3] ["zw", 1, 3, 6] ["zw", 1, 4, 10] ["zw", 1, 5, 15]
After 10:00:10, no data is imported and the window starts to exit.
["zw", 1, 4, 14] ["zw", 1, 3, 12] ["zw", 1, 2, 9] ["zw", 1, 1, 5] ["zw", 1, 0, 0]