更新时间:2025-12-17 GMT+08:00
分享

User-Defined Function APIs

UDF显式注册语法

显式注册的含义是用户需要手动在Python代码中侵入式添加注册逻辑代码,需要用户使用backend...register/register_from_file来实现,调用即注册。显式注册依赖于已经获得backend会话对象才能进行。

推荐使用显式注册的场景:如果用户希望明确控制注册时间,允许侵入式添加注册逻辑,或对同一个Backend连接下的UDF注册和使用分离有要求。

一个典型的场景是1个开发团队负责UDF的注册,多个团队负责UDF的使用,注册团队和使用团队之间的Python脚本不互通。

表1 UDF显式注册语法

UDF类型

UDF类型(二级)

注册类型(三级)

代码入口

参考

udf, udaf, udtf

python

直接注册

backend.[udf | udaf | udtf].python.register(<注册函数>, <注册参数>)

Python/Pyarrow/Pandas UDF注册参数

从文件注册

backend.[udf | udaf | udtf].python.register_from_file(<文件路径>, <函数名>, <注册参数>)

Python/Pyarrow/Pandas UDF注册参数

builtin

直接注册

backend.[udf | udaf | udtf].builtin.register(<注册函数>, <注册参数>)

Builtin UDF注册参数

从文件注册

backend.[udf | udaf | udtf].builtin.register_from_file(<文件路径>, <函数名>, <注册参数>)

Builtin UDF注册参数

pyarrow

直接注册

backend.udf.pyarrow.register(<注册函数>, <注册参数>)

Python/Pyarrow/Pandas UDF注册参数

从文件注册

backend.udf.pyarrow.register_from_file(<文件路径>, <函数名>, <注册参数>)

Python/Pyarrow/Pandas UDF注册参数

pandas

直接注册

backend.udf.pandas.register(<注册函数>, <注册参数>)

Python/Pyarrow/Pandas UDF注册参数

从文件注册

backend.udf.pandas.register_from_file(<文件路径>, <函数名>, <注册参数>)

Python/Pyarrow/Pandas UDF注册参数

UDF隐式注册语法

隐式注册的含义是依赖Python运行时自动发现并注册UDF。用户不需要在Python代码中侵入式添加注册逻辑代码,而是使用@装饰器修饰原始Python函数,然后在DataFrame中使用被装饰的原始Python函数的标识符,即可完成注册。隐式注册在使用@装饰器时不需要获得backend会话对象,后续在ibis DataFrame中获取backend会话对象。

推荐使用隐式注册的场景:如果用户希望无侵入式地注册UDF,且对同一个Backend连接下的UDF注册和使用分离无要求。

一个典型的场景是1个用户的Python脚本中直接写了整个UDF的注册和使用全流程。

表2 UDF隐式注册语法

UDF类型

UDF类型(二级)

代码入口

参考

udf, udaf, udtf

python

@fabric.[udf | udaf | udtf].python(<注册参数>)

Python/Pyarrow/Pandas UDF注册参数

builtin

@fabric.[udf | udaf | udtf].builtin(<注册参数>)

Builtin UDF注册参数

pyarrow

@fabric.udf.pyarrow(<注册参数>)

Python/Pyarrow/Pandas UDF注册参数

pandas

@fabric.udf.pandas(<注册参数>)

Python/Pyarrow/Pandas UDF注册参数

对于隐式注册,实际注册动作的发生时间根据DataFrame操作模式Lazy、Eager有所区别。

参考上文提及的ibis官方文档,对于DataFrame的操作分为Eager、Lazy两种模式,由配置项ibis.options.interactive控制,默认为False,即所有的DataFrame都默认是Lazy模式。对于两种DataFrame执行模式,UDF注册的发生时间不同,详情说明如下:

表3 DataFrame执行模式

ibis.options.interactive

DataFrame执行模式

UDF注册时间

UDF使用时间

False

Lazy

整个DataFrame调用execute方法时

整个DataFrame调用execute方法时

True

Eager

第一次在DataFrame中使用

每一次在DataFrame中使用

Python/Pyarrow/Pandas UDF注册参数

注册Python/Pyarrow/Pandas UDF的作用是将一个原始的Python函数或类注册进数据库中。

不管是显式注册还是隐式注册,不管是Scalar UDF、UDAF、UDTF,对于注册Python、Pyarrow、Pandas类型的UDF,目前都接受用户传入以下参数:

表4 Python/Pyarrow/Pandas UDF注册参数

注册参数

含义

类型

默认值

name

指定UDF实际数据库中存储名称。

str | None

None

database

指定UDF所在的LakeFormation数据库。

str | None

None

fn

指定UDF原始的Python函数。

Callable

None

signature

指定UDF函数签名和返回值类型。

fabric_data.ibis.common.annotations.Signature | None

None

replace(目前不可用)

指定UDF是否支持就地修改。

bool

False

temporary(目前不可用)

指定UDF是否会话级别的生命周期。

bool

False

if_not_exist(目前不可用)

指定UDF是否跳过已存在报错。

bool

False

strict

指定UDF是否自动过滤NULL值。

bool

True

volatility

指定UDF稳定性。

VolatilityType.VOLATILE | VolatilityType.STABLE | VolatilityType.IMMUTABLE

VolatilityType.VOLATILE

runtime_version(目前不可用)

指定UDF执行的Python版本。

str

sys.version_info

imports

指定UDF依赖的外部代码文件。

List[str]

None

packages

指定UDF依赖的Python模块。

List[Union[str, module]]

None

register_type

指定UDF的注册形式。

RegisterType.INLINE | RegisterType.STAGED

RegisterType.INLINE

comment

指定UDF的用户注释。

str | None

None

Python/Pyarrow/Pandas UDF注册参数注意事项

  • 对于import参数,只支持用户传入当前Python函数或类所在的.py文件同级目录或子目录下的文件路径。
  • 对于fn参数,如果fn不在当前注册UDF的.py文件中,那么需要同时在imports参数中添加fn定义的文件路径,例如:
    from process import outer
    
    con = ibis.fabric.connect(...)
    
    # 注册UDAF
    udf = con.udaf.python.register(
        outer(), #从外部引入的fn
        imports=["process.py"] #为fn添加文件路径
    )
  • 对于signature参数,目前允许用户传入,用户传入的优先级高于自动推断。当用户不传入时,进入参数/返回值类型自动推断逻辑,详情请参见signature参数的类型推断
    • 对于注册Pyarrow UDF,无论用户是否传入signature参数,都需要依赖PyarrowVector,例如:
      import fabric_data as fabric
      import pyarrow as pa
      import pyarrow.compute as pc
      
      # === 用户传入signature,需要依赖PyarrowVector ===
      def calculate_sum(
          prices: pa.ChunkedArray,
          quantities: pa.ChunkedArray,
      ) -> pa.ChunkedArray:
          return pc.multiply(prices, quantities)
      
      con = ibis.fabric.connect(...)
      
      # 注册UDF
      udf = con.udf.pyarrow.register(
          fn=calculate_sum
          signature=fabric.Signature(
              parameters=[
                  fabric.Parameter(name="price", annotation=fabric.PyarrowVector[float]),
                  fabric.Parameter(name="quantity", annotation=fabric.PyarrowVector[int]),
              ],
              return_annotation=fabric.PyarrowVector[float],
          ),
      )
      
      # === 用户不传入signature,也需要依赖PyarrowVector ===
      def calculate_sum(
          prices: fabric.PyarrowVector[float],
          quantities: fabric.PyarrowVector[int],
      ) -> fabric.PyarrowVector[float]:
          return fabric.PyarrowVector[float](pc.multiply(prices, quantities))
      
      con = ibis.fabric.connect(...)
      
      # 注册UDF
      udf = con.udf.pyarrow.register(
          fn=calculate_sum
      )
    • 对于注册Pandas UDF,无论用户是否传入signature参数,都需要依赖PandasVector,例如:
      import fabric_data as fabric
      import pandas as pd
      
      # === 用户传入signature,需要依赖PandasVector ===
      def calculate_sum(
          prices: pd.Series, 
          quantities: pd.Series,
      ) -> pd.Series:
          return pd.Series(prices * quantities, dtype=pd.Float64Dtype())
      
      con = ibis.fabric.connect(...)
      
      # 注册UDF
      udf = con.udf.pandas.register(
          fn=calculate_sum
          signature=fabric.Signature(
              parameters=[
                  fabric.Parameter(name="price", annotation=fabric.PandasVector[float]),
                  fabric.Parameter(name="quantity", annotation=fabric.PandasVector[int]),
              ],
              return_annotation=fabric.PandasVector[float],
          ),
      )
      
      # === 用户不传入signature,也需要依赖PandasVector ===
      def calculate_sum(
          prices: fabric.PandasVector[float], 
          quantities: fabric.PandasVector[int],
      ) -> fabric.PandasVector[float]:
          return fabric.PandasVector[float](prices * quantities, dtype=pd.Float64Dtype())
      
      con = ibis.fabric.connect(...)
      
      # 注册UDF
      udf = con.udf.pandas.register(
          fn=calculate_sum
      )
  • 对于volatility参数,3个枚举类型的含义是:
    • VolatilityType.VOLATILE:函数结果可能在任何时候都变化。
    • VolatilityType.STABLE:函数对于固定输入其结果在一次扫描里不变。
    • VolatilityType.IMMUTABLE:函数对于相同的输入总是输出相同的结果。

    volatility参数不会影响函数下推执行,对于IMMUTABLE/STABLE/VOLATILE类型的函数,Python UDF函数都可以下推到DN执行。

  • 对于packages参数,如果用户没有指定:
    • 注册Pyarrow UDF会自动使用后端环境的pyarrow版本。
    • 注册Pandas UDF会自动使用后端环境的pandas版本。

Builtin UDF注册参数

注册Builtin UDF的作用是获得数据库已存在的函数的句柄,无实际注册的操作。

不管是显式注册还是隐式注册,不管是Scalar UDF、UDAF、UDTF,对于注册Builtin类型的UDF,目前都接受用户传入以下参数:

表5 Builtin UDF注册参数

注册参数

含义

类型

默认值

name

指定UDF实际数据库存储名称

str | None

None

database

指定UDF所在的LakeFormation数据库

str | None

None

fn

指定UDF原始的Python函数

Callable

None

signature

指定UDF函数签名和返回值类型

ibis.common.annotations.Signature | None

None

Builtin UDF注册参数注意事项

对于signature参数,目前允许用户传入,用户传入的优先级高于自动推断。当用户不传入时,进入参数/返回值类型自动推断逻辑,详情请参见signature参数的类型推断

signature参数类型的推断

对于signature参数,允许用户传入参数/返回值类型,也允许用户不传入。

  • 如果用户传入signature参数,不需要原始Python函数使用类型注解(type hints)语法,此时可以支持及时操作式的注册UDF。
  • 如果用户不传入signature参数,推荐对原始Python函数使用类型注解(type hints)语法,此时不能支持及时操作式的注册UDF。

两种方式的区别总结如下:

表6 signature参数说明

signature参数

含义

需要原始Python函数使用类型注解语法

支持REPL及时操作

用户不传值

自动推断(推荐)

否,但是推荐使用

用户传值

指定传值

此处的及时操作指的是读取–求值–输出循环(Read–Eval–Print Loop, REPL),通常见于Python用户交互式终端(Terminal)中。

类型注解(type hints)语法由Python 3.5引入(PEP 484),在函数定义中,类型注解通过在参数名后加冒号(:)和类型,以及在参数列表末尾使用箭头(->)指定返回类型来实现,示例如下:

def greet(name: str) -> str:
    return f"Hello, {name}"
from typing import List, Dict, Optional

def process_data(data: List[int]) -> Dict[str, Optional[int]]:
    return {"max": max(data) if data else None}

对于Python/Pyarrow/Pandas UDF,注册时要求强数据类型,所有的参数/返回值都需要指定类型。如果用户不能通过原始Python函数的类型注解语法来指明,那么就需要用户主动使用signature参数指定ibis DataType。

对于Builtin UDF,注册时不要求强数据类型(因为数据库中已经注册了该UDF函数)。如果用户不能通过原始Python函数的类型注解语法来指明,那么推荐用户只写出参数名称,不写出类型;如果用户后续用到Builtin UDF的返回值(非Top SELECT UDF),那么需要指明函数返回值类型,必要时需要用户主动使用signature参数指定ibis DataType,如果不需要(Top SELECT UDF)则允许用户不写出函数返回值类型。

对于用户不传入signature参数,依赖自动推断时的总结如下:

表7 signature自动推断

注册UDF类型

参数类型

返回值类型

Python/Pyarrow/Pandas UDF

需要类型注解(type hints)语法指定。

需要类型注解(type hints)语法指定。

Builtin UDF

可以只写出参数名称,不写出类型。

后续使用返回值时需要类型注解(type hints)语法指定,否则不需要。

对于用户不传入signature参数,自动推断的情况,底层实现原理是inspect.signature。目前,接受用户传入以下参数/返回值类型:

表8 接受的参数/返回值类型

Python类型

ibis DataType类型

对应DataArts Fabric SQL类型

DataType

DataType

-

type(None)

null

NULL

bool

Boolean

BOOLEAN

bytes

Binary

BYTEA

str

String

TEXT

numbers.Integral

Int64

BIGINT

numbers.Real

Float64

DOUBLE PRECISION

decimal.Decimal

Decimal

DECIMAL

datetime.datetime

Timestamp

TIMESTAMP/TIMESTAMPTZ

datetime.date

Date

TIMESTAMP

datetime.time

Time

TIME

datetime.timedelta

Interval

INTERVAL

uuid.UUID

UUID

UUID

class

Struct

STRUCT

typing.Sequence, typing.Array

Array

ARRAY

typing.Mapping, typing.Map

Map

HSTORE

fabric_data.PyarrowVector[T]

T

T

fabric_data.PandasVector[T]

T

T

注意事项:

  • Python内置int类型属于numbers.Integral的子类。
  • Python内置float类型属于numbers.Real的子类。

上表中没有列出的Python类型,都是目前暂时不支持的自动转化类型。

对于用户不传入signature参数,同时也没有写出Python类型注解(type hints)语法的参数/返回值,目前自动推断采取如下的方式处理:

表9 特殊参数类型处理

参数类型

生成匹配Pattern

Pattern效果

POSITIONAL_ONLY, KEYWORD_ONLY, POSITIONAL_OR_KEYWORD

ValueOf(None)

免除__signature__.validate。

VAR_POSITIONAL

TupleOf(pattern=pattern)

for-loop执行pattern。

VAR_KEYWORD

DictOf(key_pattern=InstanceOf(str), value_pattern=pattern)

for-loop执行pattern。

Return

ValueOf(Unknown)

提供UnknowScaclar, UnknownColumn作为UDF返回值向上返回。

inspect.signature对于参数类型(Parameter.kind)的分类如下:

表10 inspect.signature参数类型

参数类型

含义

示例代码

满足条件的参数

POSITIONAL_ONLY

仅限位置参数。

def func(a, /, b): pass

a

KEYWORD_ONLY

仅限关键字参数。

def func(a, *, b): pass

b

POSITIONAL_OR_KEYWORD

位置或关键字参数。

def func(a, b): pass

a, b

VAR_POSITIONAL

可变位置参数。

def func(*args): pass

args

VAR_KEYWORD

可变关键字参数。

def func(**kwargs): pass

kwargs

UDF直接操作语法

在注册和使用分离的场景下,为使用者提供Scalar UDF、UDAF、UDTF的直接操作语法,使用者只需要知道UDF名称(name)、所在的数据库名称(database),就可以直接操作使用UDF。以下操作依赖Backend会话对象的udf属性进行。

signature(name, database=None)

描述:从后端DB中返回UDF的函数签名和返回值类型。

输入参数:

  • name(str)- UDF函数名称。
  • database(str)- UDF函数所属的数据库名称。

返回值类型:fabric_data.ibis.common.annotations.Signature - 已注册的UDF函数签名和返回值类型。

get(name, database=None)

描述:从后端DB中返回UDF函数。

输入参数:

  • name(str)- UDF函数名称。
  • database(str)- UDF函数所属的数据库名称。

返回值类型:Callable[..., ibis.expr.types.Value] - 已注册的UDF函数。

names(database=None)

描述:从后端DB中返回所有的UDF函数名称。

输入参数:

  • database(str):UDF函数所属的数据库名称。

返回值类型:List[str] - 所有已注册的UDF函数名称。

unregister(name, database=None)

描述:从后端DB中删除指定的UDF函数。

输入参数:

  • database(str):UDF函数所属的数据库名称。

返回值类型:None

UDF WITH ARGUMENTS使用语法

不管是使用UDF显式注册语法UDF隐式注册语法后返回的UDF算子,还是通过UDF直接操作语法返回的已注册UDF,目前都支持用户通过with_arguments方法传入参数,参数总体来说分为两类:

  • 特殊含义的参数名,这些参数用来配置UDF运行时的资源、并发和执行上限时间,例如concurrency、timeout、dpu、apu,具体情况可见UDF运行时配置列表。所有的UDF类型目前都支持用户使用with_arguments传入配置参数。
  • 一般的参数名,这些参数用来配置UDF初始化时的状态,只有Class UDF、Class UDTF、UDAF这些用户在__init__方法中定义了可选参数的情况下可以传入Scalar标量值。实现一次__init__初始化内部状态,后续多次使用。
表11 WITH ARGUMENTS语法适用情况

参数名

适用UDF类型

含义

特殊含义的参数名,如concurrency、timeout、dpu、apu

所有UDF类型,包括Scalar UDF、Class UDF、Vectorized UDF、UDTF、UDAF

配置UDF运行时的资源、并发和执行上限时间。

一般的参数名,由Python Class的__init__方法定义

Class UDF类型,包括Class UDF、Class UDTF、UDAF

配置UDF初始化状态,适合有内部缓存、初始化参数、模型对象等情况。

所有经过with_arguments方法传入的参数值都是Scalar标量,所有参数值整体作为一个**kwargs(Python字典类型)传入。

相关文档