更新时间:2024-08-16 GMT+08:00

配置FlinkServer作业中使用UDF

本章节适用于MRS 3.1.2及之后的版本。

用户可以自定义一些函数,用于扩展SQL以满足个性化的需求,这类函数称为UDF。用户可以在Flink WebUI界面中上传并管理UDF jar包,然后在运行作业时调用相关UDF函数。

Flink支持以下3类自定义函数,如表1

表1 函数分类

分类

描述

UDF(User Defined Scalar Function)

自定义函数,支持一个或多个输入参数,返回一个结果值。详情请参考UDF java代码及SQL样例

UDAF(User Defined Aggregation Function)

自定义聚合函数,将多条记录聚合成一个值。详情请参考UDAF java代码及SQL样例

UDTF(User Defined Table-valued Function)

自定义表值函数,支持一个或多个输入参数,可返回多行多列。详情请参考UDTF java代码及SQL样例

上传UDF至FlinkServer

  1. 准备UDF jar文件,大小不能超过200MB。
  2. 访问Flink WebUI,请参考访问FlinkServer WebUI界面
  3. 单击“UDF管理”进入UDF管理页面。
  4. 单击“添加UDF”,在“本地Jar文件”参数后选择并上传本地已准备好的UDF jar文件。
  5. 填写UDF名称以及描述信息后,单击“确定”。

    • “UDF名称”最多可添加10项,“名称”可自定义,“类名”需与上传的UDF jar文件中UDF函数全限定类名一一对应。
    • 上传UDF jar文件后,服务器默认保留5分钟,5分钟内单击确定则完成UDF创建,超时后单击确定则创建UDF失败并弹出错误提示:本地UDF文件路径有误。

  6. 在UDF列表中,可查看当前应用内所有的UDF信息。可在对应UDF信息的“操作”列编辑或删除UDF信息(只能删除未被使用的UDF项)。
  7. (可选)如果需要立即运行或开发作业,可在“作业管理”进行相关作业配置,可参考如何创建FlinkServer作业

UDF java代码及SQL样例

  • UDF java使用样例
    package com.xxx.udf;
    import org.apache.flink.table.functions.ScalarFunction;
    public class UdfClass_UDF extends ScalarFunction {
        public int eval(String s) {
            return s.length();
        }
    }
  • UDF SQL使用样例
    CREATE TEMPORARY FUNCTION udf as 'com.xxx.udf.UdfClass_UDF';
    CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1');
    CREATE TABLE udfSink (a VARCHAR,b int) WITH ('connector' = 'print');
    INSERT INTO
      udfSink
    SELECT
      a,
      udf(a)
    FROM
      udfSource;

UDAF java代码及SQL样例

  • UDAF java使用样例
    package com.xxx.udf;
    import org.apache.flink.table.functions.AggregateFunction;
    public class UdfClass_UDAF {
        public static class AverageAccumulator  {
            public int sum;
        }
        public static class Average extends AggregateFunction<Integer, AverageAccumulator> {
            public void accumulate(AverageAccumulator acc, Integer value) {
                acc.sum += value;
            }
            @Override
            public Integer getValue(AverageAccumulator acc) {
                return acc.sum;
            }
            @Override
            public AverageAccumulator createAccumulator() {
                return new AverageAccumulator();
            }
        }
    }
  • UDAF SQL使用样例
    CREATE TEMPORARY FUNCTION udaf as 'com.xxx.udf.UdfClass_UDAF$Average';
    CREATE TABLE udfSource (a int) WITH ('connector' = 'datagen','rows-per-second'='1','fields.a.min'='1','fields.a.max'='3');
    CREATE TABLE udfSink (b int,c int) WITH ('connector' = 'print');
    INSERT INTO
      udfSink
    SELECT
      a,
      udaf(a)
    FROM
      udfSource group by a;

UDTF java代码及SQL样例

  • UDTF java使用样例
    package com.xxx.udf;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableFunction;
    public class UdfClass_UDTF extends TableFunction<Tuple2<String, Integer>> {
        public void eval(String str) {
            Tuple2<String, Integer> tuple2 = Tuple2.of(str, str.length());
            collect(tuple2);
        }
    }
  • UDTF SQL使用样例
    CREATE TEMPORARY FUNCTION udtf as 'com.xxx.udf.UdfClass_UDTF';
    CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1');
    CREATE TABLE udfSink (b VARCHAR,c int) WITH ('connector' = 'print');
    INSERT INTO
      udfSink
    SELECT
      str,
      strLength
    FROM
      udfSource,lateral table(udtf(udfSource.a)) as T(str,strLength);

Flink UDF重用介绍

适用于MRS 3.3.0及以后版本。

FlinkSQL的UDF新增重用功能,当UDF被多次执行时,第N(N>1)次执行只复制第1次结果,可以确保UDF多次执行的数据一致性,同时确保UDF只被执行一次,提高算子性能。

配置Flink作业时,可通过在FlinkServer WebUI的Flink作业开发界面添加自定义参数“table.optimizer.function-reuse-enabled”为“true”开启UDF重用功能,可参考如何创建FlinkServer作业

示例如下:

  • UDF:
    class ItemExist extends ScalarFunction {
      val items: mutable.Set[String] = mutable.Set[String]()
    
      def eval(item: String): Boolean = {
        val exist = items.contains(item);
        if (!exist) {
          items.add(item)
        }
        exist
      }
    }
  • SQL语句:

    SELECT * FROM ( SELECT `a`, IfExist(b) as `exist`, `c` FROM Table1 ) WHERE exist IS FALSE;

  • 执行结果:
    • 未开启UDF重用时的返回值:
      a,true,c

      因为在WHERE条件中IfExist被执行一次,并且结果为false,所以在其缓存中已存储该数据,在SELECT中再次执行时即返回true。

    • 开启UDF重用时的返回值:
      a,false,c