更新时间:2024-11-29 GMT+08:00
Flink UDF重用
操作场景
FlinkSQL的UDF新增重用功能,当UDF被多次执行时,第N(N>1)次执行只复制第1次结果,可以确保UDF多次执行的数据一致性,同时确保UDF只被执行一次,提高算子性能。
使用方法
配置Flink作业时,可通过在FlinkServer WebUI的Flink作业开发界面添加自定义参数“table.optimizer.function-reuse-enabled”为“true”开启UDF重用功能,可参考创建作业。
示例
- UDF:
class ItemExist extends ScalarFunction { val items: mutable.Set[String] = mutable.Set[String]() def eval(item: String): Boolean = { val exist = items.contains(item); if (!exist) { items.add(item) } exist } }
- SQL语句:
SELECT * FROM ( SELECT `a`, IfExist(b) as `exist`, `c` FROM Table1 ) WHERE exist IS FALSE;
- 执行结果:
- 未开启UDF重用时的返回值:
a,true,c
因为在WHERE条件中IfExist被执行一次,并且结果为false,所以在其缓存中已存储该数据,在SELECT中再次执行时即返回true。
- 开启UDF重用时的返回值:
a,false,c
- 未开启UDF重用时的返回值:
父主题: 配置开发Flink可视化作业