Updated on 2024-10-09 GMT+08:00

Using UDFs in FlinkServer Jobs

This section applies to MRS 3.1.2 or later clusters.

You can write custom functions, called user-defined functions (UDFs), to extend SQL statements when the built-in functions do not meet your requirements. You can upload and manage UDF JAR files on the Flink web UI and call the UDFs when running jobs.

Flink supports the following three types of UDFs, as described in Table 1.

Table 1 Function classification

| Type | Description |
| --- | --- |
| User-defined scalar function (UDF) | Supports one or more input parameters and returns a single result value. For details, see UDF Java and SQL Examples. |
| User-defined aggregation function (UDAF) | Aggregates multiple records into one value. For details, see UDAF Java and SQL Examples. |
| User-defined table-valued function (UDTF) | Supports one or more input parameters and returns multiple rows or columns. For details, see UDTF Java and SQL Examples. |

Uploading UDFs to FlinkServer

  1. Prepare a UDF JAR file. The file size cannot exceed 200 MB.
  2. Access the Flink web UI. For details, see Accessing the FlinkServer Web UI.
  3. Click UDF Management. The UDF Management page is displayed.
  4. Click Add UDF. For Local .jar File, select and upload the prepared UDF JAR file.
  5. Enter the UDF name and description and click OK.

    • You can add a maximum of 10 UDF names. Name can be customized. Class Name must match the UDF class in the uploaded UDF JAR file.
    • After the UDF JAR file is uploaded, the server retains the file for 5 minutes by default. If you click OK within 5 minutes, the UDF is created. If you click OK after 5 minutes, the UDF creation fails and an error message is displayed, indicating that the local UDF file path is incorrect.

  6. In the UDF list, view information about all UDFs in the current application. You can edit or delete a UDF in the Operation column. Only UDFs that are not in use can be deleted.
  7. (Optional) If you need to run or develop a job immediately, configure the job on the Job Management page. For details, see Creating a FlinkServer Job.

UDF Java and SQL Examples

  • UDF Java example
    package com.xxx.udf;
    import org.apache.flink.table.functions.ScalarFunction;
    public class UdfClass_UDF extends ScalarFunction {
        public int eval(String s) {
            return s.length();
        }
    }
  • UDF SQL example
    CREATE TEMPORARY FUNCTION udf as 'com.xxx.udf.UdfClass_UDF';
    CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1');
    CREATE TABLE udfSink (a VARCHAR,b int) WITH ('connector' = 'print');
    INSERT INTO
      udfSink
    SELECT
      a,
      udf(a)
    FROM
      udfSource;

UDAF Java and SQL Examples

  • UDAF Java example
    package com.xxx.udf;
    import org.apache.flink.table.functions.AggregateFunction;
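    // Note: despite the names Average and AverageAccumulator, this example returns the sum of the input values.
    public class UdfClass_UDAF {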
        public static class AverageAccumulator  {
            public int sum;
        }
        public static class Average extends AggregateFunction<Integer, AverageAccumulator> {
            public void accumulate(AverageAccumulator acc, Integer value) {
                acc.sum += value;
            }
            @Override
            public Integer getValue(AverageAccumulator acc) {
                return acc.sum;
            }
            @Override
            public AverageAccumulator createAccumulator() {
                return new AverageAccumulator();
            }
        }
    }
  • UDAF SQL example
    CREATE TEMPORARY FUNCTION udaf as 'com.xxx.udf.UdfClass_UDAF$Average';
    CREATE TABLE udfSource (a int) WITH ('connector' = 'datagen','rows-per-second'='1','fields.a.min'='1','fields.a.max'='3');
    CREATE TABLE udfSink (b int,c int) WITH ('connector' = 'print');
    INSERT INTO
      udfSink
    SELECT
      a,
      udaf(a)
    FROM
      udfSource GROUP BY a;
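
To make the lifecycle in the Java example concrete, the following plain-Java sketch replays createAccumulator, accumulate, and getValue by hand for the GROUP BY statement above. It does not use Flink: the class UdafLifecycleSketch and the helper sumByKey are hypothetical names introduced for illustration, and a HashMap-style map stands in for the state that the real runtime manages. It also shows that the example function returns the sum of each group rather than an average.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone sketch: imitates how an aggregate function is driven
// for "SELECT a, udaf(a) FROM udfSource GROUP BY a". No Flink classes are used.
class UdafLifecycleSketch {

    // Same shape as AverageAccumulator in the example above.
    static class Accumulator {
        int sum;
    }

    static Accumulator createAccumulator() {
        return new Accumulator();
    }

    static void accumulate(Accumulator acc, Integer value) {
        acc.sum += value;
    }

    static Integer getValue(Accumulator acc) {
        return acc.sum;
    }

    // One accumulator per group key; every row updates the accumulator of its key.
    static Map<Integer, Integer> sumByKey(List<Integer> rows) {
        Map<Integer, Accumulator> accumulators = new TreeMap<>();
        for (Integer a : rows) {
            accumulate(accumulators.computeIfAbsent(a, k -> createAccumulator()), a);
        }
        Map<Integer, Integer> result = new TreeMap<>();
        accumulators.forEach((key, acc) -> result.put(key, getValue(acc)));
        return result;
    }

    public static void main(String[] args) {
        // Values in the range 1..3, as configured for the datagen source above.
        System.out.println(sumByKey(Arrays.asList(1, 2, 2, 3, 3, 3)));
    }
}
```

For the sample input, the sketch prints {1=1, 2=4, 3=9}. It returns only the final value per key, whereas a streaming job is expected to emit an updated result as each new row arrives.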

UDTF Java and SQL Examples

  • UDTF Java example
    package com.xxx.udf;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableFunction;
    public class UdfClass_UDTF extends TableFunction<Tuple2<String, Integer>> {
        public void eval(String str) {
            Tuple2<String, Integer> tuple2 = Tuple2.of(str, str.length());
            collect(tuple2);
        }
    }
  • UDTF SQL example
    CREATE TEMPORARY FUNCTION udtf as 'com.xxx.udf.UdfClass_UDTF';
    CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1');
    CREATE TABLE udfSink (b VARCHAR,c int) WITH ('connector' = 'print');
    INSERT INTO
      udfSink
    SELECT
      str,
      strLength
    FROM
      udfSource, LATERAL TABLE(udtf(udfSource.a)) AS T(str, strLength);
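
The lateral table join in the statement above calls the function once per source row and pairs that row with every row the function collects. The plain-Java sketch below imitates this behavior without Flink. The names UdtfLateralSketch, eval, and lateralJoin are hypothetical, and the Consumer argument is an assumed stand-in for the collect method of TableFunction.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-alone sketch of a table function used in a lateral join.
class UdtfLateralSketch {

    // Mirrors UdfClass_UDTF.eval: emits one (str, length) pair per call.
    static void eval(String str, Consumer<Map.Entry<String, Integer>> collector) {
        collector.accept(new SimpleEntry<String, Integer>(str, str.length()));
    }

    // For each source row, call the function and keep the columns selected
    // by the statement above (str, strLength) as one output line.
    static List<String> lateralJoin(List<String> sourceRows) {
        List<String> out = new ArrayList<>();
        for (String a : sourceRows) {
            eval(a, row -> out.add(row.getKey() + "," + row.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        lateralJoin(Arrays.asList("ab", "", "flink")).forEach(System.out::println);
    }
}
```

Because this function collects exactly one row per call, the output has one line per source row. A table function that collected several rows per call would produce several output lines for the same source row.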

Introduction to Flink UDF Reuse

This section applies to MRS 3.3.0 or later only.

Flink SQL provides a UDF reuse function. When a UDF would be executed multiple times, it is actually executed only once, and the first result is copied for the Nth (N > 1) execution. This keeps the results of the repeated executions consistent and improves operator performance.

To enable the UDF reuse function, set table.optimizer.function-reuse-enabled to true on the Flink job development page of the FlinkServer web UI when you configure the job. For details, see Creating a FlinkServer Job.

The following is a reuse example:

  • UDF
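    import org.apache.flink.table.functions.ScalarFunction
    import scala.collection.mutable

    // The SQL statement below calls this function as IfExist, so it is assumed
    // to be registered under that name.
    class ItemExist extends ScalarFunction {
      // Items seen so far by this function instance.
      val items: mutable.Set[String] = mutable.Set[String]()

      // Returns true if the item was seen before; otherwise records it and returns false.
      def eval(item: String): Boolean = {
        val exist = items.contains(item)
        if (!exist) {
          items.add(item)
        }
        exist
      }
    }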
  • SQL statement

    SELECT * FROM ( SELECT `a`, IfExist(b) as `exist`, `c` FROM Table1 ) WHERE exist IS FALSE;

  • Result
    • Return value when the UDF reuse function is disabled:
      a,true,c

      IfExist is executed first in the WHERE condition, where it returns false and stores the item in its set. When IfExist is executed again in SELECT, the item is already stored, so true is returned.

    • Return value when the UDF reuse function is enabled:
      a,false,c

      IfExist is executed only once. The false result from the first execution is copied to SELECT.
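
The difference between the two results can be reproduced outside Flink. The following plain-Java sketch is a simulation under stated assumptions: ReuseSketch, ifExist, and evaluateRow are hypothetical names, and evaluating the function for WHERE first and then for SELECT follows the explanation above rather than any documented planner guarantee.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-alone simulation of the reuse example. No Flink classes are used.
class ReuseSketch {
    private final Set<String> items = new HashSet<>();

    // Same logic as the Scala UDF above: returns true if the item was seen
    // before; otherwise records it and returns false.
    boolean ifExist(String item) {
        boolean exist = items.contains(item);
        if (!exist) {
            items.add(item);
        }
        return exist;
    }

    // Returns the value of the `exist` column for a row that passes
    // "WHERE exist IS FALSE", or null if the row is filtered out.
    Boolean evaluateRow(String b, boolean reuseEnabled) {
        boolean whereResult = ifExist(b); // execution for the WHERE condition
        if (whereResult) {
            return null; // "exist IS FALSE" is not satisfied
        }
        // With reuse, SELECT copies the first result; without it, the UDF runs again.
        return reuseEnabled ? whereResult : ifExist(b);
    }

    public static void main(String[] args) {
        System.out.println(new ReuseSketch().evaluateRow("b", false)); // prints true
        System.out.println(new ReuseSketch().evaluateRow("b", true));  // prints false
    }
}
```

The sketch uses a fresh instance for each case so that the two runs do not share state. The effect only shows up because the function is stateful: a function whose result depends solely on its arguments returns the same value with or without reuse.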