配置FlinkServer作业中使用UDF
本章节适用于MRS 3.1.2及之后的版本。
用户可以自定义一些函数,用于扩展SQL以满足个性化的需求,这类函数称为UDF。用户可以在Flink WebUI界面中上传并管理UDF jar包,然后在运行作业时调用相关UDF函数。
Flink支持以下3类自定义函数,如表1。
分类 |
描述 |
---|---|
UDF(User Defined Scalar Function) |
自定义函数,支持一个或多个输入参数,返回一个结果值。详情请参考UDF java代码及SQL样例。 |
UDAF(User Defined Aggregation Function) |
自定义聚合函数,将多条记录聚合成一个值。详情请参考UDAF java代码及SQL样例。 |
UDTF(User Defined Table-valued Function) |
自定义表值函数,支持一个或多个输入参数,可返回多行多列。详情请参考UDTF java代码及SQL样例。 |
上传UDF至FlinkServer
- 准备UDF jar文件,大小不能超过200MB。
- 访问Flink WebUI,请参考访问FlinkServer WebUI界面。
- 单击“UDF管理”进入UDF管理页面。
- 单击“添加UDF”,在“本地Jar文件”参数后选择并上传本地已准备好的UDF jar文件。
- 填写UDF名称以及描述信息后,单击“确定”。
- “UDF名称”最多可添加10项,“名称”可自定义,“类名”需与上传的UDF jar文件中UDF函数全限定类名一一对应。
- 上传UDF jar文件后,服务器默认保留5分钟,5分钟内单击确定则完成UDF创建,超时后单击确定则创建UDF失败并弹出错误提示:本地UDF文件路径有误。
- 在UDF列表中,可查看当前应用内所有的UDF信息。可在对应UDF信息的“操作”列编辑或删除UDF信息(只能删除未被使用的UDF项)。
- (可选)如果需要立即运行或开发作业,可在“作业管理”进行相关作业配置,可参考如何创建FlinkServer作业。
UDF java代码及SQL样例
- UDF java使用样例
package com.xxx.udf; import org.apache.flink.table.functions.ScalarFunction; public class UdfClass_UDF extends ScalarFunction { public int eval(String s) { return s.length(); } }
- UDF SQL使用样例
CREATE TEMPORARY FUNCTION udf as 'com.xxx.udf.UdfClass_UDF'; CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1'); CREATE TABLE udfSink (a VARCHAR,b int) WITH ('connector' = 'print'); INSERT INTO udfSink SELECT a, udf(a) FROM udfSource;
UDAF java代码及SQL样例
- UDAF java使用样例
package com.xxx.udf; import org.apache.flink.table.functions.AggregateFunction; public class UdfClass_UDAF { public static class AverageAccumulator { public int sum; } public static class Average extends AggregateFunction<Integer, AverageAccumulator> { public void accumulate(AverageAccumulator acc, Integer value) { acc.sum += value; } @Override public Integer getValue(AverageAccumulator acc) { return acc.sum; } @Override public AverageAccumulator createAccumulator() { return new AverageAccumulator(); } } }
- UDAF SQL使用样例
CREATE TEMPORARY FUNCTION udaf as 'com.xxx.udf.UdfClass_UDAF$Average'; CREATE TABLE udfSource (a int) WITH ('connector' = 'datagen','rows-per-second'='1','fields.a.min'='1','fields.a.max'='3'); CREATE TABLE udfSink (b int,c int) WITH ('connector' = 'print'); INSERT INTO udfSink SELECT a, udaf(a) FROM udfSource group by a;
UDTF java代码及SQL样例
- UDTF java使用样例
package com.xxx.udf; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.functions.TableFunction; public class UdfClass_UDTF extends TableFunction<Tuple2<String, Integer>> { public void eval(String str) { Tuple2<String, Integer> tuple2 = Tuple2.of(str, str.length()); collect(tuple2); } }
- UDTF SQL使用样例
CREATE TEMPORARY FUNCTION udtf as 'com.xxx.udf.UdfClass_UDTF'; CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1'); CREATE TABLE udfSink (b VARCHAR,c int) WITH ('connector' = 'print'); INSERT INTO udfSink SELECT str, strLength FROM udfSource,lateral table(udtf(udfSource.a)) as T(str,strLength);
Flink UDF重用介绍
适用于MRS 3.3.0及以后版本。
FlinkSQL的UDF新增重用功能,当UDF被多次执行时,第N(N>1)次执行只复制第1次结果,可以确保UDF多次执行的数据一致性,同时确保UDF只被执行一次,提高算子性能。
配置Flink作业时,可通过在FlinkServer WebUI的Flink作业开发界面添加自定义参数“table.optimizer.function-reuse-enabled”为“true”开启UDF重用功能,可参考如何创建FlinkServer作业。
示例如下:
- UDF:
class ItemExist extends ScalarFunction { val items: mutable.Set[String] = mutable.Set[String]() def eval(item: String): Boolean = { val exist = items.contains(item); if (!exist) { items.add(item) } exist } }
- SQL语句:
SELECT * FROM ( SELECT `a`, IfExist(b) as `exist`, `c` FROM Table1 ) WHERE exist IS FALSE;
- 执行结果:
- 未开启UDF重用时的返回值:
a,true,c
因为在WHERE条件中IfExist被执行一次,并且结果为false,所以在其缓存中已存储该数据,在SELECT中再次执行时即返回true。
- 开启UDF重用时的返回值:
a,false,c
- 未开启UDF重用时的返回值: