DataFrame APIs
本章节提供了类Pandas的Python DataFrame SDK,方便用户使用Python编写数据处理作业;同时利用DataArts Fabric SQL内核高效的计算能力,为数据科学家、AI工程师等提供了易用、高效的数据处理能力。
本特性基于Ibis Python DataFrame开源框架实现,将Ibis前端框架与DataArts Fabric SQL引擎对接。用户可基于熟悉的Ibis DataFrame API编写数据处理脚本,Ibis框架将Python API翻译为DataArts Fabric SQL引擎可执行的SQL语句并下发,从而实现计算逻辑在DataArts Fabric SQL引擎的高效处理。支持ibis的api使用,具体ibis api请参见Ibis官方文档。
快速开始
以下代码使用ibis库连接DataArts Fabric数据湖并执行数据查询,将结果转换为DataFrame格式的基本语法。
示例仅供参考,请您根据实际情况进行修改。
关于Ibis更详细的用法,请参见Ibis官方文档。
import ibis # 导入ibis依赖
import logging
import os
con = ibis.fabric.connect(
fabric_endpoint=os.getenv("fabric_endpoint"), # 指定服务的区域,区域查询请参见地区和终端节点
fabric_endpoint_id=os.getenv("fabric_endpoint_id"), # 查询endpoint_id
fabric_workspace_id=os.getenv("fabric_workspace_id"), # 获取workspace_id
lf_catalog_name=os.getenv("lf_catalog_name"), # 连接指定的Catalog
lf_instance_id=os.getenv("lf_instance_id"), # LakeFormation服务的实例ID,详情请参见获取LakeFormation实例id
access_key=os.getenv("access_key"), # 获取ak/sk
secret_key=os.getenv("secret_key"),
use_single_cn_mode=True, # 表示是否开启单CN模式
logging_level=logging.INFO, # 设置日志级别
)
con.set_function_staging_workspace(
obs_directory_base=os.getenv("obs_directory_base"), # obs中udf的存储路径
obs_bucket_name=os.getenv("obs_bucket_name"), # obs的名称
obs_server=os.getenv("obs_server"), # obs访问地址,详情请参见终端节点(Endpoint)和访问域名
access_key=os.getenv("access_key"),
secret_key=os.getenv("secret_key")
)
t = con.table("table_name", database="db") # 通过连接到后端获取table表信息,建立表对象
t.select("cola") # 查询表字段
df = t.execute() # 将DataFrame转为SQL,传输到后端执行,并且返回Pandas DataFrame格式的结果
不带UDF的DF示例
下文以tpch的query1为例,展示DataFrame的用法。
查询SQL为:
SELECT
l_returnflag,
l_linestatus,
sum(l_quantity) AS sum_qty,
sum(l_extendedprice) AS sum_base_price,
sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
avg(l_quantity) AS avg_qty,
avg(l_extendedprice) AS avg_price,
avg(l_discount) AS avg_disc,
count(*) AS count_order
FROM
lineitem
WHERE
l_shipdate <= CAST('1998-09-02' AS date)
GROUP BY
l_returnflag,
l_linestatus
ORDER BY
l_returnflag, l_linestatus;
对应的DataFrame逻辑如下:
import ibis # 导入ibis依赖
con = ibis.fabric.connect(...)
t = con.table("lineitem", database="tpch") # 通过连接到后端获取table表信息,建立表对象
q = t.filter(t.l_shipdate <= add_date("1998-12-01", dd=-90))
discount_price = t.l_extendedprice * (1 - t.l_discount)
charge = discount_price * (1 + t.l_tax)
q = q.group_by(["l_returnflag", "l_linestatus"])
q = q.aggregate(
sum_qty=t.l_quantity.sum(),
sum_base_price=t.l_extendedprice.sum(),
sum_disc_price=discount_price.sum(),
sum_charge=charge.sum(),
avg_qty=t.l_quantity.mean(),
avg_price=t.l_extendedprice.mean(),
avg_disc=t.l_discount.mean(),
count_order=lambda t: t.count(),
)
q = q.order_by(["l_returnflag", "l_linestatus"])
sql = q.compile() # 将DataFrame编译为sql字符串df = q.execute() # 执行表达式并且返回结果集
带Scalar UDF的DF示例
场景描述
在AI数据工程中,数据预处理是一个关键步骤,通常需要对存储在数据库中的数据进行复杂的清洗、转换和特征工程操作。然而,传统的数据预处理逻辑往往在数据库外部通过Python脚本实现,这会导致大量数据在数据库和Python环境之间传输,不仅增加了计算开销,还无法充分利用数据库的分布式计算能力。通过在数据库内核中实现Python UDF,并在ibis-fabric中增加Python UDF显式,隐式调用接口,可以将数据预处理逻辑直接嵌入到数据库查询中,减少数据传输,提升整体处理效率。
对于复杂业务逻辑,例如使用机器学习模型对特征数据表进行按行过滤。传统函数式UDF定义方式无法有效支持模型初始化和资源管理需求,因为模型超参数和资源(如模型文件加载)需要在函数执行前一次性配置,而函数式UDF无法高效管理此类初始化逻辑和资源释放操作。通过以Class形式定义Python UDF,用户可以在__init__方法中配置模型超参数和初始化资源(如加载模型),并在__call__方法中执行推理逻辑,同时通过__del__方法释放资源(如关闭文件或连接),从而实现高效的资源管理和业务逻辑处理。
约束限制
- Python UDF功能约束限制如下:
- 仅支持Python语言编写的用户自定义函数。
- Python环境依赖管理由用户自行负责,需确保各依赖库版本兼容。
- 运行时环境要求Python版本为3.11。
- Python Class UDF功能约束限制如下:
- 类必须定义__init__方法,且其参数名称需与调用时的参数名称保持一致。
- 类必须定义__call__成员方法,该方法为UDF的主函数入口。
- 类可按需定义__del__成员方法,且该方法不支持传入参数。
- 调用class型UDF时,__init__方法的参数仅支持传入常量,不支持传入表的数据列或其他表达式。
结合DataFrame使用Scalar UDF是推荐的标准用法,此时整个Scalar UDF的外部必须要包围DataFrame的SELECT方法。
经过注册后返回的值是DataFrame中的一个UDF算子。此时,该算子可以被多个DataFrame表达式多次调用,示例如下:
import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType
def transform_json(ts: float, msg: str) -> str:
import json
from car_bu.parser_core import ParseObjects, dict_to_object
if msg == '0_msg':
return json.dumps({"time_stamp": 0.0, "msg": {}})
else:
d = dict_to_object(json.loads(msg))
return json.dumps({"time_stamp": ts/10, "msg": ParseObjects(d)})
con = ibis.fabric.connect(...)
# 显式注册transform_json
transform_json_udf = con.udf.python.register(
transform_json,
database="your-database",
imports=['car_bu/parser_core.py'],
packages=['json'],
register_type=RegisterType.OBS
)
# 结合DataFrame的SELECT方法,第一次使用transform_json
t = con.table("your-table", database="your-database")
expression = t.select(transform_json_udf(t.ts, t.msg).name("json column"))
df = expression.execute()
# 结合DataFrame的SELECT方法,第二次使用transform_json
t = con.table("your-table", database="your-database")
filtered = t.filter(...)
local_view = filtered.select(...).mutate(...)
span_base = local_view.select(...).filter(...)
span_part2 = ibis.memtable(...)
union_part = span_base.union(span_part2)
window = ibis.window(...)
final_span = union_part.union(...).order_by(...).select(...)
result = final_span.mutate(...).select(...).filter(...).order_by(...)
result = result.select(transform_json_udf(result.ts, result.msg).name("json column"))
df = result.execute()
对于Python类的Scalar Class UDF,使用方式和Python函数的Scalar UDF稍有不同:UDF算子接受__call__方法参数;with_arguments接受__init__方法参数。完整示例如下:
import abc
import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType
from sentencepiece import SentencePieceProcessor
from typing import Sequence
class SPManager(Base):
def __init__(self, model_file: str, *, bos: bool = False, eos: bool = False, reverse: bool = False) -> None:
sp = SentencePieceProcessor(model_file=model_file)
opts = ":".join(opt for opt, flag in (("bos", bos), ("eos", eos), ("reverse", reverse)) if flag)
sp.set_encode_extra_options(opts)
self.spm = sp
def __call__(self, input_text: str) -> Sequence[str]:
return self.spm.encode(input_text, out_type=str)
con = ibis.fabric.connect(...)
# 显式注册SPManager
sentencepiece_udf = con.udf.python.register(
SPManager,
database="your-database",
imports=['test_model.model'],
packages=['sentencepiece'],
register_type=RegisterType.STAGED
)
# 结合DataFrame的SELECT方法,使用SPManager
t = con.table("your-table", database="your-database")
expression = t.select(sentencepiece_udf(t.data).with_arguments(model_file="test_model.model", bos=True, eos=True).name("pieces column"))
df = expression.execute()
带UDAF的DF示例
结合DataFrame使用UDAF是推荐的标准用法,此时整个UDAF的外部必须要包围DataFrame的aggregate方法。
UDAF只能出现在SELECT、ORDER BY、HAVING 3种从句中。
经过注册后返回的值是DataFrame中的一个UDF算子。此时,该算子可以被多个DataFrame表达式多次调用,with_arguments接受__init__方法参数,示例如下:
import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType
import typing
import json
class PythonTopK:
def __init__(self, k: int):
# 用户在 with_arguments 里传入的参数
self.k = k
self._agg_state = []
@property
def aggregate_state(self) -> typing.Sequence[int]:
# 聚合中间态(当前 top k 列表)
return self._agg_state
def accumulate(self, value: int) -> None:
# 累积阶段:逐行更新 top k
if value is not None:
self._agg_state.append(value)
self._agg_state.sort(reverse=True)
self._agg_state = self._agg_state[: self.k]
def merge(self, other_state: typing.Sequence[int]) -> None:
# 合并阶段:跨分区合并 top k
merged = self._agg_state + other_state
merged.sort(reverse=True)
self._agg_state = merged[: self.k]
def finish(self) -> str:
# 收尾阶段:返回最终 top k 结果,以JSON字符串形式返回
return json.dumps(self._agg_state)
con = ibis.fabric.connect(...)
# 显式注册PythonTopK
topk_udf = con.udf.python.register(
PythonTopK,
packages=["json"],
database="your-database",
register_type=RegisterType.STAGED
)
t = con.table("your-table", database="your-database")
# 结合DataFrame的aggregate方法,第一次使用PythonTopK,UDAF在SELECT
expression = t.aggregate(
top_values=topk_udaf(t.amount).with_arguments(k=3),
by=[t.category],
)
df = expression.execute()
# 结合DataFrame的aggregate方法,第二次使用PythonTopK,UDAF在ORDER BY
g = t.aggregate(
sort_key=topk_udaf(t.amount).with_arguments(k=10),
by=[t.category],
)
expression = g.order_by(g.sort_key.desc())
df = expression.execute()
# 结合DataFrame的aggregate方法,第三次使用PythonTopK,UDAF在HAVING
expression = t.group_by(t.category)
.aggregate(top_values=topk_udaf(t.amount).with_arguments(k=5))
.filter(lambda s: s.top_values.isnull() == False)
df = expression.execute()
直接使用带Scalar UDF的DF示例
场景描述
在大数据处理场景中,用户在使用DataFrame进行数据处理时,常常需要通过用户自定义函数(UDF)来实现复杂的数据计算逻辑。然而,当前系统中UDF的注册与调用耦合在一起,用户无法在注册后单独查看或删除已注册的UDF,这在团队协作开发或动态管理UDF时带来了诸多不便。为了解决这一问题,本次需求通过新增Backend.udf系列API,支持用户在运行时动态查看、调用和删除UDF,从而提升UDF管理的灵活性和开发效率。
约束限制
UDF直接调用/查看/删除功能约束限制如下:
用户必须先获得Backend(Fabric)连接,才能调用Backend UDF Registry API。
对于负责类型的支持依赖fabric内核对复杂类型的支持。
import ibis
import fabric_data as fabric
con = ibis.fabric.connect(...)
# 查看数据库中已有的UDF列表
udfs = con.udf.names(database="your-database")
if "transform_json" in udfs:
# 直接获得UDF,确认数据库中已有transform_json函数
transform_json_udf = con.udf.get(name="transform_json", database="your-database")
# 结合DataFrame的SELECT方法,使用transform_json
expression = t.select(transform_json_udf(t.ts, t.msg).name("json column"))
df = expression.execute()
# 删除UDF
con.udf.unregister("transform_json", database="your-database")
if "SPManager" in udfs:
# 直接获得UDF,确认数据库中已有SPManager类
sentencepiece_udf = con.udf.get(name="SPManager", database="your-database")
# 结合DataFrame的SELECT方法,使用SPManager
expression = t.select(sentencepiece_udf(t.data).with_arguments(model_file="test_model.model", bos=True, eos=True).name("pieces column"))
df = expression.execute()
# 删除UDF
con.udf.unregister("SPManager", database="your-database")
直接使用带UDAF的DF示例
import ibis
import fabric_data as fabric
con = ibis.fabric.connect(...)
# 查看数据库中已有的UDAF列表
udfs = con.udaf.names(database="your-database")
if "pythontopk" in udfs:
# 直接获得UDAF,确认数据库中已有pythontopk函数
topk_udaf = con.udaf.get(name="pythontopk", database="your-database")
# 结合DataFrame的aggregate方法,使用PythonTopK
expression = t.aggregate(
top_values=topk_udaf(t.amount).with_arguments(k=3),
by=[t.category],
)
df = expression.execute()
# 删除UDF
con.udaf.unregister("pythontopk", database="your-database")
DataFrame create_table函数使用说明
create_table用于在DataArts Fabric创建一个表,函数签名如下:
def create_table(
self,
name: str,
obj: Optional[Union[ir.Table, pd.DataFrame, pa.Table, pl.DataFrame, pl.LazyFrame]] = None,
*,
schema: Optional[ibis.Schema] = None,
database: Optional[str] = None,
temp: bool = False,
external: bool = False,
overwrite: bool = False,
partition_by: Optional[ibis.Schema] = None,
table_properties: Optional[dict] = None,
store: Optional[str] = None,
location: Optional[str] = None
)
|
参数名称 |
类型 |
是否必须 |
说明 |
|---|---|---|---|
|
name |
str |
是 |
要创建的表名。 |
|
obj |
ir.Table|pd.DataFrame|pa.Table|pl.DataFrame|pl.LazyFrame |
否 |
用于填充表格的数据;必须至少指定obj或schema之一(当前不支持在创建表的时候插入数据)。 |
|
schema |
sch.SchemaLike |
否 |
要创建的表的架构;必须至少指定obj或schema之一。 |
|
database |
str |
否 |
创建表的数据库的名称;如果未传递,则使用当前数据库。 |
|
temp |
bool |
否 |
是否创建为临时表。默认为False。 |
|
external |
bool |
否 |
是否创建为外表。默认为False。 |
|
overwrite |
bool |
否 |
如果为True,表已存在则替换表。默认为False(当前不支持覆盖表)。 |
|
partition_by |
sch.SchemaLike |
否 |
指定分区列,分区列中出现的列不能出现在表的普通列描述中。 |
|
table_properties |
dict |
否 |
表级别可选参数设置,支持参数范围如下参考表1。 |
|
store |
str |
否 |
表存储格式,支持ORC、PARQUET、HUDI、ICEBERG四种存储格式。 |
|
location |
str |
否 |
表存储路径,必须为合法OBS路径,支持OBS对象桶和并行文件系统。
|