带Scalar UDF的DF示例
场景描述
在AI数据工程中,数据预处理是一个关键步骤,通常需要对存储在数据库中的数据进行复杂的清洗、转换和特征工程操作。然而,传统的数据预处理逻辑往往在数据库外部通过Python脚本实现,这会导致大量数据在数据库和Python环境之间传输,不仅增加了计算开销,还无法充分利用数据库的分布式计算能力。通过在数据库内核中实现Python UDF,并在ibis-fabric中增加Python UDF显式,隐式调用接口,可以将数据预处理逻辑直接嵌入到数据库查询中,减少数据传输,提升整体处理效率。
对于复杂业务逻辑,例如使用机器学习模型对特征数据表进行按行过滤。传统函数式UDF定义方式无法有效支持模型初始化和资源管理需求,因为模型超参数和资源(如模型文件加载)需要在函数执行前一次性配置,而函数式UDF无法高效管理此类初始化逻辑和资源释放操作。通过以Class形式定义Python UDF,用户可以在__init__方法中配置模型超参数和初始化资源(如加载模型),并在process方法中执行推理逻辑,同时通过__del__方法释放资源(如关闭文件或连接),从而实现高效的资源管理和业务逻辑处理。
约束限制
- Python UDF功能约束限制如下:
- 仅支持Python语言编写的用户自定义函数。
- Python环境依赖管理由用户自行负责,需确保各依赖库版本兼容。
- 运行时环境要求Python版本为3.11。
- Python Class UDF功能约束限制如下:
- 类必须定义__init__方法,且其参数名称需与调用时的参数名称保持一致。
- 类必须定义process成员方法,该方法为UDF的主函数入口。
- 类可按需定义__del__成员方法,且该方法不支持传入参数。
- 调用class型UDF时,__init__方法的参数仅支持传入常量,不支持传入表的数据列或其他表达式。
结合DataFrame使用Scalar UDF是推荐的标准用法,此时整个Scalar UDF的外部必须要包围DataFrame的SELECT方法。
经过注册后返回的值是DataFrame中的一个UDF算子。此时,该算子可以被多个DataFrame表达式多次调用,示例如下:
import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType
def transform_json(ts: float, msg: str) -> str:
import json
from car_bu.parser_core import ParseObjects, dict_to_object
if msg == '0_msg':
return json.dumps({"time_stamp": 0.0, "msg": {}})
else:
d = dict_to_object(json.loads(msg))
return json.dumps({"time_stamp": ts/10, "msg": ParseObjects(d)})
con = ibis.fabric.connect(...)
# 显式注册transform_json
transform_json_udf = con.udf.python.register(
transform_json,
database="your-database",
imports=['car_bu/parser_core.py'],
packages=['json'],
register_type=RegisterType.OBS
)
# 结合DataFrame的SELECT方法,第一次使用transform_json
t = con.table("your-table", database="your-database")
expression = t.select(transform_json_udf(t.ts, t.msg).name("json column"))
df = expression.execute()
# 结合DataFrame的SELECT方法,第二次使用transform_json
t = con.table("your-table", database="your-database")
filtered = t.filter(...)
local_view = filtered.select(...).mutate(...)
span_base = local_view.select(...).filter(...)
span_part2 = ibis.memtable(...)
union_part = span_base.union(span_part2)
window = ibis.window(...)
final_span = union_part.union(...).order_by(...).select(...)
result = final_span.mutate(...).select(...).filter(...).order_by(...)
result = result.select(transform_json_udf(result.ts, result.msg).name("json column"))
df = result.execute()
对于Python类的Scalar Class UDF,使用方式和Python函数的Scalar UDF稍有不同:UDF算子接受process方法参数;with_arguments接受__init__方法参数。完整示例如下:
import abc
import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType
from sentencepiece import SentencePieceProcessor
from typing import Sequence
class Base(abc.ABC):
def __init__(self, *args, **kwargs): pass
@abc.abstractmethod
def process(self, *args, **kwargs): ...
def __del__(self): pass
class SPManager(Base):
def __init__(self, model_file: str, *, bos: bool = False, eos: bool = False, reverse: bool = False) -> None:
sp = SentencePieceProcessor(model_file=model_file)
opts = ":".join(opt for opt, flag in (("bos", bos), ("eos", eos), ("reverse", reverse)) if flag)
sp.set_encode_extra_options(opts)
self.spm = sp
def process(self, input_text: str) -> Sequence[str]:
return self.spm.encode(input_text, out_type=str)
con = ibis.fabric.connect(...)
# 显式注册SPManager
sentencepiece_udf = con.udf.python.register(
SPManager,
database="your-database",
imports=['test_model.model'],
packages=['sentencepiece'],
register_type=RegisterType.OBS
)
# 结合DataFrame的SELECT方法,使用SPManager
t = con.table("your-table", database="your-database")
expression = t.select(sentencepiece_udf(t.data).with_arguments(model_file="test_model.model", bos=True, eos=True).name("pieces column"))
df = expression.execute()