Updated on 2025-08-22 GMT+08:00

Exiting FlinkSQL OVER Window Upon Expiration

This section applies only to MRS 3.5.0 or later.

Flink SQL OVER windows can now exit when their data expires. When the existing data expires and no new data arrives, the OVER aggregation results are updated and the latest results are sent to the downstream operator. To use this function, configure the following parameters.

Table 1 Parameters for enabling the function of exiting the Flink SQL OVER window upon data expiration

Parameter: over.window.interval
Default Value: -1
Description: The interval between two adjacent data records, in milliseconds. If the interval is exceeded, the OVER window registers an expiration trigger. Value options are as follows:

  • -1: This function is disabled.
  • 0: An expiration trigger is registered for every data record, which increases the load on the job. To reduce the load, increase the value of over.window.interval based on the job requirements.
  • Positive number: The interval between two adjacent data records. If the interval is exceeded, the OVER window registers an expiration trigger.

Parameter: over.window.interval.last.rowdata
Default Value: false
Description: Whether to send only the last data record to the downstream operator. Value options are as follows:

  • false (default): All data that has not expired is sent.
  • true: Only the last data record is sent.

How to Use

When configuring a Flink job, add the custom parameter over.window.interval on the job development page of the FlinkServer web UI and set it to a value greater than or equal to 0 to enable data expiration for OVER windows. For details about how to create a job, see Creating a Job. This setting takes effect for all OVER windows in a job. You are advised to use this function for jobs with a single OVER window.
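
As a rough illustration, the custom parameters might be entered with values like the ones below. The parameter names come from Table 1. The value 5000 (5 seconds) is a hypothetical example rather than a recommendation, and the key: value layout is only a notation here, because the web UI may present the name and the value as separate fields.

```
over.window.interval: 5000
over.window.interval.last.rowdata: false
```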

  • Example SQL statements:
    CREATE TABLE OverSource (
      `rowtime` TIMESTAMP_LTZ(3),
      `groupId` INT,
      `value` INT,
      `name` STRING,
      `additional_field` STRING,
      `proctime` as PROCTIME(),
      WATERMARK FOR rowtime AS rowtime
    ) WITH (
     'connector' = 'kafka',
     'topic' = 'test_source',
     'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',  
     'properties.group.id' = 'testGroup', 
     'scan.startup.mode' = 'latest-offset', 
     'format' = 'csv',  
     'properties.sasl.kerberos.service.name' = 'kafka',   --Delete this parameter if the cluster where FlinkServer is deployed is in non-security mode.
     'properties.security.protocol' = 'SASL_PLAINTEXT',  --Delete this parameter if the cluster where FlinkServer is deployed is in non-security mode.
     'properties.kerberos.domain.name' = 'hadoop.System domain name'  --Delete this parameter if the cluster where FlinkServer is deployed is in non-security mode.
    );
    
    CREATE TABLE LD_SINK(
     `name` STRING, `groupId` INT, `rowtime` TIMESTAMP_LTZ(3),`count_zw` BIGINT,`sum_zw` BIGINT
    ) WITH ( 
     'connector' = 'print'
    );
    
    SELECT
      `name`,
      `groupId`,
      COUNT(`value`) OVER (
        PARTITION BY groupId
        ORDER BY
          proctime RANGE BETWEEN INTERVAL '10' second PRECEDING
          AND CURRENT ROW
      ) as count_zw,
      SUM(`value`) OVER (
        PARTITION BY groupId
        ORDER BY
          proctime RANGE BETWEEN INTERVAL '10' second PRECEDING
          AND CURRENT ROW
      ) as sum_zw
    FROM
      OverSource;
    • Obtain the IP address and port number of the Kafka Broker instance as follows:
      • To obtain the instance IP address, log in to FusionInsight Manager, choose Cluster > Services > Kafka, click Instances, and view the instance IP address on the instance list page.
      • If Kerberos authentication is enabled for the cluster (the cluster is in security mode), the Broker port number is the value of sasl.port. The default value is 21007.
      • If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), the Broker port number is the value of port. The default value is 9092. If the port number is set to 9092, set allow.everyone.if.no.acl.found to true. The procedure is as follows:

        Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the displayed page, click Configurations and then All Configurations. On the displayed page, search for allow.everyone.if.no.acl.found, set it to true, and click Save.

    • System domain name: You can log in to FusionInsight Manager, choose System > Permission > Domain and Mutual Trust, and check the value of Local Domain.
  • SQL statement output

    For example, five consecutive data records with groupId set to 1 and value ranging from 1 to 5 are imported to Flink at proctime 10:00:00. Each output row lists name, groupId, count_zw, and sum_zw.

    ["zw", 1, 1, 1]
    ["zw", 1, 2, 3]
    ["zw", 1, 3, 6]
    ["zw", 1, 4, 10]
    ["zw", 1, 5, 15]

    After 10:00:10, no new data is imported and the existing records begin to expire, so the window starts to exit.

    ["zw", 1, 4, 14]
    ["zw", 1, 3, 12]
    ["zw", 1, 2, 9]
    ["zw", 1, 1, 5]
    ["zw", 1, 0, 0]
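
The arithmetic behind the two output batches above can be reproduced with a small simulation. The Python sketch below is not Flink code and does not model trigger timing or over.window.interval. It assumes that the five records arrive inside a single window and later expire one at a time in arrival order with no new data in between. The helper name simulate_over_window is hypothetical.

```python
from collections import deque


def simulate_over_window(values, name="zw", group_id=1):
    """Return (arrival_rows, expiry_rows) for a COUNT/SUM OVER aggregation.

    Assumption: every value arrives inside the same window, then the
    records expire one by one in arrival order and no new data arrives.
    Each row is [name, groupId, count_zw, sum_zw].
    """
    window = deque()

    # Phase 1: each arriving record extends the window and emits a result.
    arrival_rows = []
    for v in values:
        window.append(v)
        arrival_rows.append([name, group_id, len(window), sum(window)])

    # Phase 2: the oldest record expires first; each expiration emits
    # an updated result until the window is empty.
    expiry_rows = []
    while window:
        window.popleft()
        expiry_rows.append([name, group_id, len(window), sum(window)])

    return arrival_rows, expiry_rows


arrivals, expiries = simulate_over_window([1, 2, 3, 4, 5])
for row in arrivals + expiries:
    print(row)
```

Under these assumptions, the count and sum values in the rows match the two listings above: the sum drops from 15 to 14, 12, 9, 5, and 0 as values 1 through 5 leave the window.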