Class UDF
Class UDF基于面向对象编程范式实现,主要目的是为了创建有状态的处理逻辑。函数要求实现__init__, __call__等方法。
Class UDF注册
Class UDF注册传入的是Python类,对于__init__、__call__、 __del__ 3个实例方法有特殊的含义认定,详情请参见下表;其他的Python类和实例方法不做限制,用户可以任意添加。
|
Python Class方法 |
是否必须 |
参数 |
含义 |
适用场景 |
|---|---|---|---|---|
|
__init__(self, *args, **kwargs) |
否 |
通过with_arguments方法传入参数,只允许传入标量Scalar。 |
UDF的构造方法。 |
初始化UDF属性(如保存参数、打开文件、建立连接等)。 |
|
__call__(self, *args, **kwargs) |
是 |
通过UDF算子传入参数,可以传入标量Scalar和列名Column。 |
UDF的业务逻辑方法入口。 |
UDF处理数据、执行计算。 |
|
__del__(self) |
否 |
不支持传入参数。 |
UDF的析构方法。 |
UDF资源清理(如关闭文件、断开网络连接等)。 |
示例
import ibis
import fabric_data as fabric
from fabric_data.udf import RegisterType
class Hello:
def __init__(self, firstname: str, lastname: str):
self.firstname = firstname
self.lastname = lastname
def __call__(self, location: str) -> str:
return f"Hello{self.firstname} {self.lastname}! Welcome to {location}"
con = ibis.fabric.connect(...)
# 显式,直接注册UDF
udf = con.udf.python.register(Hello, database="your-database", register_type=RegisterType.STAGED)
# 或者从文件注册UDF
udf = con.udf.python.register_from_file("your-current-file-path", "Hello", database="your-database", register_type=RegisterType.STAGED)
# 使用Class UDF
t = con.table(name="your-table", schema="your-schema")
expression = t.select(udf(t.city).with_arguments(firstname="Ethan", lastname="Carter"))
print(expression.execute())