Updated on 2025-08-25 GMT+08:00

Examples of DataFrames with UDFs

Using Scalar UDF in combination with DataFrame is the recommended standard approach. In this case, the entire Scalar UDF must encompass the SELECT method of DataFrame.

After registration, the returned value is a UDF operator within DataFrame.

This operator can then be invoked multiple times by various DataFrame expressions, as shown below:

import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType

def transform_json(ts: float, msg: str) -> str:
    import json
    from car_bu.parser_core import ParseObjects, dict_to_object
    if msg == '0_msg':
        return json.dumps({"time_stamp": 0.0, "msg": {}})
    else:
        d = dict_to_object(json.loads(msg))
        return json.dumps({"time_stamp": ts/10, "msg": ParseObjects(d)})

con = ibis.fabric.connect(...)

# Explicitly register transform_json.
transform_json_udf = con.udf.python.register(
    transform_json,
    database="your-database",
    imports=['car_bu/parser_core.py'],
    packages=['json'],
    register_type=RegisterType.OBS
)

# First use of transform_json combined with the SELECT method of DataFrame.
t = con.table("your-table", database="your-database")
expression = t.select(transform_json_udf(t.ts, t.msg).name("json column"))
df = expression.execute()

# Second use of transform_json combined with the SELECT method of DataFrame.
t = con.table("your-table", database="your-database")
filtered = t.filter(...)
local_view = filtered.select(...).mutate(...)
span_base = local_view.select(...).filter(...)
span_part2 = ibis.memtable(...)
union_part = span_base.union(span_part2)
window = ibis.window(...)
final_span = union_part.union(...).order_by(...).select(...)
result = final_span.mutate(...).select(...).filter(...).order_by(...)
result = result.select(transform_json_udf(result.ts, result.msg).name("json column"))
df = result.execute()