更新时间:2024-11-29 GMT+08:00

Flink UDF重用

操作场景

FlinkSQL的UDF新增重用功能,当UDF被多次执行时,第N(N>1)次执行只复制第1次结果,可以确保UDF多次执行的数据一致性,同时确保UDF只被执行一次,提高算子性能。

使用方法

配置Flink作业时,可通过在FlinkServer WebUI的Flink作业开发界面添加自定义参数“table.optimizer.function-reuse-enabled”为“true”开启UDF重用功能,可参考创建作业

示例

  • UDF:
    class ItemExist extends ScalarFunction {
      val items: mutable.Set[String] = mutable.Set[String]()
    
      def eval(item: String): Boolean = {
        val exist = items.contains(item);
        if (!exist) {
          items.add(item)
        }
        exist
      }
    }
  • SQL语句:

    SELECT * FROM ( SELECT `a`, IfExist(b) as `exist`, `c` FROM Table1 ) WHERE exist IS FALSE;

  • 执行结果:
    • 未开启UDF重用时的返回值:
      a,true,c

      因为在WHERE条件中IfExist被执行一次,并且结果为false,所以在其缓存中已存储该数据,在SELECT中再次执行时即返回true。

    • 开启UDF重用时的返回值:
      a,false,c