更新时间:2025-12-17 GMT+08:00
分享

DataFrame APIs

本章节提供了类Pandas的Python DataFrame SDK,方便用户使用Python编写数据处理作业;同时利用DataArts Fabric SQL内核高效的计算能力,为数据科学家、AI工程师等提供了易用、高效的数据处理能力。

本特性基于Ibis Python DataFrame开源框架实现,将Ibis前端框架与DataArts Fabric SQL引擎对接。用户可基于熟悉的Ibis DataFrame API编写数据处理脚本,Ibis框架将Python API翻译为DataArts Fabric SQL引擎可执行的SQL语句并下发,从而实现计算逻辑在DataArts Fabric SQL引擎的高效处理。支持ibis的api使用,具体ibis api请参见Ibis官方文档

快速开始

以下代码使用ibis库连接DataArts Fabric数据湖并执行数据查询,将结果转换为DataFrame格式的基本语法。

示例仅供参考,请您根据实际情况进行修改。

关于Ibis更详细的用法,请参见Ibis官方文档

import ibis  # 导入ibis依赖
import logging
import os
con = ibis.fabric.connect(
    fabric_endpoint=os.getenv("fabric_endpoint"), # 指定服务的区域,区域查询请参见地区和终端节点
    fabric_endpoint_id=os.getenv("fabric_endpoint_id"), # 查询endpoint_id
    fabric_workspace_id=os.getenv("fabric_workspace_id"), # 获取workspace_id
    lf_catalog_name=os.getenv("lf_catalog_name"), # 连接指定的Catalog
    lf_instance_id=os.getenv("lf_instance_id"),  # LakeFormation服务的实例ID,详情请参见获取LakeFormation实例id
    access_key=os.getenv("access_key"),  # 获取ak/sk
    secret_key=os.getenv("secret_key"), 
    use_single_cn_mode=True,  # 表示是否开启单CN模式
    logging_level=logging.INFO, # 设置日志级别
)
con.set_function_staging_workspace(
    obs_directory_base=os.getenv("obs_directory_base"), # obs中udf的存储路径
    obs_bucket_name=os.getenv("obs_bucket_name"), # obs的名称
    obs_server=os.getenv("obs_server"), # obs访问地址,详情请参见终端节点(Endpoint)和访问域名
    access_key=os.getenv("access_key"),  
    secret_key=os.getenv("secret_key")
)
t = con.table("table_name", database="db")  # 通过连接到后端获取table表信息,建立表对象
t.select("cola")  # 查询表字段
df = t.execute()  # 将DataFrame转为SQL,传输到后端执行,并且返回Pandas DataFrame格式的结果

不带UDF的DF示例

下文以tpch的query1为例,展示DataFrame的用法。

查询SQL为:

SELECT
    l_returnflag,
    l_linestatus,
    sum(l_quantity) AS sum_qty,
    sum(l_extendedprice) AS sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
    avg(l_quantity) AS avg_qty,
    avg(l_extendedprice) AS avg_price,
    avg(l_discount) AS avg_disc,
    count(*) AS count_order
FROM
    lineitem
WHERE
    l_shipdate <= CAST('1998-09-02' AS date)
GROUP BY
    l_returnflag,
    l_linestatus
ORDER BY
    l_returnflag,    l_linestatus;

对应的DataFrame逻辑如下:

import ibis  # 导入ibis依赖
con = ibis.fabric.connect(...)
t = con.table("lineitem", database="tpch")  # 通过连接到后端获取table表信息,建立表对象
q = t.filter(t.l_shipdate <= add_date("1998-12-01", dd=-90))
discount_price = t.l_extendedprice * (1 - t.l_discount)
charge = discount_price * (1 + t.l_tax)
q = q.group_by(["l_returnflag", "l_linestatus"])
q = q.aggregate(
    sum_qty=t.l_quantity.sum(),
    sum_base_price=t.l_extendedprice.sum(),
    sum_disc_price=discount_price.sum(),
    sum_charge=charge.sum(),
    avg_qty=t.l_quantity.mean(),
    avg_price=t.l_extendedprice.mean(),
    avg_disc=t.l_discount.mean(),
    count_order=lambda t: t.count(),
)
q = q.order_by(["l_returnflag", "l_linestatus"])
sql = q.compile()  # 将DataFrame编译为sql字符串df = q.execute()  # 执行表达式并且返回结果集

带Scalar UDF的DF示例

场景描述

在AI数据工程中,数据预处理是一个关键步骤,通常需要对存储在数据库中的数据进行复杂的清洗、转换和特征工程操作。然而,传统的数据预处理逻辑往往在数据库外部通过Python脚本实现,这会导致大量数据在数据库和Python环境之间传输,不仅增加了计算开销,还无法充分利用数据库的分布式计算能力。通过在数据库内核中实现Python UDF,并在ibis-fabric中增加Python UDF显式,隐式调用接口,可以将数据预处理逻辑直接嵌入到数据库查询中,减少数据传输,提升整体处理效率。

对于复杂业务逻辑,例如使用机器学习模型对特征数据表进行按行过滤。传统函数式UDF定义方式无法有效支持模型初始化和资源管理需求,因为模型超参数和资源(如模型文件加载)需要在函数执行前一次性配置,而函数式UDF无法高效管理此类初始化逻辑和资源释放操作。通过以Class形式定义Python UDF,用户可以在__init__方法中配置模型超参数和初始化资源(如加载模型),并在__call__方法中执行推理逻辑,同时通过__del__方法释放资源(如关闭文件或连接),从而实现高效的资源管理和业务逻辑处理。

约束限制

  • Python UDF功能约束限制如下:
    • 仅支持Python语言编写的用户自定义函数。
    • Python环境依赖管理由用户自行负责,需确保各依赖库版本兼容。
    • 运行时环境要求Python版本为3.11。
  • Python Class UDF功能约束限制如下:
    • 类必须定义__init__方法,且其参数名称需与调用时的参数名称保持一致。
    • 类必须定义__call__成员方法,该方法为UDF的主函数入口。
    • 类可按需定义__del__成员方法,且该方法不支持传入参数。
    • 调用class型UDF时,__init__方法的参数仅支持传入常量,不支持传入表的数据列或其他表达式。

结合DataFrame使用Scalar UDF是推荐的标准用法,此时整个Scalar UDF的外部必须要包围DataFrame的SELECT方法。

经过注册后返回的值是DataFrame中的一个UDF算子。此时,该算子可以被多个DataFrame表达式多次调用,示例如下:

import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType

def transform_json(ts: float, msg: str) -> str:
    import json
    from car_bu.parser_core import ParseObjects, dict_to_object
    if msg == '0_msg':
        return json.dumps({"time_stamp": 0.0, "msg": {}})
    else:
        d = dict_to_object(json.loads(msg))
        return json.dumps({"time_stamp": ts/10, "msg": ParseObjects(d)})

con = ibis.fabric.connect(...)

# 显式注册transform_json
transform_json_udf = con.udf.python.register(
    transform_json,
    database="your-database",
    imports=['car_bu/parser_core.py'],
    packages=['json'],
    register_type=RegisterType.OBS
)

# 结合DataFrame的SELECT方法,第一次使用transform_json
t = con.table("your-table", database="your-database")
expression = t.select(transform_json_udf(t.ts, t.msg).name("json column"))
df = expression.execute()

# 结合DataFrame的SELECT方法,第二次使用transform_json
t = con.table("your-table", database="your-database")
filtered = t.filter(...)
local_view = filtered.select(...).mutate(...)
span_base = local_view.select(...).filter(...)
span_part2 = ibis.memtable(...)
union_part = span_base.union(span_part2)
window = ibis.window(...)
final_span = union_part.union(...).order_by(...).select(...)
result = final_span.mutate(...).select(...).filter(...).order_by(...)
result = result.select(transform_json_udf(result.ts, result.msg).name("json column"))
df = result.execute()

对于Python类的Scalar Class UDF,使用方式和Python函数的Scalar UDF稍有不同:UDF算子接受__call__方法参数;with_arguments接受__init__方法参数。完整示例如下:

import abc
import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType
from sentencepiece import SentencePieceProcessor
from typing import Sequence


class SPManager(Base):
    def __init__(self, model_file: str, *, bos: bool = False, eos: bool = False, reverse: bool = False) -> None:
        sp = SentencePieceProcessor(model_file=model_file)
        opts = ":".join(opt for opt, flag in (("bos", bos), ("eos", eos), ("reverse", reverse)) if flag)
        sp.set_encode_extra_options(opts)
        self.spm = sp

    def __call__(self, input_text: str) -> Sequence[str]:
        return self.spm.encode(input_text, out_type=str)

con = ibis.fabric.connect(...)

# 显式注册SPManager
sentencepiece_udf = con.udf.python.register(
    SPManager,
    database="your-database",
    imports=['test_model.model'],
    packages=['sentencepiece'],
    register_type=RegisterType.STAGED
)

# 结合DataFrame的SELECT方法,使用SPManager
t = con.table("your-table", database="your-database")
expression = t.select(sentencepiece_udf(t.data).with_arguments(model_file="test_model.model", bos=True, eos=True).name("pieces column"))
df = expression.execute()

带UDAF的DF示例

结合DataFrame使用UDAF是推荐的标准用法,此时整个UDAF的外部必须要包围DataFrame的aggregate方法。

UDAF只能出现在SELECT、ORDER BY、HAVING 3种从句中。

经过注册后返回的值是DataFrame中的一个UDF算子。此时,该算子可以被多个DataFrame表达式多次调用,with_arguments接受__init__方法参数,示例如下:

import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType

import typing
import json

class PythonTopK:
    def __init__(self, k: int):
        # 用户在 with_arguments 里传入的参数
        self.k = k
        self._agg_state = []

    @property
    def aggregate_state(self) -> typing.Sequence[int]:
        # 聚合中间态(当前 top k 列表)
        return self._agg_state

    def accumulate(self, value: int) -> None:
        # 累积阶段:逐行更新 top k
        if value is not None:
            self._agg_state.append(value)
            self._agg_state.sort(reverse=True)
            self._agg_state = self._agg_state[: self.k]

    def merge(self, other_state: typing.Sequence[int]) -> None:
        # 合并阶段:跨分区合并 top k
        merged = self._agg_state + other_state
        merged.sort(reverse=True)
        self._agg_state = merged[: self.k]

    def finish(self) -> str:
        # 收尾阶段:返回最终 top k 结果,以JSON字符串形式返回
        return json.dumps(self._agg_state)

con = ibis.fabric.connect(...)

# 显式注册PythonTopK
topk_udf = con.udf.python.register(
    PythonTopK, 
    packages=["json"], 
    database="your-database", 
    register_type=RegisterType.STAGED
)

t = con.table("your-table", database="your-database")

# 结合DataFrame的aggregate方法,第一次使用PythonTopK,UDAF在SELECT
expression = t.aggregate(
    top_values=topk_udaf(t.amount).with_arguments(k=3),
    by=[t.category],
)
df = expression.execute()

# 结合DataFrame的aggregate方法,第二次使用PythonTopK,UDAF在ORDER BY
g = t.aggregate(
    sort_key=topk_udaf(t.amount).with_arguments(k=10),
    by=[t.category],
)
expression = g.order_by(g.sort_key.desc())
df = expression.execute()

# 结合DataFrame的aggregate方法,第三次使用PythonTopK,UDAF在HAVING
expression = t.group_by(t.category)
     .aggregate(top_values=topk_udaf(t.amount).with_arguments(k=5))
     .filter(lambda s: s.top_values.isnull() == False)
df = expression.execute()

直接使用带Scalar UDF的DF示例

场景描述

在大数据处理场景中,用户在使用DataFrame进行数据处理时,常常需要通过用户自定义函数(UDF)来实现复杂的数据计算逻辑。然而,当前系统中UDF的注册与调用耦合在一起,用户无法在注册后单独查看或删除已注册的UDF,这在团队协作开发或动态管理UDF时带来了诸多不便。为了解决这一问题,本次需求通过新增Backend.udf系列API,支持用户在运行时动态查看、调用和删除UDF,从而提升UDF管理的灵活性和开发效率。

约束限制

UDF直接调用/查看/删除功能约束限制如下:

用户必须先获得Backend(Fabric)连接,才能调用Backend UDF Registry API。

对于负责类型的支持依赖fabric内核对复杂类型的支持。

在注册和使用分离的场景下,对于UDF的使用者,允许用户通过Fabric Backend提供的Backend.udf接口直接获得、查看、删除Scalar UDF,示例如下:
import ibis
import fabric_data as fabric

con = ibis.fabric.connect(...)

# 查看数据库中已有的UDF列表
udfs = con.udf.names(database="your-database")

if "transform_json" in udfs:
    # 直接获得UDF,确认数据库中已有transform_json函数
    transform_json_udf = con.udf.get(name="transform_json", database="your-database")
    # 结合DataFrame的SELECT方法,使用transform_json
    expression = t.select(transform_json_udf(t.ts, t.msg).name("json column"))
    df = expression.execute()
    # 删除UDF
    con.udf.unregister("transform_json", database="your-database")

if "SPManager" in udfs:
    # 直接获得UDF,确认数据库中已有SPManager类
    sentencepiece_udf = con.udf.get(name="SPManager", database="your-database")
    # 结合DataFrame的SELECT方法,使用SPManager
    expression = t.select(sentencepiece_udf(t.data).with_arguments(model_file="test_model.model", bos=True, eos=True).name("pieces column"))
    df = expression.execute()
    # 删除UDF
    con.udf.unregister("SPManager", database="your-database")

直接使用带UDAF的DF示例

在注册和使用分离的场景下,对于UDAF的使用者,允许用户通过Fabric Backend提供的Backend.udaf接口直接获得、查看、删除UDAF,示例如下:
import ibis
import fabric_data as fabric

con = ibis.fabric.connect(...)

# 查看数据库中已有的UDAF列表
udfs = con.udaf.names(database="your-database")

if "pythontopk" in udfs:
    # 直接获得UDAF,确认数据库中已有pythontopk函数
    topk_udaf = con.udaf.get(name="pythontopk", database="your-database")
    # 结合DataFrame的aggregate方法,使用PythonTopK
    expression = t.aggregate(
        top_values=topk_udaf(t.amount).with_arguments(k=3),
        by=[t.category],
    )
    df = expression.execute()
    # 删除UDF
    con.udaf.unregister("pythontopk", database="your-database")

DataFrame create_table函数使用说明

create_table用于在DataArts Fabric创建一个表,函数签名如下:

def create_table(
    self,
    name: str,
    obj: Optional[Union[ir.Table, pd.DataFrame, pa.Table, pl.DataFrame, pl.LazyFrame]] = None,
    *,
    schema: Optional[ibis.Schema] = None,
    database: Optional[str] = None,
    temp: bool = False,
    external: bool = False,
    overwrite: bool = False,
    partition_by: Optional[ibis.Schema] = None,
    table_properties: Optional[dict] = None,
    store: Optional[str] = None,
    location: Optional[str] = None
)
表1 参数说明

参数名称

类型

是否必须

说明

name

str

要创建的表名。

obj

ir.Table|pd.DataFrame|pa.Table|pl.DataFrame|pl.LazyFrame

用于填充表格的数据;必须至少指定obj或schema之一(当前不支持在创建表的时候插入数据)。

schema

sch.SchemaLike

要创建的表的架构;必须至少指定obj或schema之一。

database

str

创建表的数据库的名称;如果未传递,则使用当前数据库。

temp

bool

是否创建为临时表。默认为False。

external

bool

是否创建为外表。默认为False。

overwrite

bool

如果为True,表已存在则替换表。默认为False(当前不支持覆盖表)。

partition_by

sch.SchemaLike

指定分区列,分区列中出现的列不能出现在表的普通列描述中。

table_properties

dict

表级别可选参数设置,支持参数范围如下参考表1

store

str

表存储格式,支持ORC、PARQUET、HUDI、ICEBERG四种存储格式。

location

str

表存储路径,必须为合法OBS路径,支持OBS对象桶和并行文件系统。

  • 如果该路径为OBS对象桶路径,则该表只读,否则该表支持读写。
  • 如果创建表类型为托管表(Managed Table),则不允许指定表存储路径,表存储路径由系统指定为Schema路径下与表名同名路径,且要求该路径在建表时为空。

相关文档