文档首页 > > SQL语法参考> 数据操作语句> 自定义函数

自定义函数

分享
更新时间: 2019/02/26 14:38

概述

CS支持三种自定义函数:

  • UDF:自定义函数,支持一个或多个输入参数,返回一个结果值。
  • UDTF:自定义表值函数,支持一个或多个输入参数,可返回多行多列。
  • UDAF:自定义聚合函数,将多条记录聚合成一个值。
说明:

自定义函数仅能在独享集群中使用,不支持在共享集群中使用。

POM依赖

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table_2.11</artifactId>
   <version>1.5.0</version>
   <scope>provided</scope>
</dependency>
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.5.0</version>
        <scope>provided</scope>
</dependency>

使用方式

  1. 将写好的自定义函数打成JAR包,并上传到OBS上。
  2. 在CS管理控制台的左侧导航栏中,单击“作业管理”,在需要编辑作业对应的“操作”列中,单击“编辑”,进入作业编辑页面。
  3. “运行参数设置”页签,“自定义函数Jar包”选择存放在OBS上的JAR文件,单击“保存”

    选定JAR包以后,SQL里添加UDF声明语句,就可以像普通函数一样使用了。

    CREATE FUNCTION udf_test AS 'com.huawei.udf.UdfScalarFunction';

UDF

UDF函数需继承ScalarFunction函数,并实现eval方法。open函数及close函数可选。

编写代码示例

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
public class UdfScalarFunction extends ScalarFunction {
  private int factor = 12;
  public UdfScalarFunction() {
    this.factor = 12;
  }
  /**
   * 初始化操作,可选
   * @param context
   */
  @Override
  public void open(FunctionContext context) {}
  /**
   * 自定义逻辑
   * @param s
   * @return
   */
   public int eval(String s) {
     return s.hashCode() * factor;
   }
   /**
    * 可选
    */
   @Override
   public void close() {}
}

使用示例

CREATE FUNCTION udf_test AS 'com.huawei.udf.UdfScalarFunction';
INSERT INTO sink_stream select udf_test(attr) FROM source_stream;

UDTF

UDTF函数需继承TableFunction函数,并实现eval方法。open函数及close函数可选。如果需要UDTF返回多列,只需要将返回值声明成Tuple或Row即可。若使用Row,需要重载getResultType声明返回的字段类型。

编写代码示例

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class UdfTableFunction extends TableFunction<Row> {
  private Logger log = LoggerFactory.getLogger(TableFunction.class);
  /**
   * 初始化操作,可选   
   * @param context
   */
  @Override
  public void open(FunctionContext context) {}
  public void eval(String str, String split) {
    for (String s : str.split(split)) {
      Row row = new Row(2);
      row.setField(0, s);
      row.setField(1, s.length());
      collect(row);
    }
  }
  /**
   * 函数返回类型声明
   * @return
   */
  @Override
  public TypeInformation<Row> getResultType() {
  return Types.ROW(Types.STRING, Types.INT);
  }
  /**
   * 可选
   */
  @Override
  public void close() {}
 }

使用示例

UDTF支持CROSS JOIN和LEFT JOIN,在使用UDTF时需要带上 LATERAL 和TABLE 两个关键字。

  • CROSS JOIN:对于左表的每一行数据,假设UDTF不产生输出,则这一行不进行输出。
  • LEFT JOIN:对于左表的每一行数据,假设UDTF不产生输出,这一行仍会输出,UDTF相关字段用null填充。
CREATE FUNCTION udtf_test AS 'com.huawei.udf.TableFunction';
// CROSS JOIN
INSERT INTO sink_stream select subValue, length FROM source_stream, LATERAL
TABLE(udtf_test(attr, ',')) as T(subValue, length);
// LEFT JOIN
INSERT INTO sink_stream select subValue, length FROM source_stream LEFT JOIN LATERAL
TABLE(udtf_test(attr, ',')) as T(subValue, length) ON TRUE;

UDAF

UDAF函数需继承AggregateFunction函数。首先需要创建一个用来存储计算结果的Accumulator,如示例里的WeightedAvgAccum。

编写代码示例

public class WeightedAvgAccum {
public long sum = 0;
public int count = 0;
}

import org.apache.flink.table.functions.AggregateFunction;
import java.util.Iterator;
/**
* 第一个类型变量为聚合函数返回的类型,第二个类型变量为Accumulator类型
* Weighted Average user-defined aggregate function.
*/
public class UdfAggFunction extends AggregateFunction<Long, WeightedAvgAccum> {
  // 初始化Accumulator
  @Override
  public WeightedAvgAccum createAccumulator() {
    return new WeightedAvgAccum();
  }
  // 返回Accumulator存储的中间计算值
  @Override
  public Long getValue(WeightedAvgAccum acc) {
    if (acc.count == 0) {
       return null;
    } else {
      return acc.sum / acc.count;
 }
}
// 根据输入更新中间计算值
public void accumulate(WeightedAvgAccum acc, long iValue) {
acc.sum += iValue;
acc.count += 1;
}
// Restract撤回操作,和accumulate操作相反
public void retract(WeightedAvgAccum acc, long iValue) {
acc.sum -= iValue;
acc.count -= 1;
}
// 合并多个accumulator值
public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
Iterator<WeightedAvgAccum> iter = it.iterator();
while (iter.hasNext()) {
WeightedAvgAccum a = iter.next();
acc.count += a.count;
acc.sum += a.sum;
}
}
// 重置中间计算值
public void resetAccumulator(WeightedAvgAccum acc) {
acc.count = 0;
acc.sum = 0L;
}
}

使用示例

CREATE FUNCTION udaf_test AS 'com.huawei.udf.UdfAggFunction';
INSERT INTO sink_stream SELECT udaf_test(attr2) FROM source_stream GROUP BY attr1;
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

跳转到云社区