直接使用带Scalar UDF的DF示例
场景描述
在大数据处理场景中,用户在使用DataFrame进行数据处理时,常常需要通过用户自定义函数(UDF)来实现复杂的数据计算逻辑。然而,当前系统中UDF的注册与调用耦合在一起,用户无法在注册后单独查看或删除已注册的UDF,这在团队协作开发或动态管理UDF时带来了诸多不便。为了解决这一问题,本次需求通过新增Backend.udf系列API,支持用户在运行时动态查看、调用和删除UDF,从而提升UDF管理的灵活性和开发效率。
约束限制
UDF直接调用/查看/删除功能约束限制如下:
用户必须先获得Backend(Fabric)连接,才能调用Backend UDF Registery API。
对于负责类型的支持依赖fabric内核对复杂类型的支持。
import ibis import ibis_fabric as fabric con = ibis.fabric.connect(...) # 查看数据库中已有的UDF列表 udfs = con.udf.names(database="your-database") if "transform_json" in udfs: # 直接获得UDF,确认数据库中已有transform_json函数 transform_json_udf = con.udf.get(name="transform_json", database="your-database") # 结合DataFrame的SELECT方法,使用transform_json expression = t.select(transform_json_udf(t.ts, t.msg).name("json column")) df = expression.execute() # 删除UDF con.udf.unregister("transform_json", database="your-database") if "SPManager" in udfs: # 直接获得UDF,确认数据库中已有SPManager类 sentencepiece_udf = con.udf.get(name="SPManager", database="your-database") # 结合DataFrame的SELECT方法,使用SPManager expression = t.select(sentencepiece_udf(t.data).with_arguments(model_file="test_model.model", bos=True, eos=True).name("pieces column")) df = expression.execute() # 删除UDF con.udf.unregister("SPManager", database="your-database")
完整的Scalar UDF操作语法参见Scalar UDF直接操作语法。