更新时间:2026-05-12 GMT+08:00
分享

配置FlinkServer作业中使用UDF

用户可根据业务个性化计算逻辑,自行开发自定义函数,用来扩展 Flink原生语法能力,弥补内置函数无法满足复杂业务加工、字段处理、聚合分析等定制化场景的不足,这类面向SQL扩展的自定义函数统称为UDF。

Flink支持的UDF类型包含标量函数UDF、表生成函数UDTF、聚合函数 UDAF,可实现一对一、一对多、多对一等各类数据加工逻辑。

用户可在Flink WebUI页面中统一完成UDF Jar包的上传、查看与管理,无需修改集群底层配置;上传生效后,即可在 Flink SQL作业中通过注册函数的方式引用已上传的UDF,在作业运行过程中直接调用自定义函数完成业务逻辑计算,灵活实现 SQL 能力的按需扩展。

表1 UDF函数分类

分类

描述

UDF(User Defined Scalar Function)

自定义函数,支持一个或多个输入参数,返回一个结果值。详情请参考UDF JAVA代码及SQL样例

UDAF(User Defined Aggregation Function)

自定义聚合函数,将多条记录聚合成一个值。详情请参考UDAF JAVA代码及SQL样例

UDTF(User Defined Table-valued Function)

自定义表值函数,支持一个或多个输入参数,可返回多行多列。详情请参考UDTF JAVA代码及SQL样例

约束与限制

  • MRS 3.1.2及之后版本的MRS集群支持在FlinkServer作业中配置使用UDF。
  • MRS 3.3.0及以后版本的MRS集群,FlinkSQL的UDF支持重用功能。

上传UDF至FlinkServer

  1. 准备UDF jar文件,大小不能超过200 MB
  2. 访问Flink WebUI,请参考访问FlinkServer WebUI界面
  3. 单击“UDF管理”进入UDF管理页面。
  4. 单击“添加UDF”,在“本地Jar文件”参数后选择并上传本地已准备好的UDF文件。
  5. 填写UDF名称以及描述信息后,单击“确定”。

    • “UDF名称”最多可添加10项,“名称”可自定义,“类名”需与上传的UDF jar文件中UDF函数全限定类名一一对应。
    • 上传UDF jar文件后,服务器默认保留5分钟,5分钟内单击确定则完成UDF创建,超时后单击确定则创建UDF失败并弹出错误提示:本地UDF文件路径有误。

  6. 在UDF列表中,可查看当前应用内所有的UDF信息。可在对应UDF信息的“操作”列编辑或删除UDF信息(只能删除未被使用的UDF项)。
  7. (可选)如果需要立即运行或开发作业,可在“作业管理”进行相关作业配置,可参考创建FlinkServer作业

UDF JAVA代码及SQL样例

  • UDF JAVA使用样例
    package com.xxx.udf;
    import org.apache.flink.table.functions.ScalarFunction;
    /**
     * Flink 标量 UDF:输入字符串,返回字符串长度
     */
    public class UdfClass_UDF extends ScalarFunction {
        /**
         * @param s 输入参数:字符串
         * @return 返回值:字符串长度
         */
        public int eval(String s) {
            // 判空处理,防止空指针
            if (s == null) {
                return 0;
            }
            return s.length();
        }
    }
  • UDF SQL使用样例
    -- 1. 注册临时 UDF
    CREATE TEMPORARY FUNCTION udf AS 'com.xxx.udf.UdfClass_UDF';
    -- 2. 数据源:随机生成 VARCHAR 字符串(datagen 连接器)
    CREATE TABLE udfSource (
        a VARCHAR
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1',               -- 每秒生成1条数据
        'fields.a.length' = '10'               -- 随机字符串长度=10(可选)
    );
    -- 3. 输出表:打印到控制台
    CREATE TABLE udfSink (
        a VARCHAR,
        b INT
    ) WITH (
        'connector' = 'print'
    );
    -- 4. 写入数据:调用 UDF 计算字符串长度
    INSERT INTO udfSink
    SELECT
        a,
        udf(a)  -- 调用自定义 UDF
    FROM udfSource;

UDAF JAVA代码及SQL样例

  • UDAF JAVA使用样例
    package com.xxx.udf;
    import org.apache.flink.table.functions.AggregateFunction;
    public class UdfClass_UDAF {
        public static class AverageAccumulator  {
            // 累加和:用来存放所有数据的总和
            public int sum;
        }
        public static class Average extends AggregateFunction<Integer, AverageAccumulator> {
    
            /**
             * 核心方法:每来一条数据,就会调用一次
             * @param acc 累加器(存放中间结果)
             * @param value 新到的数据
             * 功能:把新数据累加到 sum 中
             */
    
            public void accumulate(AverageAccumulator acc, Integer value) {
                acc.sum += value;
            }
            /**
             * 最终获取聚合结果的方法
             * 分组计算完成后,Flink 调用此方法获取最终结果
             * @return 返回累加的总和
             */
            @Override
            public Integer getValue(AverageAccumulator acc) {
                return acc.sum;
            }
            /**
             * 创建一个新的累加器
             * 每组 key 分组时,都会创建一个新的累加器
             */
            @Override
            public AverageAccumulator createAccumulator() {
                return new AverageAccumulator();
            }
        }
    }
  • UDAF SQL使用样例
    -- 1. 注册临时 UDF
    CREATE TEMPORARY FUNCTION udaf as 'com.xxx.udf.UdfClass_UDAF$Average';
    -- 2. 创建数据源表:随机生成数字的数据源
    CREATE TABLE udfSource (
        a int
    ) WITH (
        'connector' = 'datagen',   -- 使用Flink内置随机数据生成器
        'rows-per-second'='1',  -- 每秒生成1条数据
        'fields.a.min'='1',
        'fields.a.max'='3'
    );
    -- 3. 创建输出表:结果打印到控制台
    CREATE TABLE udfSink (b int,c int) WITH ('connector' = 'print');
    -- 4. 执行计算:从数据源读取数据 > 分组聚合 > 写入输出表
    INSERT INTO
      udfSink
    SELECT
      a,
      udaf(a)
    FROM
      udfSource group by a;

UDTF JAVA代码及SQL样例

  • UDTF JAVA使用样例
    package com.xxx.udf;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableFunction;
    /**
     * 自定义 UDTF 表生成函数
     * 功能:输入一个字符串,输出两行/两列数据(原字符串 + 字符串长度)
     * 泛型 <Tuple2<String, Integer>>:表示输出的行结构(String列, Int列)
     */
    public class UdfClass_UDTF extends TableFunction<Tuple2<String, Integer>> {
        public void eval(String str) {
            // 构造输出数据:Tuple2.of(字段1, 字段2)
            // 字段1 = 原字符串 str
            // 字段2 = 字符串长度 str.length()
            Tuple2<String, Integer> tuple2 = Tuple2.of(str, str.length());
            // 调用 collect() 方法向下游发送数据
            // UDTF 可以调用多次 collect() 输出多行
            collect(tuple2);
        }
    }
  • UDTF SQL使用样例
    -- 1. 注册临时 UDTF 函数(表生成函数)
    CREATE TEMPORARY FUNCTION udtf as 'com.xxx.udf.UdfClass_UDTF';
    -- 2. 创建数据源表
    CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1');
    -- 3. 创建输出表
    CREATE TABLE udfSink (b VARCHAR,c int) WITH ('connector' = 'print');
    -- 4. 主逻辑:使用 UDTF 做表生成(一行输入,输出一行两列)
    INSERT INTO
      udfSink
    SELECT
      str,
      strLength
    FROM
      udfSource,lateral table(udtf(udfSource.a)) as T(str,strLength);

Flink UDF重用介绍

适用于MRS 3.3.0及以后版本。

FlinkSQL的UDF新增重用功能,当UDF被多次执行时,第N(N>1)次执行只复制第1次结果,可以确保UDF多次执行的数据一致性,同时确保UDF只被执行一次,提高了算子性能。

配置Flink作业时,可通过在FlinkServer WebUI的Flink作业开发界面添加自定义参数“table.optimizer.function-reuse-enabled”为“true”开启UDF重用功能,参考创建FlinkServer作业

  • UDF重用示例:
    class ItemExist extends ScalarFunction {
      val items: mutable.Set[String] = mutable.Set[String]()
    
      def eval(item: String): Boolean = {
        val exist = items.contains(item);
        if (!exist) {
          items.add(item)
        }
        exist
      }
    }
  • SQL语句示例:
    SELECT * FROM ( SELECT `a`, IfExist(b) as `exist`, `c` FROM Table1 ) WHERE exist IS FALSE;
  • 执行结果:
    • 未开启UDF重用时的返回值:
      a,true,c

      因为在WHERE条件中IfExist被执行一次,并且结果为false,所以在其缓存中已存储该数据,在SELECT中再次执行时即返回true。

    • 开启UDF重用时的返回值:
      a,false,c

相关文档