User-Defined Function APIs
UDF显式注册语法
显式注册的含义是用户需要手动在Python代码中侵入式添加注册逻辑代码,需要用户使用backend...register/register_from_file来实现,调用即注册。显式注册依赖于已经获得backend会话对象才能进行。
推荐使用显式注册的场景:如果用户希望明确控制注册时间,允许侵入式添加注册逻辑,或对同一个Backend连接下的UDF注册和使用分离有要求。
一个典型的场景是1个开发团队负责UDF的注册,多个团队负责UDF的使用,注册团队和使用团队之间的Python脚本不互通。
|
UDF类型 |
UDF类型(二级) |
注册类型(三级) |
代码入口 |
参考 |
|---|---|---|---|---|
|
udf, udaf, udtf |
python |
直接注册 |
backend.[udf | udaf | udtf].python.register(<注册函数>, <注册参数>) |
|
|
从文件注册 |
backend.[udf | udaf | udtf].python.register_from_file(<文件路径>, <函数名>, <注册参数>) |
|||
|
builtin |
直接注册 |
backend.[udf | udaf | udtf].builtin.register(<注册函数>, <注册参数>) |
||
|
从文件注册 |
backend.[udf | udaf | udtf].builtin.register_from_file(<文件路径>, <函数名>, <注册参数>) |
|||
|
pyarrow |
直接注册 |
backend.udf.pyarrow.register(<注册函数>, <注册参数>) |
||
|
从文件注册 |
backend.udf.pyarrow.register_from_file(<文件路径>, <函数名>, <注册参数>) |
|||
|
pandas |
直接注册 |
backend.udf.pandas.register(<注册函数>, <注册参数>) |
||
|
从文件注册 |
backend.udf.pandas.register_from_file(<文件路径>, <函数名>, <注册参数>) |
UDF隐式注册语法
隐式注册的含义是依赖Python运行时自动发现并注册UDF。用户不需要在Python代码中侵入式添加注册逻辑代码,而是使用@装饰器修饰原始Python函数,然后在DataFrame中使用被装饰的原始Python函数的标识符,即可完成注册。隐式注册在使用@装饰器时不需要获得backend会话对象,后续在ibis DataFrame中获取backend会话对象。
推荐使用隐式注册的场景:如果用户希望无侵入式地注册UDF,且对同一个Backend连接下的UDF注册和使用分离无要求。
一个典型的场景是1个用户的Python脚本中直接写了整个UDF的注册和使用全流程。
|
UDF类型 |
UDF类型(二级) |
代码入口 |
参考 |
|---|---|---|---|
|
udf, udaf, udtf |
python |
@fabric.[udf | udaf | udtf].python(<注册参数>) |
|
|
builtin |
@fabric.[udf | udaf | udtf].builtin(<注册参数>) |
||
|
pyarrow |
@fabric.udf.pyarrow(<注册参数>) |
||
|
pandas |
@fabric.udf.pandas(<注册参数>) |
对于隐式注册,实际注册动作的发生时间根据DataFrame操作模式Lazy、Eager有所区别。
参考上文提及的ibis官方文档,对于DataFrame的操作分为Eager、Lazy两种模式,由配置项ibis.options.interactive控制,默认为False,即所有的DataFrame都默认是Lazy模式。对于两种DataFrame执行模式,UDF注册的发生时间不同,详情说明如下:
|
ibis.options.interactive |
DataFrame执行模式 |
UDF注册时间 |
UDF使用时间 |
|---|---|---|---|
|
False |
Lazy |
整个DataFrame调用execute方法时 |
整个DataFrame调用execute方法时 |
|
True |
Eager |
第一次在DataFrame中使用 |
每一次在DataFrame中使用 |
Python/Pyarrow/Pandas UDF注册参数
注册Python/Pyarrow/Pandas UDF的作用是将一个原始的Python函数或类注册进数据库中。
不管是显式注册还是隐式注册,不管是Scalar UDF、UDAF、UDTF,对于注册Python、Pyarrow、Pandas类型的UDF,目前都接受用户传入以下参数:
|
注册参数 |
含义 |
类型 |
默认值 |
|---|---|---|---|
|
name |
指定UDF实际数据库中存储名称。 |
str | None |
None |
|
database |
指定UDF所在的LakeFormation数据库。 |
str | None |
None |
|
fn |
指定UDF原始的Python函数。 |
Callable |
None |
|
signature |
指定UDF函数签名和返回值类型。 |
fabric_data.ibis.common.annotations.Signature | None |
None |
|
replace(目前不可用) |
指定UDF是否支持就地修改。 |
bool |
False |
|
temporary(目前不可用) |
指定UDF是否会话级别的生命周期。 |
bool |
False |
|
if_not_exist(目前不可用) |
指定UDF是否跳过已存在报错。 |
bool |
False |
|
strict |
指定UDF是否自动过滤NULL值。 |
bool |
True |
|
volatility |
指定UDF稳定性。 |
VolatilityType.VOLATILE | VolatilityType.STABLE | VolatilityType.IMMUTABLE |
VolatilityType.VOLATILE |
|
runtime_version(目前不可用) |
指定UDF执行的Python版本。 |
str |
sys.version_info |
|
imports |
指定UDF依赖的外部代码文件。 |
List[str] |
None |
|
packages |
指定UDF依赖的Python模块。 |
List[Union[str, module]] |
None |
|
register_type |
指定UDF的注册形式。 |
RegisterType.INLINE | RegisterType.STAGED |
RegisterType.INLINE |
|
comment |
指定UDF的用户注释。 |
str | None |
None |
Python/Pyarrow/Pandas UDF注册参数注意事项
- 对于import参数,只支持用户传入当前Python函数或类所在的.py文件同级目录或子目录下的文件路径。
- 对于fn参数,如果fn不在当前注册UDF的.py文件中,那么需要同时在imports参数中添加fn定义的文件路径,例如:
from process import outer con = ibis.fabric.connect(...) # 注册UDAF udf = con.udaf.python.register( outer(), #从外部引入的fn imports=["process.py"] #为fn添加文件路径 ) - 对于signature参数,目前允许用户传入,用户传入的优先级高于自动推断。当用户不传入时,进入参数/返回值类型自动推断逻辑,详情请参见signature参数的类型推断。
- 对于注册Pyarrow UDF,无论用户是否传入signature参数,都需要依赖PyarrowVector,例如:
import fabric_data as fabric import pyarrow as pa import pyarrow.compute as pc # === 用户传入signature,需要依赖PyarrowVector === def calculate_sum( prices: pa.ChunkedArray, quantities: pa.ChunkedArray, ) -> pa.ChunkedArray: return pc.multiply(prices, quantities) con = ibis.fabric.connect(...) # 注册UDF udf = con.udf.pyarrow.register( fn=calculate_sum signature=fabric.Signature( parameters=[ fabric.Parameter(name="price", annotation=fabric.PyarrowVector[float]), fabric.Parameter(name="quantity", annotation=fabric.PyarrowVector[int]), ], return_annotation=fabric.PyarrowVector[float], ), ) # === 用户不传入signature,也需要依赖PyarrowVector === def calculate_sum( prices: fabric.PyarrowVector[float], quantities: fabric.PyarrowVector[int], ) -> fabric.PyarrowVector[float]: return fabric.PyarrowVector[float](pc.multiply(prices, quantities)) con = ibis.fabric.connect(...) # 注册UDF udf = con.udf.pyarrow.register( fn=calculate_sum ) - 对于注册Pandas UDF,无论用户是否传入signature参数,都需要依赖PandasVector,例如:
import fabric_data as fabric import pandas as pd # === 用户传入signature,需要依赖PandasVector === def calculate_sum( prices: pd.Series, quantities: pd.Series, ) -> pd.Series: return pd.Series(prices * quantities, dtype=pd.Float64Dtype()) con = ibis.fabric.connect(...) # 注册UDF udf = con.udf.pandas.register( fn=calculate_sum signature=fabric.Signature( parameters=[ fabric.Parameter(name="price", annotation=fabric.PandasVector[float]), fabric.Parameter(name="quantity", annotation=fabric.PandasVector[int]), ], return_annotation=fabric.PandasVector[float], ), ) # === 用户不传入signature,也需要依赖PandasVector === def calculate_sum( prices: fabric.PandasVector[float], quantities: fabric.PandasVector[int], ) -> fabric.PandasVector[float]: return fabric.PandasVector[float](prices * quantities, dtype=pd.Float64Dtype()) con = ibis.fabric.connect(...) # 注册UDF udf = con.udf.pandas.register( fn=calculate_sum )
- 对于注册Pyarrow UDF,无论用户是否传入signature参数,都需要依赖PyarrowVector,例如:
- 对于volatility参数,3个枚举类型的含义是:
- VolatilityType.VOLATILE:函数结果可能在任何时候都变化。
- VolatilityType.STABLE:函数对于固定输入其结果在一次扫描里不变。
- VolatilityType.IMMUTABLE:函数对于相同的输入总是输出相同的结果。
volatility参数不会影响函数下推执行,对于IMMUTABLE/STABLE/VOLATILE类型的函数,Python UDF函数都可以下推到DN执行。
- 对于packages参数,如果用户没有指定:
- 注册Pyarrow UDF会自动使用后端环境的pyarrow版本。
- 注册Pandas UDF会自动使用后端环境的pandas版本。
Builtin UDF注册参数
注册Builtin UDF的作用是获得数据库已存在的函数的句柄,无实际注册的操作。
不管是显式注册还是隐式注册,不管是Scalar UDF、UDAF、UDTF,对于注册Builtin类型的UDF,目前都接受用户传入以下参数:
|
注册参数 |
含义 |
类型 |
默认值 |
|---|---|---|---|
|
name |
指定UDF实际数据库存储名称 |
str | None |
None |
|
database |
指定UDF所在的LakeFormation数据库 |
str | None |
None |
|
fn |
指定UDF原始的Python函数 |
Callable |
None |
|
signature |
指定UDF函数签名和返回值类型 |
ibis.common.annotations.Signature | None |
None |
Builtin UDF注册参数注意事项
对于signature参数,目前允许用户传入,用户传入的优先级高于自动推断。当用户不传入时,进入参数/返回值类型自动推断逻辑,详情请参见signature参数的类型推断。
signature参数类型的推断
对于signature参数,允许用户传入参数/返回值类型,也允许用户不传入。
- 如果用户传入signature参数,不需要原始Python函数使用类型注解(type hints)语法,此时可以支持及时操作式的注册UDF。
- 如果用户不传入signature参数,推荐对原始Python函数使用类型注解(type hints)语法,此时不能支持及时操作式的注册UDF。
两种方式的区别总结如下:
|
signature参数 |
含义 |
需要原始Python函数使用类型注解语法 |
支持REPL及时操作 |
|---|---|---|---|
|
用户不传值 |
自动推断(推荐) |
否,但是推荐使用 |
否 |
|
用户传值 |
指定传值 |
否 |
是 |
此处的及时操作指的是读取–求值–输出循环(Read–Eval–Print Loop, REPL),通常见于Python用户交互式终端(Terminal)中。
类型注解(type hints)语法由Python 3.5引入(PEP 484),在函数定义中,类型注解通过在参数名后加冒号(:)和类型,以及在参数列表末尾使用箭头(->)指定返回类型来实现,示例如下:
def greet(name: str) -> str:
return f"Hello, {name}"
from typing import List, Dict, Optional
def process_data(data: List[int]) -> Dict[str, Optional[int]]:
return {"max": max(data) if data else None}
对于Python/Pyarrow/Pandas UDF,注册时要求强数据类型,所有的参数/返回值都需要指定类型。如果用户不能通过原始Python函数的类型注解语法来指明,那么就需要用户主动使用signature参数指定ibis DataType。
对于Builtin UDF,注册时不要求强数据类型(因为数据库中已经注册了该UDF函数)。如果用户不能通过原始Python函数的类型注解语法来指明,那么推荐用户只写出参数名称,不写出类型;如果用户后续用到Builtin UDF的返回值(非Top SELECT UDF),那么需要指明函数返回值类型,必要时需要用户主动使用signature参数指定ibis DataType,如果不需要(Top SELECT UDF)则允许用户不写出函数返回值类型。
对于用户不传入signature参数,依赖自动推断时的总结如下:
|
注册UDF类型 |
参数类型 |
返回值类型 |
|---|---|---|
|
Python/Pyarrow/Pandas UDF |
需要类型注解(type hints)语法指定。 |
需要类型注解(type hints)语法指定。 |
|
Builtin UDF |
可以只写出参数名称,不写出类型。 |
后续使用返回值时需要类型注解(type hints)语法指定,否则不需要。 |
对于用户不传入signature参数,自动推断的情况,底层实现原理是inspect.signature。目前,接受用户传入以下参数/返回值类型:
|
Python类型 |
ibis DataType类型 |
对应DataArts Fabric SQL类型 |
|---|---|---|
|
DataType |
DataType |
- |
|
type(None) |
null |
NULL |
|
bool |
Boolean |
BOOLEAN |
|
bytes |
Binary |
BYTEA |
|
str |
String |
TEXT |
|
numbers.Integral |
Int64 |
BIGINT |
|
numbers.Real |
Float64 |
DOUBLE PRECISION |
|
decimal.Decimal |
Decimal |
DECIMAL |
|
datetime.datetime |
Timestamp |
TIMESTAMP/TIMESTAMPTZ |
|
datetime.date |
Date |
TIMESTAMP |
|
datetime.time |
Time |
TIME |
|
datetime.timedelta |
Interval |
INTERVAL |
|
uuid.UUID |
UUID |
UUID |
|
class |
Struct |
STRUCT |
|
typing.Sequence, typing.Array |
Array |
ARRAY |
|
typing.Mapping, typing.Map |
Map |
HSTORE |
|
fabric_data.PyarrowVector[T] |
T |
T |
|
fabric_data.PandasVector[T] |
T |
T |
注意事项:
- Python内置int类型属于numbers.Integral的子类。
- Python内置float类型属于numbers.Real的子类。
上表中没有列出的Python类型,都是目前暂时不支持的自动转化类型。
对于用户不传入signature参数,同时也没有写出Python类型注解(type hints)语法的参数/返回值,目前自动推断采取如下的方式处理:
|
参数类型 |
生成匹配Pattern |
Pattern效果 |
|---|---|---|
|
POSITIONAL_ONLY, KEYWORD_ONLY, POSITIONAL_OR_KEYWORD |
ValueOf(None) |
免除__signature__.validate。 |
|
VAR_POSITIONAL |
TupleOf(pattern=pattern) |
for-loop执行pattern。 |
|
VAR_KEYWORD |
DictOf(key_pattern=InstanceOf(str), value_pattern=pattern) |
for-loop执行pattern。 |
|
Return |
ValueOf(Unknown) |
提供UnknowScaclar, UnknownColumn作为UDF返回值向上返回。 |
inspect.signature对于参数类型(Parameter.kind)的分类如下:
|
参数类型 |
含义 |
示例代码 |
满足条件的参数 |
|---|---|---|---|
|
POSITIONAL_ONLY |
仅限位置参数。 |
def func(a, /, b): pass |
a |
|
KEYWORD_ONLY |
仅限关键字参数。 |
def func(a, *, b): pass |
b |
|
POSITIONAL_OR_KEYWORD |
位置或关键字参数。 |
def func(a, b): pass |
a, b |
|
VAR_POSITIONAL |
可变位置参数。 |
def func(*args): pass |
args |
|
VAR_KEYWORD |
可变关键字参数。 |
def func(**kwargs): pass |
kwargs |
UDF直接操作语法
在注册和使用分离的场景下,为使用者提供Scalar UDF、UDAF、UDTF的直接操作语法,使用者只需要知道UDF名称(name)、所在的数据库名称(database),就可以直接操作使用UDF。以下操作依赖Backend会话对象的udf属性进行。
signature(name, database=None)
描述:从后端DB中返回UDF的函数签名和返回值类型。
输入参数:
- name(str)- UDF函数名称。
- database(str)- UDF函数所属的数据库名称。
返回值类型:fabric_data.ibis.common.annotations.Signature - 已注册的UDF函数签名和返回值类型。
get(name, database=None)
描述:从后端DB中返回UDF函数。
输入参数:
- name(str)- UDF函数名称。
- database(str)- UDF函数所属的数据库名称。
返回值类型:Callable[..., ibis.expr.types.Value] - 已注册的UDF函数。
names(database=None)
描述:从后端DB中返回所有的UDF函数名称。
输入参数:
- database(str):UDF函数所属的数据库名称。
返回值类型:List[str] - 所有已注册的UDF函数名称。
unregister(name, database=None)
描述:从后端DB中删除指定的UDF函数。
输入参数:
- database(str):UDF函数所属的数据库名称。
返回值类型:None
UDF WITH ARGUMENTS使用语法
不管是使用UDF显式注册语法,UDF隐式注册语法后返回的UDF算子,还是通过UDF直接操作语法返回的已注册UDF,目前都支持用户通过with_arguments方法传入参数,参数总体来说分为两类:
- 特殊含义的参数名,这些参数用来配置UDF运行时的资源、并发和执行上限时间,例如concurrency、timeout、dpu、apu,具体情况可见UDF运行时配置列表。所有的UDF类型目前都支持用户使用with_arguments传入配置参数。
- 一般的参数名,这些参数用来配置UDF初始化时的状态,只有Class UDF、Class UDTF、UDAF这些用户在__init__方法中定义了可选参数的情况下可以传入Scalar标量值。实现一次__init__初始化内部状态,后续多次使用。
|
参数名 |
适用UDF类型 |
含义 |
|---|---|---|
|
特殊含义的参数名,如concurrency、timeout、dpu、apu |
所有UDF类型,包括Scalar UDF、Class UDF、Vectorized UDF、UDTF、UDAF |
配置UDF运行时的资源、并发和执行上限时间。 |
|
一般的参数名,由Python Class的__init__方法定义 |
Class UDF类型,包括Class UDF、Class UDTF、UDAF |
配置UDF初始化状态,适合有内部缓存、初始化参数、模型对象等情况。 |
所有经过with_arguments方法传入的参数值都是Scalar标量,所有参数值整体作为一个**kwargs(Python字典类型)传入。