Updated on 2025-12-19 GMT+08:00

Dataset APIs

columns

Description: Retrieves a collection of all column names in a dataset.

Input parameters: None.

Return type: tuple[str, ...].

Example:

>>> from fabric_data.multimodal import ai_lake 
>>> conn = ai_lake.connect( 
...     endpoint='*', 
...     ak='*',
...     sk='*'
... ) 
>>> ds = conn.load_dataset("demo_table_10", database='my_schema') 
>>> ds.columns 
('species',
 'island',
 'bill_length_mm',
 'bill_depth_mm',
 'flipper_length_mm')

add_column(*exprs, **mutations)

Description: Adds columns to a dataset.

Input parameters:

  • exprs (Union[Sequence[Expr], None]): A sequence of named expressions to be added as columns.
  • mutations (Value): Named expressions passed as keyword arguments.

Return type: Updated dataset with new columns.

Example:

>>> from fabric_data.multimodal import ai_lake 
>>> conn = ai_lake.connect( 
...     endpoint='*', 
...     ak='*',
...     sk='*',
...     ... 
... ) 
>>> ds = conn.load_dataset("demo_table_6", database='my_schema') 
>>> ds 
| species | bill_length_mm |
| ------- | -------------- |
| string  | float64        |
| ------- | -------------- |
| Adelie  | 40.0           |
| Adelie  | 41.0           |
| Adelie  | 42.0           |
| Adelie  | 43.0           |

# Add a new column from a per-element expression
>>> ds.add_column(new_col=ds.bill_length_mm + 1)
| species | bill_length_mm | new_col  |
| ------- | -------------- | -------- |
| string  | float64        | float64  |
| ------- | -------------- | -------- |
| Adelie  | 40.0           | 41.0     |
| Adelie  | 41.0           | 42.0     |
| Adelie  | 42.0           | 43.0     |
| Adelie  | 43.0           | 44.0     |

drop_columns(*fields)

Description: Removes columns from a dataset.

Input parameters:

fields (str): Names of the columns to remove.

Return type: Updated dataset after removing the columns.

Example:

>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
...     endpoint='*',
...     ak='*',
...     sk='*',
...     ...
... )
>>> ds = conn.load_dataset("demo_table_12", database='my_schema')
>>> ds
| species | island    | bill_length_mm | bill_depth_mm | flipper_length_mm |
| ------- | --------- | -------------- | ------------- | ----------------- |
| string  | string    | float64        | float64       | int64             |
| ------- | --------- | -------------- | ------------- | ----------------- |
| Adelie  | Torgersen | 40.0           | 22.0          | 180               |
| Adelie  | Torgersen | 41.0           | 19.0          | 183               |
| Adelie  | Torgersen | 42.0           | 20.0          | 190               |
| Adelie  | Torgersen | 43.0           | 23.0          | 185               |

# Drop one or more columns
>>> ds.drop_columns("species")
| island    | bill_length_mm | bill_depth_mm | flipper_length_mm |
| --------- | -------------- | ------------- | ----------------- |
| string    | float64        | float64       | int64             |
| --------- | -------------- | ------------- | ----------------- |
| Torgersen | 40.0           | 22.0          | 180               |
| Torgersen | 41.0           | 19.0          | 183               |
| Torgersen | 42.0           | 20.0          | 190               |
| Torgersen | 43.0           | 23.0          | 185               |

aggregate(metrics=(), /, *, by=(), having=(), **kwargs)

Description: Aggregates the dataset using specified grouping fields and aggregation functions.

Input parameters:

  • metrics (Union[Sequence[AggregateFnBuilder], None]): Aggregation expressions, which can be scalar-generating expressions, including aggregate functions like sum.
  • by (Union[Sequence[Value], None]): Grouping expressions, typically set as a collection of dataset column names.
  • having (Union[Sequence[BooleanValue], None]): Post-aggregation filter. Its data dimensions must match those of metrics, and its output type must be boolean.
  • kwargs: Named aggregation expressions.

Return type: aggregated dataset.

Example:

>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
...     endpoint='*',
...     ak='*',
...     sk='*',
...     ...
... )
>>> ds = conn.load_dataset("demo_table_13", database='my_schema')
>>> ds
| fruit  | price |
| ------ | ----- |
| apple  | 0.50  |
| apple  | 0.50  |
| banana | 0.25  |
| orange | 0.33  |
>>> ds.aggregate(
...     by=["fruit"],
...     total_cost=ds.price.sum(),
...     avg_cost=ds.price.mean(),
...     having=ds.price.sum() < 0.5,
... ).order_by("fruit")
| fruit  | total_cost | avg_cost |
| ------ | ---------- | -------- |
| string | float64    | float64  |
| ------ | ---------- | -------- |
| banana | 0.25       | 0.25     |
| orange | 0.33       | 0.33     |

count()

Description: Calculates the total number of rows in a dataset.

Input parameters: None.

Return type: int.

Example:

>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
...     endpoint='*',
...     ak='*',
...     sk='*',
...     ...
... )
>>> ds = conn.load_dataset("demo_table_11", database='my_schema')
>>> ds
| a      |
| ------ |
| string |
| ------ |
| foo    |
| bar    |
| baz    |
>>> ds.count()
┌─┐
│ 3│
└─┘

execute(*, limit='default', params=None, **kwargs)

Description: Executes expressions.

Input parameters:

  • limit(Union[int, str, None]): Limits the total number of rows returned by the expression execution.
  • params(Union[Mapping[Value, Any], None]): A mapping of scalar parameter expressions to their values.
  • kwargs: Keyword arguments.

Return type: Union[pandas.DataFrame, pandas.Series, Any].

Example:

>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
...     endpoint='*',
...     ak='*',
...     sk='*',
...     ...
... )
>>> ds = conn.load_dataset("demo_table_8", database='my_schema')
>>> ds.execute()
       a     b
0      1     c
1      1     a
2      2     a
[3 rows x 2 columns]

filter(expr=None, fn=None, on=None, num_dpus=None, num_apus=None, apu_model=None, concurrency=None, **fn_constructor_kwargs)

Description: Filters out rows that do not meet specified conditions.

Input parameters:

  • expr(Union[list[ir.BooleanValue], Sequence[ir.BooleanValue], IfAnyAll, None]): Boolean filter expression(s); rows for which they evaluate to False are removed.
  • on(Union[str, list[str]]): Column(s) to apply the function to.
  • fn(Union[str, Callable, None]): A callable function or its name.
  • num_dpus(Union[float, None]): Number of data processing units (DPUs), with a fixed CPU-to-memory ratio of 1:4.
  • num_apus(Union[float, None]): Number of AI processing units (APUs), applicable for NPU/GPU configurations.
  • apu_model(Union[str, None]): Model of the AI processing unit (e.g., NPU/910B, GPU/A100).
  • concurrency(Union[int, Tuple[int, int], None]): Concurrency level during function execution.
  • fn_constructor_kwargs: Constructor parameters, only applicable if fn is a callable class.

Return type: filtered dataset after removing non-compliant rows.

Example:

>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
...     endpoint='*',
...     ak='*',
...     sk='*',
...     ...
... )
>>> ds = conn.load_dataset("demo_table_3", database='my_schema')
>>> ds = ds.filter(expr=[ds.a < 2])
>>> ds
| a     | b      |
| ----- | ------ |
| int64 | string |
| ----- | ------ |
| 1     | c      |
| 1     | a      |

# Filter using a user-defined function
# my_schema.id_filter takes an int value and returns False if the value is greater than 2.
>>> ds = conn.load_dataset("demo_table_2", database='my_schema')
>>> ds = ds.filter(
...     fn='my_schema.id_filter',
...     on='a',
... )
>>> ds
| a     | b      |
| ----- | ------ |
| int64 | string |
| ----- | ------ |
| 1     | c      |
| 1     | a      |

flat_map(fn, on, as_col=None, num_dpus=None, num_apus=None, apu_model=None, concurrency=None, **fn_constructor_kwargs)

Description: Applies a given function to each row of the dataset and flattens the results.

Input parameters:

  • fn (Union[str, Callable]): A callable function or its name, for example, myschema.myfunction.
  • on (Union[str, list[str]]): Column(s) to which the function is applied.
  • as_col (Union[str, list[str], None]): Name(s) of the new output column(s). Its length must match the number of fields returned by fn.
  • num_dpus(Union[float, None]): Number of data processing units (DPUs), with a fixed CPU-to-memory ratio of 1:4.
  • num_apus(Union[float, None]): Number of AI processing units (APUs), applicable for NPU/GPU configurations.
  • apu_model(Union[str, None]): Model of the AI processing unit (e.g., NPU/910B, GPU/A100).
  • concurrency(Union[int, Tuple[int, int], None]): Concurrency level during function execution.
  • fn_constructor_kwargs: Constructor parameters, only applicable if fn is a callable class.

Return type:

Dataset: A dataset containing the new column(s) named as_col.

Example:

>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
...     endpoint='*',
...     ak='*',
...     sk='*',
...     ...
... )
>>> ds = conn.load_dataset("demo_table_2", database='my_schema')
>>> ds.execute()
       a     b     c
0      1     4     1
[1 rows x 3 columns]

>>> ds = ds.flat_map(
...     fn='my_schema.py_generate_series',
...     on=['a', 'b', 'c'],
...     as_col=['outCol'])
>>> ds
| a     | b     | c     | outCol |
| ----- | ----- | ----- | ------ |
| int64 | int64 | int64 | int64  |
| ----- | ----- | ----- | ------ |
| 1     | 4     | 1     | 1      |
| 1     | 4     | 1     | 2      |
| 1     | 4     | 1     | 3      |
| 1     | 4     | 1     | 4      |

# Access table columns using the ds.attribute (dot notation) style
>>> ds = ds.flat_map(
...     fn='my_schema.py_generate_series',
...     on=[ds.a, ds.b, ds.c],
...     as_col='udtf_col')
>>> ds
| a     | b     | c     | udtf_col |
| ----- | ----- | ----- | -------- |
| int64 | int64 | int64 | int64    |
| ----- | ----- | ----- | -------- |
| 1     | 4     | 1     | 1        |
| 1     | 4     | 1     | 2        |
| 1     | 4     | 1     | 3        |
| 1     | 4     | 1     | 4        |

map_batchs(fn, on, as_col=None, num_dpus=None, num_apus=None, apu_model=None, concurrency=None, **fn_constructor_kwargs)

Description: Applies a given function to batches of rows rather than to individual rows.

Input parameters:

  • fn (Union[str, Callable]): A callable function or its name, for example, myschema.myfunction.
  • on (Union[str, list[str]]): Column(s) to which the function is applied.
  • as_col (Union[str, list[str], None]): Name(s) of the new output column(s). Its length must match the number of fields returned by fn.
  • num_dpus(Union[float, None]): Number of data processing units (DPUs), with a fixed CPU-to-memory ratio of 1:4.
  • num_apus(Union[float, None]): Number of AI processing units (APUs), applicable for NPU/GPU configurations.
  • apu_model(Union[str, None]): Model of the AI processing unit (e.g., NPU/910B, GPU/A100).
  • concurrency(Union[int, Tuple[int, int], None]): Concurrency level during function execution.
  • fn_constructor_kwargs: Constructor parameters, only applicable if fn is a callable class.

Return type:

Dataset: A dataset containing the new column(s) named as_col.
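map_batchs ships no example above; its batch-wise application can be sketched in plain Python. The row data, batch size of 2, and the doubling function below are illustrative assumptions standing in for on/fn/as_col — they are not part of the ai_lake API:

```python
from itertools import islice

# Illustrative rows and function; neither comes from the ai_lake API.
rows = [{"a": i} for i in range(1, 6)]

def batches(rows, size):
    """Yield successive lists of at most `size` rows."""
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch

def double_batch(values):
    """A batch-level fn: receives a whole column of values at once."""
    return [v * 2 for v in values]

# Apply fn once per batch on column "a", writing results to as_col="a2".
out = []
for batch in batches(rows, size=2):
    results = double_batch([row["a"] for row in batch])
    out.extend({**row, "a2": r} for row, r in zip(batch, results))

print(out[0])  # {'a': 1, 'a2': 2}
```

The point of the batch form is that fn is invoked once per batch (three calls here for five rows), which amortizes per-call overhead for vectorized functions.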

groupby(*by, **key_exprs)

Description: Creates a grouped dataset.

Input parameters:

  • by (Union[str, Iterable[str], None]): Grouping expression.
  • key_exprs (Union[str, Iterable[str]]): Named grouping expressions.

Return type: a grouped dataset.

Example:

>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
...     endpoint='*',
...     ak='*',
...     sk='*',
...     ...
... )
>>> ds = conn.load_dataset("demo_table_14", database='my_schema')
>>> ds
| fruit  | price |
| ------ | ----- |
| string | float |
| ------ | ----- |
| apple  | 0.50  |
| apple  | 0.50  |
| banana | 0.25  |
| orange | 0.33  |
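The example above stops before the groupby call itself; the grouping it would perform can be sketched with the standard library. Plain dicts stand in for the grouped dataset the real API returns:

```python
from itertools import groupby

# Rows mirroring the fruit/price table above.
rows = [
    {"fruit": "apple", "price": 0.50},
    {"fruit": "apple", "price": 0.50},
    {"fruit": "banana", "price": 0.25},
    {"fruit": "orange", "price": 0.33},
]

# itertools.groupby requires input sorted on the grouping key.
rows_sorted = sorted(rows, key=lambda r: r["fruit"])
groups = {
    fruit: [r["price"] for r in group]
    for fruit, group in groupby(rows_sorted, key=lambda r: r["fruit"])
}
print(groups)  # {'apple': [0.5, 0.5], 'banana': [0.25], 'orange': [0.33]}
```

A grouped dataset is typically a deferred object; aggregation functions applied to it (as in the aggregate example earlier) produce one row per group.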

join(ds, /, on=(), *, join_type='inner', left_suffix='left_', right_suffix='right_')

Description: Performs a join operation between two datasets.

Input parameters:

  • ds (Dataset): Right-side dataset to join with.
  • on (Union[str, Sequence[Union[str, BooleanColumn, Literal[True], Literal[False], tuple[str, str]]]]): Join condition (see the example for details).
  • join_type (JoinKind): Type of join, such as inner or left.
  • left_suffix (str): Suffix used to rename overlapping columns from the left table (e.g., "left_").
  • right_suffix (str): Suffix used to rename overlapping columns from the right table (e.g., "right_").

Return type: joined dataset.

Example:

>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
...     endpoint='*',
...     ak='*',
...     sk='*',
...     ...
... )
>>> ds1 = conn.load_dataset("demo_table_15", database='my_schema')
>>> ds2 = conn.load_dataset("demo_table_16", database='my_schema')

# Equality inner join on the shared id column. The id column appears only once in the result because it is the equality join key.
>>> ds1.join(ds2, "id", join_type="inner")
|   id   |  fruit  |  price  |     color     |
| ------ | ------- | ------- | ------------- |
| int64  | string  | float64 | string        |
| ------ | ------- | ------- | ------------- |
|      2 |   apple |    0.25 | Yellow        |
|      3 |  banana |    0.33 | Orange        |

# Explicit equality join using the default join_type of "inner". The id column appears only once in the result because it is part of the equality join condition.
>>> ds1.join(ds2, ds1.id == ds2.id)
|   id   |  fruit  |  price  |     color     |
|--------|---------|---------|---------------|
| int64  | string  | float64 | string        |
|--------|---------|---------|---------------|
|      2 |   apple |    0.25 | Yellow        |
|      3 |  banana |    0.33 | Orange        |

map(fn, on, as_col=None, num_dpus=None, num_apus=None, apu_model=None, concurrency=None, **fn_constructor_kwargs)

Description: Applies a given function to each row of the current dataset.

Input parameters:

  • fn (Union[str, Callable]): A callable function or its name, for example, myschema.myfunction.
  • on (Union[str, list[str]]): Column(s) to which the function is applied.
  • as_col (Union[str, list[str], None]): Name(s) of the new output column(s). Its length must match the number of fields returned by fn.
  • num_dpus(Union[float, None]): Number of data processing units (DPUs), with a fixed CPU-to-memory ratio of 1:4.
  • num_apus(Union[float, None]): Number of AI processing units (APUs), applicable for NPU/GPU configurations.
  • apu_model(Union[str, None]): Model of the AI processing unit (e.g., NPU/910B, GPU/A100).
  • concurrency(Union[int, Tuple[int, int], None]): Concurrency level during function execution.
  • fn_constructor_kwargs: Constructor parameters, only applicable if fn is a callable class.

Return type: dataset. Returns the dataset with a new column (as_col) containing the results of the applied function.

Example:

>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
...     endpoint='*',
...     ak='*',
...     sk='*',
...     ...
... )
>>> ds = conn.load_dataset("demo_table", database='my_schema')
>>> ds = ds.map(
...     fn='my_schema.multiply_2',
...     on=['col_a'],
...     as_col='col_c'
... )
>>> ds
| col_a | col_b  | col_c |
|-------|--------|-------|
| int64 | string | int64 |
|-------|--------|-------|
| 1     | c      | 2     |
| 1     | a      | 2     |
| 2     | a      | 4     |

order_by(key, descending=False)

Description: Sorts a dataset based on one or more expressions.

Input parameters:

  • key (Union[str, List[str], None]): Expression used for sorting.
  • descending (Union[bool, List[bool]]): Whether to sort in descending order. The default value is False.

Return type: sorted dataset.

Example:

>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
...     endpoint='*',
...     ak='*',
...     sk='*',
...     ...
... )
>>> ds = conn.load_dataset("demo_table_15", database='my_schema')
>>> ds
| a     | b      | c     |
|-------|--------|-------|
| 3     | a      | 4     |
| 2     | B      | 6     |
| 1     | c      | 5     |
| 3     | D      | 7     |

# Sort by b. Default is ascending. Note how capital letters come before lowercase
>>> ds.order_by("b")
| a     | b      | c     |
|-------|--------|-------|
| int64 | string | int64 |
|-------|--------|-------|
| 2     | B      | 6     |
| 3     | D      | 7     |
| 3     | a      | 4     |
| 1     | c      | 5     |

# Sort in descending order
>>> ds.order_by(ds.b.desc())
| a     | b      | c     |
|-------|--------|-------|
| int64 | string | int64 |
|-------|--------|-------|
| 1     | c      | 5     |
| 3     | a      | 4     |
| 3     | D      | 7     |
| 2     | B      | 6     |

# Sort by multiple columns/expressions
>>> ds.order_by(["a", ds.c.desc()])
| a     | b      | c     |
|-------|--------|-------|
| int64 | string | int64 |
|-------|--------|-------|
| 1     | c      | 5     |
| 2     | B      | 6     |
| 3     | D      | 7     |
| 3     | a      | 4     |

rename_columns(names, /, **substitutions)

Description: Renames columns in a dataset.

Input parameters:

  • names(Mapping[str, str]): A mapping (dictionary) specifying renaming rules as {old-name: new-name}, as in the example below. Columns not included retain their original names.
  • substitutions(str): Explicit column renamings specified as keyword arguments in the format new-name=old-name.

Return type: a new dataset with renamed columns.

Example:

>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
...     endpoint='*',
...     ak='*',
...     sk='*',
...     ...
... )
>>> ds = conn.load_dataset("demo_table_8", database='my_schema')
>>> ds
| a     | b      |
|-------|--------|
| int64 | string |
|-------|--------|
| 1     | c      |
| 1     | a      |
| 2     | a      |
>>> ds.rename_columns({"a": "rename_col"})
| rename_col | b      |
|------------|--------|
| int64      | string |
|------------|--------|
| 1          | c      |
| 1          | a      |
| 2          | a      |

schema()

Description: Returns the schema of a dataset.

Input parameters: None.

Return type: schema object.

Example:

>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
...     endpoint='*',
...     ak='*',
...     sk='*',
...     ...
... )
>>> ds = conn.load_dataset("demo_table_7", database='my_schema')
>>> ds.schema()
ibis.Schema {
  species            string
  island             string
  bill_length_mm     float64
  bill_depth_mm      float64
  flipper_length_mm  int64
}

select_columns(*exprs, **named_exprs)

Description: Selects specified columns from a dataset.

Input parameters:

  • exprs(Union[Value, str, Iterable[Union[Value, str]]]): Column expressions, column name strings, or a list of these.
  • named_exprs(Union[Value, str]): Column expressions using named parameters.

Return type: a dataset containing the selected columns.

Example:

>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
...     endpoint='*',
...     ak='*',
...     sk='*',
...     ...
... )
>>> ds = conn.load_dataset("demo_table_5", database='my_schema')
>>> ds.select_columns("species", "bill_length_mm")
| species    | bill_length_mm |
|------------|----------------|
| string     | float64        |
|------------|----------------|
| Adelie     | 40.0           |
| Adelie     | 41.0           |
| Adelie     | 42.0           |
| Adelie     | 43.0           |

# Selection by zero-indexed column position
>>> ds.select_columns(ds[0], ds[4])
| species    | flipper_length_mm |
|------------|-------------------|
| string     | int64             |
|------------|-------------------|
| Adelie     | 180               |
| Adelie     | 183               |
| Adelie     | 190               |
| Adelie     | 185               |

show(limit=10)

Description: Displays the data generated by the query results.

Input parameters:

limit(int): Limits the number of rows to display. Defaults to 10.

Return type: None.
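A minimal plain-Python sketch of the documented show(limit) behavior — print at most limit rows and return None. The sample rows are illustrative; the real method renders the dataset's own rows:

```python
# Illustrative rows; the real method renders the dataset's own rows.
rows = [{"a": i, "b": chr(ord("a") + i)} for i in range(15)]

def show(rows, limit=10):
    """Print at most `limit` rows and return None, per the documented signature."""
    for row in rows[:limit]:
        print(row)

result = show(rows, limit=3)
print(result)  # None
```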

take(limit)

Description: Returns up to limit rows from the dataset, ideal for data inspection and previewing.

Input parameters:

  • limit (int): Maximum number of rows to return.

Return type: list[dict[str, Any]]: A list containing up to limit rows, with each row represented as a dictionary.

Example:

>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
...     endpoint='*',
...     ak='*',
...     sk='*',
...     ...
... )
>>> ds = conn.load_dataset("demo_table_9", database='my_schema')
>>> ds.take(3)
[{'a': 1}, {'a': 1}, {'a': 2}]

write_iceberg(target_name, database, create_if_not_exists=False, overwrite=False, is_external=False, schema=None)

Description: Writes data to a specified Iceberg table.

Input parameters:

  • target_name (str): Name of the target table.
  • database (str): Database the table belongs to.
  • create_if_not_exists (bool): If True, creates the table if it does not already exist.
  • overwrite (bool): If True, replaces existing data in the table.
  • is_external (bool): If True, treats the table as an external table.
  • schema (optional): Schema definition for the target table.

Return type: None.
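The interaction of the create_if_not_exists and overwrite flags described above can be sketched as plain decision logic. This is a hedged reading of the documented semantics: the error case and the non-overwrite behavior are assumptions, not documented guarantees:

```python
def plan_write(table_exists, create_if_not_exists=False, overwrite=False):
    """Map the documented write_* flags to an action (illustrative, not the real implementation)."""
    if not table_exists:
        if create_if_not_exists:
            return "create table, then write"
        return "error: table does not exist"  # assumed failure mode
    if overwrite:
        return "replace existing data"
    return "write without replacing existing data"  # assumed non-overwrite behavior

print(plan_write(table_exists=False, create_if_not_exists=True))
# create table, then write
```

The same flag semantics apply to write_orc and write_parquet below.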

write_orc(target_name, database, create_if_not_exists=False, overwrite=False, is_external=False, schema=None)

Description: Writes data to a specified ORC table.

Input parameters:

  • target_name (str): Name of the target table.
  • database (str): Database the table belongs to.
  • create_if_not_exists (bool): If True, creates the table if it does not already exist.
  • overwrite (bool): If True, replaces existing data in the table.
  • is_external (bool): If True, treats the table as an external table.
  • schema (optional): Schema definition for the target table.

Return type: None.

write_parquet(target_name, database, create_if_not_exists=False, overwrite=False, is_external=False, schema=None)

Description: Writes data to a specified Parquet table.

Input parameters:

  • target_name (str): Name of the target table.
  • database (str): Database the table belongs to.
  • create_if_not_exists (bool): If True, creates the table if it does not already exist.
  • overwrite (bool): If True, replaces existing data in the table.
  • is_external (bool): If True, treats the table as an external table.
  • schema (optional): Schema definition for the target table.

Return type: None.

max(on=None)

Description: Calculates the maximum value of specified columns in a dataset.

Input parameters:

on (Union[str, list[str], None]): Column(s) to calculate the maximum for.

Return type: dataset.

min(on=None)

Description: Calculates the minimum value of specified columns in a dataset.

Input parameters:

on (Union[str, list[str], None]): Column(s) to calculate the minimum for.

Return type: dataset.

mean(on=None)

Description: Calculates the mean value of specified columns in a dataset.

Input parameters:

on (Union[str, list[str], None]): Column(s) to calculate the mean for.

Return type: dataset.

unique(col)

Description: Calculates a set of unique values from a specified column in a dataset.

Input parameters:

col (str): Column to extract unique values from.

Return type: list.
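The column aggregations above (max, min, mean, unique) can be sketched over plain list-of-dict rows. The data is illustrative, and plain Python values stand in for the dataset/list results the real methods compute server-side:

```python
# Illustrative rows, reusing the fruit/price table from earlier examples.
rows = [
    {"fruit": "apple", "price": 0.50},
    {"fruit": "apple", "price": 0.50},
    {"fruit": "banana", "price": 0.25},
    {"fruit": "orange", "price": 0.33},
]

prices = [r["price"] for r in rows]
col_max = max(prices)                         # max(on="price")
col_min = min(prices)                         # min(on="price")
col_mean = sum(prices) / len(prices)          # mean(on="price"), approximately 0.395
uniques = sorted({r["fruit"] for r in rows})  # unique("fruit")
print(col_max, col_min, col_mean, uniques)
```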

limit(n, /, *, offset=0)

Description: Selects rows from the current dataset starting at a given offset.

Input parameters:

  • n (Union[int, None]): Number of rows to include. If None, selects all rows from the offset.
  • offset(int): Number of initial rows to skip.

Return type:

Dataset: N rows starting from the offset.

Example:

>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
...     endpoint='*',
...     ak='*',
...     sk='*',
...     ...
... )
>>> ds = conn.load_dataset("demo_table_4", database='my_schema')
>>> ds
| a     | b      |
|-------|--------|
| int64 | string |
|-------|--------|
| 1     | c      |
| 1     | a      |
| 2     | a      |
>>> ds.limit(2)
| a     | b      |
|-------|--------|
| int64 | string |
|-------|--------|
| 1     | c      |
| 1     | a      |
# Use None with offset to slice starting from a particular row
>>> ds.limit(None, offset=1)
| a      | b      |
|--------|--------|
| int64  | string |
|--------|--------|
| 1      | a      |
| 2      | a      |

take_batch(batch_size=20, *, batch_format='default')

Description: Returns up to batch_size rows from a dataset.

Input parameters:

  • batch_size (int): Number of rows in a batch (default is 20).
  • batch_format (Optional[str]): Batch format.
    • default: Returns a pyarrow.Table.
    • pandas: Returns a pandas.DataFrame.
    • numpy: Returns a dictionary of NumPy arrays.

Return type: a batch of up to batch_size rows in the specified format.
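The row-to-batch pivot behind take_batch can be sketched in plain Python. The column-oriented dict below mimics the "numpy" layout with plain lists standing in for arrays; the real method returns pyarrow, pandas, or NumPy objects, and the sample rows are illustrative:

```python
# Illustrative rows; the real method draws from the dataset.
rows = [{"a": i, "b": i * 10} for i in range(1, 31)]

def take_batch(rows, batch_size=20):
    """Take up to batch_size rows and pivot them into a column-oriented dict."""
    batch = rows[:batch_size]
    return {key: [row[key] for row in batch] for key in batch[0]}

batch = take_batch(rows, batch_size=3)
print(batch)  # {'a': [1, 2, 3], 'b': [10, 20, 30]}
```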