Exiting Flink SQL OVER Window Upon Expiration

This topic applies only to MRS 3.5.0 or later.

MRS adds the ability to exit a Flink SQL OVER window when data expires. When the existing data in the window expires and no new data arrives, the OVER aggregation results are updated and the latest results are sent to the downstream operator. You can enable this function by configuring the following parameters:

Table 1 Parameters for enabling the function of exiting the Flink SQL OVER window upon data expiration

over.window.interval

  Default value: -1

  Description: The maximum interval between two adjacent pieces of data, in milliseconds. If the interval is exceeded, the OVER window registers an expiration trigger. Value options are as follows:

  • -1: The function is disabled.
  • 0: An expiration trigger is registered for every piece of data, which increases the load on the job. Increase the value of over.window.interval based on the job requirements.
  • Positive number: the interval between two adjacent pieces of data. If the interval is exceeded, the OVER window registers an expiration trigger. For example, with over.window.interval set to 5000, a trigger is registered only when adjacent records arrive more than 5 seconds apart.

over.window.interval.last.rowdata

  Default value: false

  Description: Whether to send only the last piece of data to the downstream operator. Value options are as follows:

  • false (default): All data that has not expired is sent.
  • true: Only the last piece of data is sent.

How to Use

When configuring a Flink job, add the custom parameter over.window.interval on the job development page of the FlinkServer web UI and set it to a value greater than or equal to 0 to enable the data expiration function for OVER windows. For details about how to create a job, see Creating a FlinkServer Job. The setting takes effect for all OVER windows in the job, so you are advised to use this function only for jobs with a single OVER window.
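
On the job development page, both settings are entered as custom parameters in key-value form. A minimal sketch, assuming the interval is set to 0 so that every record registers an expiration trigger and all non-expired results are sent downstream:

    over.window.interval = 0
    over.window.interval.last.rowdata = false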

  • Example SQL statement

    CREATE TABLE OverSource (
      `rowtime` TIMESTAMP_LTZ(3),
      `groupId` INT,
      `value` INT,
      `name` STRING,
      `additional_field` STRING,
      `proctime` AS PROCTIME(),
      WATERMARK FOR rowtime AS rowtime
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test_source',
      'properties.bootstrap.servers' = 'Service IP address of the Kafka broker instance:Kafka port',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',  -- Delete this parameter if the cluster where FlinkServer is deployed is in normal mode.
      'properties.security.protocol' = 'SASL_PLAINTEXT',  -- Delete this parameter if the cluster where FlinkServer is deployed is in normal mode.
      'properties.kerberos.domain.name' = 'hadoop.System domain name'  -- Delete this parameter if the cluster where FlinkServer is deployed is in normal mode.
    );

    -- The sink schema matches the four fields shown in the example output below.
    CREATE TABLE LD_SINK (
      `name` STRING,
      `groupId` INT,
      `count_zw` BIGINT,
      `sum_zw` BIGINT
    ) WITH (
      'connector' = 'print'
    );

    INSERT INTO LD_SINK
    SELECT
      `name`,
      `groupId`,
      COUNT(`value`) OVER (
        PARTITION BY groupId
        ORDER BY proctime
        RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
      ) AS count_zw,
      SUM(`value`) OVER (
        PARTITION BY groupId
        ORDER BY proctime
        RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
      ) AS sum_zw
    FROM OverSource;
  • SQL statement output

    For example, at proctime 10:00:00, five consecutive records with name "zw", groupId 1, and value 1 to 5 are imported to Flink. Each record updates the aggregation, and the printed results (name, groupId, count_zw, sum_zw) are as follows:

    ["zw", 1, 1, 1]
    ["zw", 1, 2, 3]
    ["zw", 1, 3, 6]
    ["zw", 1, 4, 10]
    ["zw", 1, 5, 15]

    After 10:00:10, no new data arrives, so the earlier records expire one by one and the window emits the updated aggregation results as it exits:

    ["zw", 1, 4, 14]
    ["zw", 1, 3, 12]
    ["zw", 1, 2, 9]
    ["zw", 1, 1, 5]
    ["zw", 1, 0, 0]