更新时间:2024-03-12 GMT+08:00

自定义函数类型推导

操作场景

类型推导包含了验证输入值、派生参数和返回值数据类型。从逻辑角度看,Planner需要知道数据类型、精度和小数位数;从 JVM 角度来看,Planner 在调用自定义函数时需要知道如何将内部数据结构表示为JVM对象。

Flink 自定义函数实现了自动的类型推导提取,通过反射从函数的类及其求值方法中派生数据类型。然而以反射方式提取数据类型并不总是成功的,比如UDTF中常见的Row类型。

由于 Flink 1.11 起引入了新的自定义函数注册接口,使用了新的自定义函数类型推断机制,因此原先1.10 重载 getResultType 声明返回字段类型的方式将不再可用。继续使用会抛出如下异常:

Caused by: org.apache.flink.table.api.ValidationException: Cannot extract a data type from a pure 'org.apache.flink.types.Row' class. Please use annotations to define field names and field types.

目前 Flink 1.12 可以通过使用DataTypeHint 和FunctionHint 注解相关参数、类或方法来支持提取过程。

代码示例

Table(类似于 SQL 标准)是一种强类型的 API,函数的参数和返回类型都必须映射到 Table API 的数据类型,参见Table API数据类型

如果需要更高级的类型推导逻辑,您可以在每个自定义函数中显式重写 getTypeInference( ) 方法。

建议使用注解方式,因为它可使自定义类型推导逻辑保持在受影响位置附近,而在其他位置则保持默认状态。

importorg.apache.flink.table.annotation.DataTypeHint;
importorg.apache.flink.table.annotation.FunctionHint;
importorg.apache.flink.table.functions.FunctionContext;
importorg.apache.flink.table.functions.TableFunction;
importorg.apache.flink.types.Row;
publicclassUdfTableFunctionextendsTableFunction<Row>{
    /**
     *初始化操作,可选
     *@paramcontext
     */
    @Override
     public void open(FunctionContextcontext) {  }

    @FunctionHint(output=@DataTypeHint("ROW<s STRING, i INT>"))
    publicvoideval(String str, String split) {
        for (String s: str.split(split)) {
            Row row=new Row(2);
            row.setField(0, s);
            row.setField(1, s.length());
            collect(row);
        }
    }
    /**
    * 可选
    */
   @Override
   public void close() {}
}

使用示例

UDTF支持CROSS JOIN和LEFT JOIN,在使用UDTF时需要带上 LATERAL 和TABLE 两个关键字。

  • CROSS JOIN:对于左表的每一行数据,假设UDTF不产生输出,则这一行不进行输出。
  • LEFT JOIN:对于左表的每一行数据,假设UDTF不产生输出,这一行仍会输出,UDTF相关字段用null填充。
CREATE FUNCTION udtf_test AS 'com.huaweicompany.udf.TableFunction';-- CROSS JOIN
INSERT INTO sink_stream select subValue, length FROM source_stream, LATERAL
TABLE(udtf_test(attr, ',')) as T(subValue, length);-- LEFT JOIN
INSERT INTO sink_stream select subValue, length FROM source_stream LEFT JOIN
LATERAL
TABLE(udtf_test(attr, ',')) as T(subValue, length) ON TRUE;