DataFrame APIs
This section introduces a Pandas-like Python DataFrame SDK, enabling you to write data processing jobs using Python. Leveraging the powerful computational capabilities of the DataArts Fabric SQL kernel, it offers easy-to-use and efficient data processing functions for data scientists and AI engineers.
This feature is built on the Ibis open-source Python DataFrame framework, integrating the Ibis frontend with the DataArts Fabric SQL engine. You can compose data processing scripts using the familiar Ibis DataFrame API. Ibis translates these Python APIs into executable SQL statements for the DataArts Fabric SQL engine, so the computational logic runs efficiently inside the engine. For details about the available Ibis APIs, see the official Ibis documentation.
Getting Started
The code below uses the ibis library to connect to the DataArts Fabric data lake, run a data query, and return the result as a pandas DataFrame.
The example is for reference only. Modify it as needed.
For more detailed usage of Ibis, see the official Ibis documentation.
import ibis  # Import Ibis dependencies.
import logging
import os

con = ibis.fabric.connect(
    fabric_endpoint=os.getenv("fabric_endpoint"),  # Specify the service region. For details, see Regions and Endpoints.
    fabric_endpoint_id=os.getenv("fabric_endpoint_id"),  # See Obtaining an Endpoint ID.
    fabric_workspace_id=os.getenv("fabric_workspace_id"),  # See Obtaining a Workspace ID.
    lf_catalog_name=os.getenv("lf_catalog_name"),  # Connect to a specified catalog.
    lf_instance_id=os.getenv("lf_instance_id"),  # LakeFormation instance ID. For details, see Obtaining a LakeFormation Instance ID.
    access_key=os.getenv("access_key"),  # See Obtaining an AK/SK.
    secret_key=os.getenv("secret_key"),
    use_single_cn_mode=True,  # Whether single CN mode is enabled.
    logging_level=logging.INFO,  # Set the log level.
)

con.set_function_staging_workspace(
    obs_directory_base=os.getenv("obs_directory_base"),  # Storage path for UDFs in OBS.
    obs_bucket_name=os.getenv("obs_bucket_name"),  # OBS bucket name.
    obs_server=os.getenv("obs_server"),  # OBS access address. For details, see Endpoints and Domain Names.
    access_key=os.getenv("access_key"),
    secret_key=os.getenv("secret_key")
)

t = con.table("table_name", database="db")  # Retrieve table information by connecting to the backend and create a table object.
expr = t.select("cola")  # Build an expression that queries the specified table column.
df = expr.execute()  # Compile the expression to SQL, send it to the backend for execution, and return the result in pandas DataFrame format.
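To inspect the SQL that Ibis generates without executing it, you can compile the expression first. The snippet below is a minimal sketch that reuses the con connection and expr expression from the example above.
sql = expr.compile()  # Compile the expression into a SQL string without executing it.
print(sql)            # Review the generated SQL before running it on the engine.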
Examples of DataFrames Without UDFs
Below is an example that uses Query 1 from the TPC-H benchmark to demonstrate DataFrame usage.
The query SQL is as follows:
SELECT
l_returnflag,
l_linestatus,
sum(l_quantity) AS sum_qty,
sum(l_extendedprice) AS sum_base_price,
sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
avg(l_quantity) AS avg_qty,
avg(l_extendedprice) AS avg_price,
avg(l_discount) AS avg_disc,
count(*) AS count_order
FROM
lineitem
WHERE
l_shipdate <= CAST('1998-09-02' AS date)
GROUP BY
l_returnflag,
l_linestatus
ORDER BY
l_returnflag, l_linestatus;
The corresponding DataFrame logic is as follows:
import ibis  # Import Ibis dependencies.

con = ibis.fabric.connect(...)
t = con.table("lineitem", database="tpch")  # Retrieve table information by connecting to the backend and create a table object.

filtered = t.filter(t.l_shipdate <= ibis.date("1998-09-02"))  # 1998-12-01 minus 90 days.
discount_price = filtered.l_extendedprice * (1 - filtered.l_discount)
charge = discount_price * (1 + filtered.l_tax)

q = filtered.group_by(["l_returnflag", "l_linestatus"]).aggregate(
    sum_qty=filtered.l_quantity.sum(),
    sum_base_price=filtered.l_extendedprice.sum(),
    sum_disc_price=discount_price.sum(),
    sum_charge=charge.sum(),
    avg_qty=filtered.l_quantity.mean(),
    avg_price=filtered.l_extendedprice.mean(),
    avg_disc=filtered.l_discount.mean(),
    count_order=filtered.count(),
)
q = q.order_by(["l_returnflag", "l_linestatus"])

sql = q.compile()  # Compile the DataFrame expression into a SQL string.
df = q.execute()   # Execute the expression and return the result set.
Examples of DataFrames with Scalar UDFs
Scenario
In AI data engineering, data preprocessing is a crucial step, typically involving complex cleansing, transformation, and feature engineering operations on data stored in databases. However, traditional data preprocessing logic is often implemented externally through Python scripts, leading to significant data transfer between the database and the Python environment. This not only increases computational overhead but also fails to fully utilize the distributed computing capabilities of the database. By implementing Python UDFs within the database kernel and enhancing ibis-fabric with both explicit and implicit Python UDF invocation interfaces, we can embed data preprocessing logic directly into database queries, thereby reducing data transfer and boosting overall processing efficiency.
For complex business logic, such as filtering feature data tables row by row using machine learning models, traditional function-style UDF definitions cannot meet model initialization and resource management needs. Because model hyperparameters and resources (for example, loaded model files) must be configured once before function execution, function-style UDFs struggle to handle this initialization logic and resource release efficiently. Defining Python UDFs as classes lets you configure model hyperparameters and initialize resources (such as loading a model) in the __init__ method, execute inference logic in the __call__ method, and release resources (such as closing files or connections) in the __del__ method, achieving effective resource management and business logic processing, as sketched below.
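The following is a minimal sketch of such a class-based UDF structure. The class name, parameters, and threshold logic are placeholders for illustration only; a real UDF would load and invoke an actual model.
class ModelFilterUDF:  # Hypothetical class name, for illustration only.
    def __init__(self, model_path: str, threshold: float = 0.5):
        # One-time setup: store hyperparameters and initialize resources (for example, load a model file).
        self.model_path = model_path  # Placeholder for real model loading.
        self.threshold = threshold

    def __call__(self, score: float) -> bool:
        # Per-row processing logic; a simple threshold check stands in for model inference.
        return score >= self.threshold

    def __del__(self):
        # Release resources such as open files or connections; takes no input parameters.
        pass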
Constraints
- Constraints on Python UDFs include:
- Only user-defined functions written in Python are supported.
- Users are responsible for managing Python environment dependencies. Compatibility among library versions must be ensured.
- The runtime environment must use Python 3.11.
- Constraints on Python class UDFs include:
- Classes must define an __init__ method whose parameter names match those used during calls.
- A __call__ member method must be defined within the class, serving as the primary entry point for the UDF.
- A __del__ member method can be defined within a class as needed, and this method does not support input parameters.
- When a class UDF is called, only constant values can be passed to the __init__ method; data columns and other expressions are not supported.
Using a scalar UDF together with a DataFrame is the recommended standard approach. In this case, the scalar UDF call must be wrapped in the DataFrame's SELECT method.
Once registered, the scalar UDF returns a UDF operator within the DataFrame. This operator can then be invoked multiple times by various DataFrame expressions, as shown below:
import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType

def transform_json(ts: float, msg: str) -> str:
    import json
    from car_bu.parser_core import ParseObjects, dict_to_object

    if msg == '0_msg':
        return json.dumps({"time_stamp": 0.0, "msg": {}})
    else:
        d = dict_to_object(json.loads(msg))
        return json.dumps({"time_stamp": ts / 10, "msg": ParseObjects(d)})

con = ibis.fabric.connect(...)

# Explicitly register transform_json.
transform_json_udf = con.udf.python.register(
    transform_json,
    database="your-database",
    imports=['car_bu/parser_core.py'],
    packages=['json'],
    register_type=RegisterType.OBS
)

# First use of transform_json combined with the SELECT method of DataFrame.
t = con.table("your-table", database="your-database")
expression = t.select(transform_json_udf(t.ts, t.msg).name("json column"))
df = expression.execute()
# Second use of transform_json combined with the SELECT method of DataFrame.
t = con.table("your-table", database="your-database")
filtered = t.filter(...)
local_view = filtered.select(...).mutate(...)
span_base = local_view.select(...).filter(...)
span_part2 = ibis.memtable(...)
union_part = span_base.union(span_part2)
window = ibis.window(...)
final_span = union_part.union(...).order_by(...).select(...)
result = final_span.mutate(...).select(...).filter(...).order_by(...)
result = result.select(transform_json_udf(result.ts, result.msg).name("json column"))
df = result.execute()
Usage of scalar UDFs defined as Python classes differs slightly from those defined as Python functions: the UDF operator takes the arguments of the __call__ method, while with_arguments takes the parameters of the __init__ method. A complete example is as follows:
import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType
from sentencepiece import SentencePieceProcessor
from typing import Sequence

class SPManager:
    def __init__(self, model_file: str, *, bos: bool = False, eos: bool = False, reverse: bool = False) -> None:
        # One-time setup: load the SentencePiece model and configure encoding options.
        sp = SentencePieceProcessor(model_file=model_file)
        opts = ":".join(opt for opt, flag in (("bos", bos), ("eos", eos), ("reverse", reverse)) if flag)
        sp.set_encode_extra_options(opts)
        self.spm = sp

    def __call__(self, input_text: str) -> Sequence[str]:
        # Per-row processing: tokenize the input text into pieces.
        return self.spm.encode(input_text, out_type=str)
con = ibis.fabric.connect(...)
# Explicitly register SPManager.
sentencepiece_udf = con.udf.python.register(
SPManager,
database="your-database",
imports=['test_model.model'],
packages=['sentencepiece'],
register_type=RegisterType.STAGED
)
# Use SPManager with the SELECT method of DataFrame.
t = con.table("your-table", database="your-database")
expression = t.select(sentencepiece_udf(t.data).with_arguments(model_file="test_model.model", bos=True, eos=True).name("pieces column"))
df = expression.execute()
Example of Using DataFrame with UDAFs
The recommended standard practice for using user-defined aggregate functions (UDAFs) involves integrating them within a DataFrame's aggregate method. This ensures that the UDAF is properly encapsulated.
UDAFs can only be utilized in three specific clauses: SELECT, ORDER BY, and HAVING.
Once registered, the UDAF returns a UDF operator within the DataFrame. This operator can then be invoked multiple times across various DataFrame expressions. The with_arguments method accepts parameters defined in the __init__ function, as demonstrated below:
import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType
import typing
import json

class PythonTopK:
    def __init__(self, k: int):
        # Parameters passed via with_arguments.
        self.k = k
        self._agg_state = []

    @property
    def aggregate_state(self) -> typing.Sequence[int]:
        # Intermediate aggregation state (current top-k list).
        return self._agg_state

    def accumulate(self, value: int) -> None:
        # Accumulation phase: update the top-k list row by row.
        if value is not None:
            self._agg_state.append(value)
            self._agg_state.sort(reverse=True)
            self._agg_state = self._agg_state[: self.k]

    def merge(self, other_state: typing.Sequence[int]) -> None:
        # Merge phase: combine top-k lists across partitions.
        merged = self._agg_state + other_state
        merged.sort(reverse=True)
        self._agg_state = merged[: self.k]

    def finish(self) -> str:
        # Finalization phase: return the final top-k result as a JSON string.
        return json.dumps(self._agg_state)
con = ibis.fabric.connect(...)

# Explicitly register PythonTopK.
topk_udaf = con.udf.python.register(
    PythonTopK,
    packages=["json"],
    database="your-database",
    register_type=RegisterType.STAGED
)

t = con.table("your-table", database="your-database")

# Use PythonTopK for the first time combined with DataFrame's aggregate method, UDAF in SELECT.
expression = t.aggregate(
    top_values=topk_udaf(t.amount).with_arguments(k=3),
    by=[t.category],
)
df = expression.execute()

# Use PythonTopK for the second time combined with DataFrame's aggregate method, UDAF in ORDER BY.
g = t.aggregate(
    sort_key=topk_udaf(t.amount).with_arguments(k=10),
    by=[t.category],
)
expression = g.order_by(g.sort_key.desc())
df = expression.execute()

# Use PythonTopK for the third time combined with DataFrame's aggregate method, UDAF in HAVING.
expression = (
    t.group_by(t.category)
    .aggregate(top_values=topk_udaf(t.amount).with_arguments(k=5))
    .filter(lambda s: s.top_values.notnull())
)
df = expression.execute()
Example of Directly Using DataFrame with Scalar UDFs
Scenario
In big data processing scenarios, you often need user-defined functions (UDFs) to implement complex computation logic when processing data with DataFrames. However, if UDF registration and invocation are tightly coupled, you cannot independently view or delete a UDF after it has been registered, which is inconvenient for collaborative development and dynamic UDF management within a team. To address this, the Backend.udf series of APIs lets you view, call, and delete UDFs at runtime, improving UDF management flexibility and development efficiency.
Constraints
Constraints on directly calling, viewing, and deleting UDFs are as follows:
- You must first establish a Backend (Fabric) connection before calling the Backend UDF registry APIs.
- Support for specific data types depends on the DataArts Fabric kernel's support for complex types.
import ibis
import ibis_fabric as fabric

con = ibis.fabric.connect(...)
t = con.table("your-table", database="your-database")

# View the list of existing UDFs in the database.
udfs = con.udf.names(database="your-database")

if "transform_json" in udfs:
    # Directly acquire the UDF, confirming that the transform_json function already exists in the database.
    transform_json_udf = con.udf.get(name="transform_json", database="your-database")
    # Use transform_json with the SELECT method of DataFrame.
    expression = t.select(transform_json_udf(t.ts, t.msg).name("json column"))
    df = expression.execute()
    # Delete the UDF.
    con.udf.unregister("transform_json", database="your-database")

if "SPManager" in udfs:
    # Directly acquire the UDF, confirming that the SPManager class already exists in the database.
    sentencepiece_udf = con.udf.get(name="SPManager", database="your-database")
    # Use SPManager with the SELECT method of DataFrame.
    expression = t.select(sentencepiece_udf(t.data).with_arguments(model_file="test_model.model", bos=True, eos=True).name("pieces column"))
    df = expression.execute()
    # Delete the UDF.
    con.udf.unregister("SPManager", database="your-database")
Example of Directly Using DataFrame with UDAFs
import ibis
import ibis_fabric as fabric

con = ibis.fabric.connect(...)
t = con.table("your-table", database="your-database")

# View the list of existing UDAFs in the database.
udfs = con.udaf.names(database="your-database")

if "pythontopk" in udfs:
    # Directly acquire the UDAF, confirming that the pythontopk function already exists in the database.
    topk_udaf = con.udaf.get(name="pythontopk", database="your-database")
    # Use PythonTopK with the aggregate method of DataFrame.
    expression = t.aggregate(
        top_values=topk_udaf(t.amount).with_arguments(k=3),
        by=[t.category],
    )
    df = expression.execute()
    # Delete the UDAF.
    con.udaf.unregister("pythontopk", database="your-database")
DataFrame create_table Function Usage
create_table is used to create a table in DataArts Fabric. The function signature is as follows:
def create_table(
    self,
    name: str,
    obj: Optional[Union[ir.Table, pd.DataFrame, pa.Table, pl.DataFrame, pl.LazyFrame]] = None,
    *,
    schema: Optional[ibis.Schema] = None,
    database: Optional[str] = None,
    temp: bool = False,
    external: bool = False,
    overwrite: bool = False,
    partition_by: Optional[ibis.Schema] = None,
    table_properties: Optional[dict] = None,
    store: Optional[str] = None,
    location: Optional[str] = None
)
| Parameter | Type | Mandatory | Description |
|---|---|---|---|
| name | str | Yes | Name of the table to be created. |
| obj | ir.Table \| pd.DataFrame \| pa.Table \| pl.DataFrame \| pl.LazyFrame | No | Data used to fill the table. At least one of obj or schema must be specified (currently, inserting data while creating a table is not supported). |
| schema | sch.SchemaLike | No | Schema of the table to be created. At least one of obj or schema must be specified. |
| database | str | No | Name of the database where the table is created. If left unset, the current database is used. |
| temp | bool | No | Whether to create a temporary table. The default value is False. |
| external | bool | No | Whether to create a foreign table. The default value is False. |
| overwrite | bool | No | If set to True, the table is replaced if it already exists. The default value is False (currently, overwriting tables is not supported). |
| partition_by | sch.SchemaLike | No | Specifies partition columns. Columns used as partition columns cannot appear in the regular column definitions of the table. |
| table_properties | dict | No | Optional table-level settings. For details about the supported parameters, see Table 1. |
| store | str | No | Table storage format. ORC, PARQUET, HUDI, and ICEBERG are supported. |
| location | str | No | Table storage path. The value must be a valid OBS path. OBS buckets and parallel file systems are supported. |
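The following is a minimal sketch of calling create_table, assuming the con connection from Getting Started; the table name, database, schema, and OBS path are placeholders for illustration only.
import ibis

# Regular (non-partition) columns; the partition column must not be repeated here.
schema = ibis.schema({"id": "int64", "name": "string", "amount": "float64"})

con.create_table(
    "sales_demo",                                     # Hypothetical table name.
    schema=schema,                                    # obj is omitted: inserting data while creating a table is not supported.
    database="your-database",
    external=True,                                    # Create a foreign table backed by OBS storage.
    partition_by=ibis.schema({"dt": "string"}),       # Partition column.
    store="PARQUET",                                  # One of ORC, PARQUET, HUDI, ICEBERG.
    location="obs://your-bucket/path/to/sales_demo"   # Valid OBS path (bucket or parallel file system).
)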