Updated on 2024-10-08 GMT+08:00

Using UDFs in FlinkServer Jobs

This section applies to MRS 3.1.2 or later.

You can write custom functions, called user-defined functions (UDFs), to extend SQL statements when the built-in functions do not meet your requirements. You can upload and manage UDF JAR files on the Flink web UI and call the UDFs when running jobs.

Flink supports the following three types of UDFs, as described in Table 1.

Table 1 Function classification

Type | Description
User-defined scalar function (UDF) | Supports one or more input parameters and returns a single result value. For details, see UDF Java and SQL Examples.
User-defined aggregation function (UDAF) | Aggregates multiple records into one value. For details, see UDAF Java and SQL Examples.
User-defined table-valued function (UDTF) | Supports one or more input parameters and returns multiple rows or columns. For details, see UDTF Java and SQL Examples.

Prerequisites

You have prepared a UDF JAR file whose size does not exceed 200 MB.
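
The size limit can be checked locally before uploading. The following is a minimal sketch in plain Java, not part of FlinkServer. It assumes that "200 MB" means 200 x 1024 x 1024 bytes (the source does not state which convention the server applies), and the class name UdfJarSizeCheck and the default file name my-udf.jar are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class UdfJarSizeCheck {
    // Assumption: 200 MB is interpreted as 200 * 1024 * 1024 bytes.
    static final long MAX_JAR_BYTES = 200L * 1024 * 1024;

    // Returns true if a file of the given size fits within the assumed limit.
    static boolean withinLimit(long sizeInBytes) {
        return sizeInBytes >= 0 && sizeInBytes <= MAX_JAR_BYTES;
    }

    // Reads the size of the file on disk and applies the same check.
    static boolean withinLimit(Path jar) throws IOException {
        return withinLimit(Files.size(jar));
    }

    public static void main(String[] args) throws IOException {
        // "my-udf.jar" is a hypothetical default; pass the real path as the first argument.
        Path jar = Paths.get(args.length > 0 ? args[0] : "my-udf.jar");
        if (!Files.isRegularFile(jar)) {
            System.out.println("File not found: " + jar);
            return;
        }
        System.out.println(jar + (withinLimit(jar) ? " is within" : " exceeds") + " the assumed 200 MB limit");
    }
}
```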

Uploading a UDF

  1. Access the Flink web UI. For details, see Accessing the FlinkServer Web UI.
  2. Click UDF Management. The UDF Management page is displayed.
  3. Click Add UDF. For Local .jar File, select and upload the prepared UDF JAR file.
  4. Enter the UDF name and description and click OK.

    • A maximum of 10 UDF names can be added. Name can be customized, but Type must match the fully qualified class name of the UDF in the uploaded UDF JAR file.
    • After the UDF JAR file is uploaded, the server retains the file for 5 minutes by default. If you click OK within 5 minutes, the UDF is created. If you click OK after 5 minutes, the UDF creation fails and an error message is displayed, indicating that the local UDF file path is incorrect.

  5. The UDF list shows all UDFs in the current application. You can edit or delete a UDF in the Operation column. Only UDFs that are not in use can be deleted.
  6. (Optional) If you need to run or develop a job immediately, configure the job on the Job Management page. For details, see Creating a FlinkServer Job.
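
For the Type field in step 4, a function declared as a nested class needs some care. The UDAF SQL example in this section registers com.xxx.udf.UdfClass_UDAF$Average, where "$" separates the outer class from the nested class. The sketch below uses only the standard Class.getName() method to print that form of the name. It assumes, based on that SQL example, that Type expects this form. The class names UdfTypeNameDemo and Average are hypothetical stand-ins.

```java
class UdfTypeNameDemo {
    // Hypothetical nested class, standing in for a UDF declared inside another class.
    static class Average {
    }

    // Returns the binary name of the class, the form a class loader resolves.
    // For a nested class, the outer and inner names are joined with '$'.
    static String typeFor(Class<?> udfClass) {
        return udfClass.getName();
    }

    public static void main(String[] args) {
        System.out.println(typeFor(UdfTypeNameDemo.class));
        System.out.println(typeFor(Average.class));
    }
}
```

If the class is placed in a package, the package name is prepended to both printed values.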

UDF Java and SQL Examples

  • UDF Java example
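    package com.xxx.udf;
    import org.apache.flink.table.functions.ScalarFunction;
    // Scalar function: maps one input value to one output value.
    public class UdfClass_UDF extends ScalarFunction {
        // Called once per input row; returns the length of the input string.
        public int eval(String s) {
            return s.length();
        }
    }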
  • UDF SQL example
    CREATE TEMPORARY FUNCTION udf as 'com.xxx.udf.UdfClass_UDF';
    CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1');
    CREATE TABLE udfSink (a VARCHAR,b int) WITH ('connector' = 'print');
    INSERT INTO
      udfSink
    SELECT
      a,
      udf(a)
    FROM
      udfSource;
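
The eval method above calls s.length() directly, so a NULL input would raise a NullPointerException. The following is a minimal sketch of a null-tolerant variant of the same logic. It is written as plain Java without the ScalarFunction base class so that it runs without Flink on the classpath. The class name ScalarEvalSketch and the null guard are additions for illustration and are not part of the original example.

```java
class ScalarEvalSketch {
    // Same logic as UdfClass_UDF.eval, plus a null guard (an addition).
    // Returning Integer instead of int allows a null result to be passed through.
    static Integer eval(String s) {
        return s == null ? null : s.length();
    }

    public static void main(String[] args) {
        // Sample inputs: a regular string, an empty string, and a null value.
        for (String row : new String[] {"flink", "", null}) {
            System.out.println(row + " -> " + eval(row));
        }
    }
}
```

To use this logic in a job, the method body would go into the eval method of a class that extends ScalarFunction, as in the Java example above.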

UDAF Java and SQL Examples

  • UDAF Java example
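    package com.xxx.udf;
    import org.apache.flink.table.functions.AggregateFunction;
    public class UdfClass_UDAF {
        // Accumulator: holds the intermediate state of one group.
        public static class AverageAccumulator {
            public int sum;
        }
        // Note: despite its name, this function returns the sum of the input values, not their mean.
        public static class Average extends AggregateFunction<Integer, AverageAccumulator> {
            // Called once per input row to update the accumulator.
            public void accumulate(AverageAccumulator acc, Integer value) {
                acc.sum += value;
            }
            // Returns the aggregation result for the current accumulator state.
            @Override
            public Integer getValue(AverageAccumulator acc) {
                return acc.sum;
            }
            // Creates an empty accumulator for a new group.
            @Override
            public AverageAccumulator createAccumulator() {
                return new AverageAccumulator();
            }
        }
    }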
  • UDAF SQL example
    CREATE TEMPORARY FUNCTION udaf as 'com.xxx.udf.UdfClass_UDAF$Average';
    CREATE TABLE udfSource (a int) WITH ('connector' = 'datagen','rows-per-second'='1','fields.a.min'='1','fields.a.max'='3');
    CREATE TABLE udfSink (b int,c int) WITH ('connector' = 'print');
    INSERT INTO
      udfSink
    SELECT
      a,
      udaf(a)
    FROM
      udfSource
    GROUP BY
      a;
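
To make the grouping behavior concrete, the sketch below emulates the accumulator lifecycle of the UDAF example in plain Java, without Flink on the classpath: one accumulator is created per distinct value of a, updated for each row, and read at the end. The names UdafLifecycleSketch, SumAccumulator, and aggregateByKey are hypothetical, and the input rows are made-up sample data. It shows only the final value per group, whereas a streaming job typically emits an updated result as rows arrive.

```java
import java.util.Map;
import java.util.TreeMap;

class UdafLifecycleSketch {
    // Mirrors AverageAccumulator from the example: it holds only a running sum.
    static class SumAccumulator {
        int sum;
    }

    // Emulates "GROUP BY a" with the example UDAF applied to column a.
    static Map<Integer, Integer> aggregateByKey(int[] rows) {
        Map<Integer, SumAccumulator> accumulators = new TreeMap<>();
        for (int a : rows) {
            // Corresponds to createAccumulator() on the first row of a group.
            SumAccumulator acc = accumulators.computeIfAbsent(a, key -> new SumAccumulator());
            // Corresponds to accumulate(acc, a).
            acc.sum += a;
        }
        Map<Integer, Integer> result = new TreeMap<>();
        // Corresponds to getValue(acc) for each group.
        accumulators.forEach((key, acc) -> result.put(key, acc.sum));
        return result;
    }

    public static void main(String[] args) {
        // Sample rows in the 1..3 range configured for the datagen source.
        System.out.println(aggregateByKey(new int[] {1, 2, 2, 3, 3, 3}));
    }
}
```

For the sample rows, the program prints {1=1, 2=4, 3=9}, which shows that the function named Average in the example produces a per-group sum.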

UDTF Java and SQL Examples

  • UDTF Java example
    package com.xxx.udf;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableFunction;
    // Table function: emits zero or more rows for each input value.
    public class UdfClass_UDTF extends TableFunction<Tuple2<String, Integer>> {
        // Called once per input row; emits one row containing the string and its length.
        public void eval(String str) {
            Tuple2<String, Integer> tuple2 = Tuple2.of(str, str.length());
            collect(tuple2);
        }
    }
  • UDTF SQL example
    CREATE TEMPORARY FUNCTION udtf as 'com.xxx.udf.UdfClass_UDTF';
    CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1');
    CREATE TABLE udfSink (b VARCHAR,c int) WITH ('connector' = 'print');
    INSERT INTO
      udfSink
    SELECT
      str,
      strLength
    FROM
      udfSource, LATERAL TABLE(udtf(udfSource.a)) AS T(str, strLength);
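
The lateral join in the SQL example can be pictured as a nested loop: for each source row, the table function is called and every row it emits is joined to that source row. The following plain Java sketch emulates this without Flink on the classpath. The names UdtfLateralSketch and lateralJoin are hypothetical, Map.Entry stands in for Tuple2, and the input strings are made-up sample data.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

class UdtfLateralSketch {
    // Stand-in for UdfClass_UDTF.eval: returns the rows that collect() would emit.
    static List<Map.Entry<String, Integer>> eval(String str) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        emitted.add(new SimpleEntry<String, Integer>(str, str.length()));
        return emitted;
    }

    // Emulates "FROM udfSource, LATERAL TABLE(udtf(udfSource.a)) AS T(str, strLength)"
    // and selects the two columns str and strLength.
    static List<String> lateralJoin(List<String> sourceRows) {
        List<String> sinkRows = new ArrayList<>();
        for (String a : sourceRows) {
            for (Map.Entry<String, Integer> t : eval(a)) {
                sinkRows.add(t.getKey() + "," + t.getValue());
            }
        }
        return sinkRows;
    }

    public static void main(String[] args) {
        System.out.println(lateralJoin(Arrays.asList("ab", "flink")));
    }
}
```

For the two sample inputs, the program prints [ab,2, flink,5]. A table function that emitted several rows per input would produce several sink rows for the same source row.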