Updated on 2024-11-29 GMT+08:00

Reusing Flink UDFs

Scenario

Flink SQL supports UDF reuse. When a query causes the same UDF call to be executed multiple times, the UDF is executed only once, and the result of that first execution is copied for the Nth (N > 1) execution. This keeps the results of the repeated calls consistent with one another and improves operator performance because the UDF is not executed again.

How to Use

To enable UDF reuse, set table.optimizer.function-reuse-enabled to true when configuring the Flink job on the job development page of the Flink server web UI. For details, see Creating a Job.

Example

  • UDF:
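    import org.apache.flink.table.functions.ScalarFunction
    import scala.collection.mutable

    // Stateful UDF: returns whether the item has been seen before.
    // The SQL statement in this example calls this function as IfExist.
    class ItemExist extends ScalarFunction {
      // Items seen so far by this function instance
      val items: mutable.Set[String] = mutable.Set[String]()

      def eval(item: String): Boolean = {
        val exist = items.contains(item)
        // Record the item on first sight so that later calls return true
        if (!exist) {
          items.add(item)
        }
        exist
      }
    }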
  • SQL statement:

    SELECT * FROM ( SELECT `a`, IfExist(b) as `exist`, `c` FROM Table1 ) WHERE exist IS FALSE;

  • Execution result:
    • Return value when the UDF reuse function is disabled:
      a,true,c

      IfExist is first executed in the WHERE condition and returns false, and the item is stored in the UDF's item set at that point. When IfExist is executed again in SELECT, the item is already in the set, so true is returned even though the row passed the exist IS FALSE filter.

    • Return value when the UDF reuse function is enabled:
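      a,false,c

      IfExist is executed only once, and its result (false) is used in both the WHERE condition and SELECT, so the returned value is consistent with the filter.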
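The difference between the two results can be reproduced with a small plain-Scala simulation. This is only a sketch under stated assumptions, not how the Flink planner implements reuse: ItemExistSim, Row, and run are hypothetical names introduced for illustration, there is no Flink dependency, and the evaluation order (WHERE first, then SELECT) follows the explanation above.

```scala
import scala.collection.mutable

object UdfReuseSketch {
  // Plain-Scala stand-in for the stateful UDF in the example (hypothetical, no Flink dependency).
  final class ItemExistSim {
    private val items = mutable.Set[String]()

    // Returns true if `item` was seen before; records it otherwise.
    def eval(item: String): Boolean = {
      val exist = items.contains(item)
      if (!exist) items.add(item)
      exist
    }
  }

  // One input row with columns a, b, c (hypothetical type for this sketch).
  final case class Row(a: String, b: String, c: String)

  // Simulates the example query over `rows`.
  // reuse = false: the UDF runs once for the WHERE filter and again for the SELECT projection.
  // reuse = true:  the UDF runs once per row and both places share that single result.
  def run(rows: Seq[Row], reuse: Boolean): Seq[(String, Boolean, String)] = {
    val udf = new ItemExistSim
    rows.flatMap { row =>
      val filterValue = udf.eval(row.b) // evaluation for WHERE exist IS FALSE
      if (filterValue) None             // row is filtered out
      else {
        // Without reuse the projection calls the UDF a second time.
        val projected = if (reuse) filterValue else udf.eval(row.b)
        Some((row.a, projected, row.c))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row("a", "b", "c"))
    println(run(rows, reuse = false)) // List((a,true,c))
    println(run(rows, reuse = true))  // List((a,false,c))
  }
}
```

The sketch makes the underlying issue visible: the UDF has side effects, so calling it twice for the same row yields two different answers. Sharing the first result removes the second call and with it the inconsistency.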