配置FlinkServer作业中使用UDF
用户可根据业务个性化计算逻辑,自行开发自定义函数,用来扩展 Flink原生语法能力,弥补内置函数无法满足复杂业务加工、字段处理、聚合分析等定制化场景的不足,这类面向SQL扩展的自定义函数统称为UDF。
Flink支持的UDF类型包含标量函数UDF、表生成函数UDTF、聚合函数 UDAF,可实现一对一、一对多、多对一等各类数据加工逻辑。
用户可在Flink WebUI页面中统一完成UDF Jar包的上传、查看与管理,无需修改集群底层配置;上传生效后,即可在 Flink SQL作业中通过注册函数的方式引用已上传的UDF,在作业运行过程中直接调用自定义函数完成业务逻辑计算,灵活实现 SQL 能力的按需扩展。
| 分类 | 描述 |
|---|---|
| UDF(User Defined Scalar Function) | 自定义函数,支持一个或多个输入参数,返回一个结果值。详情请参考UDF JAVA代码及SQL样例。 |
| UDAF(User Defined Aggregation Function) | 自定义聚合函数,将多条记录聚合成一个值。详情请参考UDAF JAVA代码及SQL样例。 |
| UDTF(User Defined Table-valued Function) | 自定义表值函数,支持一个或多个输入参数,可返回多行多列。详情请参考UDTF JAVA代码及SQL样例。 |
约束与限制
- MRS 3.1.2及之后版本的MRS集群支持在FlinkServer作业中配置使用UDF。
- MRS 3.3.0及以后版本的MRS集群,FlinkSQL的UDF支持重用功能。
上传UDF至FlinkServer
- 准备UDF jar文件,大小不能超过200 MB。
- 访问Flink WebUI,请参考访问FlinkServer WebUI界面。
- 单击“UDF管理”进入UDF管理页面。
- 单击“添加UDF”,在“本地Jar文件”参数后选择并上传本地已准备好的UDF文件。
- 填写UDF名称以及描述信息后,单击“确定”。
- “UDF名称”最多可添加10项,“名称”可自定义,“类名”需与上传的UDF jar文件中UDF函数全限定类名一一对应。
- 上传UDF jar文件后,服务器默认保留5分钟,5分钟内单击确定则完成UDF创建,超时后单击确定则创建UDF失败并弹出错误提示:本地UDF文件路径有误。
- 在UDF列表中,可查看当前应用内所有的UDF信息。可在对应UDF信息的“操作”列编辑或删除UDF信息(只能删除未被使用的UDF项)。
- (可选)如果需要立即运行或开发作业,可在“作业管理”进行相关作业配置,可参考创建FlinkServer作业。
UDF JAVA代码及SQL样例
- UDF JAVA使用样例
package com.xxx.udf; import org.apache.flink.table.functions.ScalarFunction; /** * Flink 标量 UDF:输入字符串,返回字符串长度 */ public class UdfClass_UDF extends ScalarFunction { /** * @param s 输入参数:字符串 * @return 返回值:字符串长度 */ public int eval(String s) { // 判空处理,防止空指针 if (s == null) { return 0; } return s.length(); } }
- UDF SQL使用样例
-- 1. 注册临时 UDF CREATE TEMPORARY FUNCTION udf AS 'com.xxx.udf.UdfClass_UDF'; -- 2. 数据源:随机生成 VARCHAR 字符串(datagen 连接器) CREATE TABLE udfSource ( a VARCHAR ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', -- 每秒生成1条数据 'fields.a.length' = '10' -- 随机字符串长度=10(可选) ); -- 3. 输出表:打印到控制台 CREATE TABLE udfSink ( a VARCHAR, b INT ) WITH ( 'connector' = 'print' ); -- 4. 写入数据:调用 UDF 计算字符串长度 INSERT INTO udfSink SELECT a, udf(a) -- 调用自定义 UDF FROM udfSource;
UDAF JAVA代码及SQL样例
- UDAF JAVA使用样例
package com.xxx.udf; import org.apache.flink.table.functions.AggregateFunction; public class UdfClass_UDAF { public static class AverageAccumulator { // 累加和:用来存放所有数据的总和 public int sum; } public static class Average extends AggregateFunction<Integer, AverageAccumulator> { /** * 核心方法:每来一条数据,就会调用一次 * @param acc 累加器(存放中间结果) * @param value 新到的数据 * 功能:把新数据累加到 sum 中 */ public void accumulate(AverageAccumulator acc, Integer value) { acc.sum += value; } /** * 最终获取聚合结果的方法 * 分组计算完成后,Flink 调用此方法获取最终结果 * @return 返回累加的总和 */ @Override public Integer getValue(AverageAccumulator acc) { return acc.sum; } /** * 创建一个新的累加器 * 每组 key 分组时,都会创建一个新的累加器 */ @Override public AverageAccumulator createAccumulator() { return new AverageAccumulator(); } } }
- UDAF SQL使用样例
-- 1. 注册临时 UDF CREATE TEMPORARY FUNCTION udaf as 'com.xxx.udf.UdfClass_UDAF$Average'; -- 2. 创建数据源表:随机生成数字的数据源 CREATE TABLE udfSource ( a int ) WITH ( 'connector' = 'datagen', -- 使用Flink内置随机数据生成器 'rows-per-second'='1', -- 每秒生成1条数据 'fields.a.min'='1', 'fields.a.max'='3' ); -- 3. 创建输出表:结果打印到控制台 CREATE TABLE udfSink (b int,c int) WITH ('connector' = 'print'); -- 4. 执行计算:从数据源读取数据 > 分组聚合 > 写入输出表 INSERT INTO udfSink SELECT a, udaf(a) FROM udfSource group by a;
UDTF JAVA代码及SQL样例
- UDTF JAVA使用样例
package com.xxx.udf; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.functions.TableFunction; /** * 自定义 UDTF 表生成函数 * 功能:输入一个字符串,输出两行/两列数据(原字符串 + 字符串长度) * 泛型 <Tuple2<String, Integer>>:表示输出的行结构(String列, Int列) */ public class UdfClass_UDTF extends TableFunction<Tuple2<String, Integer>> { public void eval(String str) { // 构造输出数据:Tuple2.of(字段1, 字段2) // 字段1 = 原字符串 str // 字段2 = 字符串长度 str.length() Tuple2<String, Integer> tuple2 = Tuple2.of(str, str.length()); // 调用 collect() 方法向下游发送数据 // UDTF 可以调用多次 collect() 输出多行 collect(tuple2); } }
- UDTF SQL使用样例
-- 1. 注册临时 UDTF 函数(表生成函数) CREATE TEMPORARY FUNCTION udtf as 'com.xxx.udf.UdfClass_UDTF'; -- 2. 创建数据源表 CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1'); -- 3. 创建输出表 CREATE TABLE udfSink (b VARCHAR,c int) WITH ('connector' = 'print'); -- 4. 主逻辑:使用 UDTF 做表生成(一行输入,输出一行两列) INSERT INTO udfSink SELECT str, strLength FROM udfSource,lateral table(udtf(udfSource.a)) as T(str,strLength);
Flink UDF重用介绍
适用于MRS 3.3.0及以后版本。
FlinkSQL的UDF新增重用功能,当UDF被多次执行时,第N(N>1)次执行只复制第1次结果,可以确保UDF多次执行的数据一致性,同时确保UDF只被执行一次,提高了算子性能。
配置Flink作业时,可通过在FlinkServer WebUI的Flink作业开发界面添加自定义参数“table.optimizer.function-reuse-enabled”为“true”开启UDF重用功能,参考创建FlinkServer作业。
- UDF重用示例:
class ItemExist extends ScalarFunction { val items: mutable.Set[String] = mutable.Set[String]() def eval(item: String): Boolean = { val exist = items.contains(item); if (!exist) { items.add(item) } exist } } - SQL语句示例:
SELECT * FROM ( SELECT `a`, IfExist(b) as `exist`, `c` FROM Table1 ) WHERE exist IS FALSE;
- 执行结果:
- 未开启UDF重用时的返回值:
a,true,c
因为在WHERE条件中IfExist被执行一次,并且结果为false,所以在其缓存中已存储该数据,在SELECT中再次执行时即返回true。
- 开启UDF重用时的返回值:
a,false,c
- 未开启UDF重用时的返回值: