直接使用带Scalar UDF的DF示例
场景描述
在大数据处理场景中,用户在使用DataFrame进行数据处理时,常常需要通过用户自定义函数(UDF)来实现复杂的数据计算逻辑。然而,当前系统中UDF的注册与调用耦合在一起,用户无法在注册后单独查看或删除已注册的UDF,这在团队协作开发或动态管理UDF时带来了诸多不便。为了解决这一问题,本次需求通过新增Backend.udf系列API,支持用户在运行时动态查看、调用和删除UDF,从而提升UDF管理的灵活性和开发效率。
约束限制
UDF直接调用/查看/删除功能约束限制如下:
用户必须先获得Backend(Fabric)连接,才能调用Backend UDF Registery API。
对于负责类型的支持依赖fabric内核对复杂类型的支持。
import ibis
import ibis_fabric as fabric
con = ibis.fabric.connect(...)
# 查看数据库中已有的UDF列表
udfs = con.udf.names(database="your-database")
if "transform_json" in udfs:
# 直接获得UDF,确认数据库中已有transform_json函数
transform_json_udf = con.udf.get(name="transform_json", database="your-database")
# 结合DataFrame的SELECT方法,使用transform_json
expression = t.select(transform_json_udf(t.ts, t.msg).name("json column"))
df = expression.execute()
# 删除UDF
con.udf.unregister("transform_json", database="your-database")
if "SPManager" in udfs:
# 直接获得UDF,确认数据库中已有SPManager类
sentencepiece_udf = con.udf.get(name="SPManager", database="your-database")
# 结合DataFrame的SELECT方法,使用SPManager
expression = t.select(sentencepiece_udf(t.data).with_arguments(model_file="test_model.model", bos=True, eos=True).name("pieces column"))
df = expression.execute()
# 删除UDF
con.udf.unregister("SPManager", database="your-database")
完整的Scalar UDF操作语法参见Scalar UDF直接操作语法。