DataFrame Example with a Scalar UDF
Updated: 2025-08-25 GMT+08:00
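Using a Scalar UDF together with a DataFrame is the recommended standard usage. In this usage, the Scalar UDF call must be placed inside the DataFrame's select method.
Registering the function returns a UDF operator that can be used in DataFrame expressions.
The same operator can then be called any number of times, from any number of DataFrame expressions.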
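The reuse described above can be pictured with a small model of deferred expressions. The following is a minimal sketch in plain Python, under stated assumptions: it does not use ibis or ibis_fabric, and the names Column, UdfCall, ScalarUdf, select, and ratio are hypothetical. It only illustrates the idea that calling a registered UDF builds an expression node instead of computing a value, that the node is evaluated row by row when a select runs, and that one operator can therefore appear in several expressions. It is not how DataArtsFabric implements UDFs.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]


@dataclass(frozen=True)
class Column:
    """Hypothetical column reference, standing in for t.ts or t.msg."""
    name: str

    def evaluate(self, row: Row) -> Any:
        return row[self.name]


@dataclass(frozen=True)
class UdfCall:
    """Deferred expression node: a UDF applied to column arguments."""
    func: Callable[..., Any]
    args: Tuple[Column, ...]
    alias: str

    def name(self, alias: str) -> "UdfCall":
        # Return a renamed copy of the node; nothing is evaluated here
        return UdfCall(self.func, self.args, alias)

    def evaluate(self, row: Row) -> Any:
        # Scalar semantics: one call per row, one value per call
        return self.func(*(arg.evaluate(row) for arg in self.args))


class ScalarUdf:
    """Hypothetical stand-in for the object returned by registration."""

    def __init__(self, func: Callable[..., Any]) -> None:
        self.func = func

    def __call__(self, *args: Column) -> UdfCall:
        # Calling the operator only builds an expression node
        return UdfCall(self.func, args, self.func.__name__)


def select(rows: List[Row], expr: UdfCall) -> List[Row]:
    """Evaluate the expression for every input row, yielding one output column."""
    return [{expr.alias: expr.evaluate(row)} for row in rows]


def ratio(a: float, b: float) -> float:
    return a / b


ratio_udf = ScalarUdf(ratio)  # "register" once

# Reuse the same operator in two independent expressions
first = ratio_udf(Column("ts"), Column("k")).name("scaled")
second = ratio_udf(Column("k"), Column("ts")).name("inverse")

print(select([{"ts": 10.0, "k": 2.0}, {"ts": 30.0, "k": 3.0}], first))
# [{'scaled': 5.0}, {'scaled': 10.0}]
print(select([{"ts": 100.0, "k": 10.0}], second))
# [{'inverse': 0.1}]
```

In the real API, the counterpart of this toy select is the DataFrame select method, and, as with ibis expressions in general, the actual computation happens only when execute() is called.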
In the following example, transform_json is registered once and then used in two separate select expressions:

```python
import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType


def transform_json(ts: float, msg: str) -> str:
    # Import dependencies inside the function body so they are loaded where the UDF executes
    import json
    from car_bu.parser_core import ParseObjects, dict_to_object

    if msg == '0_msg':
        return json.dumps({"time_stamp": 0.0, "msg": {}})
    else:
        d = dict_to_object(json.loads(msg))
        return json.dumps({"time_stamp": ts/10, "msg": ParseObjects(d)})


con = ibis.fabric.connect(...)

# Explicitly register transform_json
transform_json_udf = con.udf.python.register(
    transform_json,
    database="your-database",
    imports=['car_bu/parser_core.py'],
    packages=['json'],
    register_type=RegisterType.OBS
)

# First use of transform_json, inside the DataFrame select method
t = con.table("your-table", database="your-database")
expression = t.select(transform_json_udf(t.ts, t.msg).name("json column"))
df = expression.execute()

# Second use of transform_json, inside the DataFrame select method
# ("..." marks arguments elided in this example)
t = con.table("your-table", database="your-database")
filtered = t.filter(...)
local_view = filtered.select(...).mutate(...)
span_base = local_view.select(...).filter(...)
span_part2 = ibis.memtable(...)
union_part = span_base.union(span_part2)
window = ibis.window(...)
final_span = union_part.union(...).order_by(...).select(...)
result = final_span.mutate(...).select(...).filter(...).order_by(...)
result = result.select(transform_json_udf(result.ts, result.msg).name("json column"))
df = result.execute()
```
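Because a Scalar UDF is an ordinary Python function that is applied to one row at a time, the logic of transform_json can be checked locally before it is registered. The sketch below assumes hypothetical stand-ins for dict_to_object and ParseObjects (the real ones come from the user-supplied car_bu/parser_core.py, which is not shown) that simply return plain dictionaries. It calls the function on a few sample (ts, msg) rows, mirroring what transform_json_udf(t.ts, t.msg) computes for each row.

```python
import json
from types import SimpleNamespace
from typing import List, Tuple


# Hypothetical stubs standing in for car_bu.parser_core (not the real module)
def dict_to_object(d: dict) -> SimpleNamespace:
    return SimpleNamespace(**d)


def ParseObjects(obj: SimpleNamespace) -> dict:
    # Assumed to return something JSON-serializable
    return vars(obj)


def transform_json(ts: float, msg: str) -> str:
    # Same control flow as the UDF above, using the local stubs
    if msg == '0_msg':
        return json.dumps({"time_stamp": 0.0, "msg": {}})
    d = dict_to_object(json.loads(msg))
    return json.dumps({"time_stamp": ts / 10, "msg": ParseObjects(d)})


# Sample (ts, msg) rows; the scalar function is called once per row
rows: List[Tuple[float, str]] = [
    (0.0, '0_msg'),
    (125.0, '{"speed": 42, "gear": "D"}'),
]
for ts, msg in rows:
    print(transform_json(ts, msg))
# {"time_stamp": 0.0, "msg": {}}
# {"time_stamp": 12.5, "msg": {"speed": 42, "gear": "D"}}
```

Testing the function this way separates bugs in the parsing logic from problems with registration or packaging (for example, a missing entry in imports).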