更新时间:2024-10-22 GMT+08:00
分享

配置FlinkServer作业中使用UDF

本章节适用于MRS 3.1.2及之后的版本。

用户可以自定义一些函数,用于扩展SQL以满足个性化的需求,这类函数称为UDF。用户可以在Flink WebUI界面中上传并管理UDF jar包,然后在运行作业时调用相关UDF函数。

Flink支持以下3类自定义函数,如表1

表1 函数分类

分类

描述

UDF(User Defined Scalar Function)

自定义函数,支持一个或多个输入参数,返回一个结果值。详情请参考UDF java代码及SQL样例

UDAF(User Defined Aggregation Function)

自定义聚合函数,将多条记录聚合成一个值。详情请参考UDAF java代码及SQL样例

UDTF(User Defined Table-valued Function)

自定义表值函数,支持一个或多个输入参数,可返回多行多列。详情请参考UDTF java代码及SQL样例

前提条件

准备UDF jar文件,大小不能超过200MB。

上传UDF

  1. 访问Flink WebUI,请参考访问FlinkServer WebUI界面
  2. 单击“UDF管理”进入UDF管理页面。
  3. 单击“添加UDF”,在“本地Jar文件”参数后选择并上传本地已准备好的UDF jar文件。
  4. 填写UDF名称以及描述信息后,单击“确定”。

    • “UDF名称”最多可添加10项,“名称”可自定义,“类名”需与上传的UDF jar文件中UDF函数全限定类名一一对应。
    • 上传UDF jar文件后,服务器默认保留5分钟,5分钟内单击确定则完成UDF创建,超时后单击确定则创建UDF失败并弹出错误提示:本地UDF文件路径有误。

  5. 在UDF列表中,可查看当前应用内所有的UDF信息。可在对应UDF信息的“操作”列编辑或删除UDF信息(只能删除未被使用的UDF项)。
  6. (可选)如果需要立即运行或开发作业,可在“作业管理”进行相关作业配置,可参考创建FlinkServer作业

UDF java代码及SQL样例

  • UDF java使用样例
    package com.xxx.udf;
    import org.apache.flink.table.functions.ScalarFunction;
    public class UdfClass_UDF extends ScalarFunction {
        public int eval(String s) {
            return s.length();
        }
    }
  • UDF SQL使用样例
    CREATE TEMPORARY FUNCTION udf as 'com.xxx.udf.UdfClass_UDF';
    CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1');
    CREATE TABLE udfSink (a VARCHAR,b int) WITH ('connector' = 'print');
    INSERT INTO
      udfSink
    SELECT
      a,
      udf(a)
    FROM
      udfSource;

UDAF java代码及SQL样例

  • UDAF java使用样例
    package com.xxx.udf;
    import org.apache.flink.table.functions.AggregateFunction;
    public class UdfClass_UDAF {
        public static class AverageAccumulator  {
            public int sum;
        }
        public static class Average extends AggregateFunction<Integer, AverageAccumulator> {
            public void accumulate(AverageAccumulator acc, Integer value) {
                acc.sum += value;
            }
            @Override
            public Integer getValue(AverageAccumulator acc) {
                return acc.sum;
            }
            @Override
            public AverageAccumulator createAccumulator() {
                return new AverageAccumulator();
            }
        }
    }
  • UDAF SQL使用样例
    CREATE TEMPORARY FUNCTION udaf as 'com.xxx.udf.UdfClass_UDAF$Average';
    CREATE TABLE udfSource (a int) WITH ('connector' = 'datagen','rows-per-second'='1','fields.a.min'='1','fields.a.max'='3');
    CREATE TABLE udfSink (b int,c int) WITH ('connector' = 'print');
    INSERT INTO
      udfSink
    SELECT
      a,
      udaf(a)
    FROM
      udfSource group by a;

UDTF java代码及SQL样例

  • UDTF java使用样例
    package com.xxx.udf;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableFunction;
    public class UdfClass_UDTF extends TableFunction<Tuple2<String, Integer>> {
        public void eval(String str) {
            Tuple2<String, Integer> tuple2 = Tuple2.of(str, str.length());
            collect(tuple2);
        }
    }
  • UDTF SQL使用样例
    CREATE TEMPORARY FUNCTION udtf as 'com.xxx.udf.UdfClass_UDTF';
    CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1');
    CREATE TABLE udfSink (b VARCHAR,c int) WITH ('connector' = 'print');
    INSERT INTO
      udfSink
    SELECT
      str,
      strLength
    FROM
      udfSource,lateral table(udtf(udfSource.a)) as T(str,strLength);

相关文档