Dataset APIs
columns
Description: Retrieves a collection of all column names in a dataset.
Input parameters: None.
Return type: tuple[str, ...].
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds = conn.load_dataset("demo_table_10", database='my_schema')
>>> ds.columns
('species', 'island', 'bill_length_mm', 'bill_depth_mm', 'flipper_length_mm')
add_column(*exprs, **mutations)
Description: Adds columns to a dataset.
Input parameters:
- exprs (Union[Sequence[Expr], None]): A sequence of named expressions to be added as columns.
- mutations (Value): Named expressions passed as keyword arguments.
Return type: Updated dataset with new columns.
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds = conn.load_dataset("demo_table_6", database='my_schema')
>>> ds
| species | bill_length_mm |
| ------- | -------------- |
| string | float64 |
| ------- | -------------- |
| Adelie | 40.0 |
| Adelie | 41.0 |
| Adelie | 42.0 |
| Adelie | 43.0 |
# Add a new column from a per-element expression
>>> ds.add_column(new_col=ds.bill_length_mm + 1)
| species | bill_length_mm | new_col |
| ------- | -------------- | -------- |
| string | float64 | float64 |
| ------- | -------------- | -------- |
| Adelie | 40.0 | 41.0 |
| Adelie | 41.0 | 42.0 |
| Adelie | 42.0 | 43.0 |
| Adelie | 43.0 | 44.0 |
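For readers coming from pandas, the per-element expression above corresponds to a vectorized column assignment; a minimal sketch, where the DataFrame is a stand-in for the dataset rather than the library's API:

```python
import pandas as pd

# Stand-in for the dataset shown above.
df = pd.DataFrame({"species": ["Adelie"] * 4,
                   "bill_length_mm": [40.0, 41.0, 42.0, 43.0]})

# Analogue of ds.add_column(new_col=ds.bill_length_mm + 1):
# a named expression evaluated per element.
df = df.assign(new_col=df["bill_length_mm"] + 1)
```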
drop_columns(*fields)
Description: Removes columns from a dataset.
Input parameters:
fields (str): Names of the columns to remove.
Return type: Updated dataset after removing the columns.
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds = conn.load_dataset("demo_table_12", database='my_schema')
>>> ds
| species | island | bill_length_mm | bill_depth_mm | flipper_length_mm |
| ------- | --------- | -------------- | ------------- | ----------------- |
| string | string | float64 | float64 | int64 |
| ------- | --------- | -------------- | ------------- | ----------------- |
| Adelie | Torgersen | 40.0 | 22.0 | 180 |
| Adelie | Torgersen | 41.0 | 19.0 | 183 |
| Adelie | Torgersen | 42.0 | 20.0 | 190 |
| Adelie | Torgersen | 43.0 | 23.0 | 185 |
# Drop one or more columns
>>> ds.drop_columns("species")
| island | bill_length_mm | bill_depth_mm | flipper_length_mm |
| --------- | -------------- | ------------- | ----------------- |
| string | float64 | float64 | int64 |
| --------- | -------------- | ------------- | ----------------- |
| Torgersen | 40.0 | 22.0 | 180 |
| Torgersen | 41.0 | 19.0 | 183 |
| Torgersen | 42.0 | 20.0 | 190 |
| Torgersen | 43.0 | 23.0 | 185 |
aggregate(metrics=(), /, *, by=(), having=(), **kwargs)
Description: Aggregates the dataset using specified grouping fields and aggregation functions.
Input parameters:
- metrics (Union[Sequence[AggregateFnBuilder], None]): Aggregation expressions; any scalar-producing expressions, including aggregate functions such as sum().
- by (Union[Sequence[Value], None]): Grouping expressions, typically set as a collection of dataset column names.
- having (Union[Sequence[BooleanValue], None]): Post-aggregation filter. Its data dimensions must match those of metrics, and its output type must be boolean.
- kwargs: Named aggregation expressions.
Return type: aggregated dataset.
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds = conn.load_dataset("demo_table_13", database='my_schema')
>>> ds
| fruit | price |
| ------ | ----- |
| apple | 0.50 |
| apple | 0.50 |
| banana | 0.25 |
| orange | 0.33 |
>>> ds.aggregate(
... by=["fruit"],
... total_cost=ds.price.sum(),
... avg_cost=ds.price.mean(),
... having=ds.price.sum() < 0.5,
... ).order_by("fruit")
| fruit | total_cost | avg_cost |
| ------ | ---------- | -------- |
| string | float64 | float64 |
| ------ | ---------- | -------- |
| banana | 0.25 | 0.25 |
| orange | 0.33 | 0.33 |
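The aggregate call above corresponds roughly to a pandas groupby/agg followed by a post-aggregation (having) filter; a sketch under that analogy, not the library's implementation:

```python
import pandas as pd

df = pd.DataFrame({"fruit": ["apple", "apple", "banana", "orange"],
                   "price": [0.50, 0.50, 0.25, 0.33]})

# by=["fruit"], total_cost=sum of price, avg_cost=mean of price
agg = (df.groupby("fruit")["price"]
         .agg(total_cost="sum", avg_cost="mean")
         .reset_index())

# having=ds.price.sum() < 0.5 filters groups after aggregation,
# dropping apple (total 1.0).
result = agg[agg["total_cost"] < 0.5].sort_values("fruit")
```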
count()
Description: Calculates the total number of rows in a dataset.
Input parameters: None.
Return type: int.
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds = conn.load_dataset("demo_table_11", database='my_schema')
>>> ds
| a      |
| ------ |
| string |
| ------ |
| foo    |
| bar    |
| baz    |
>>> ds.count()
┌───┐
│ 3 │
└───┘
execute(*, limit='default', params=None, **kwargs)
Description: Executes expressions.
Input parameters:
- limit(Union[int, str, None]): Limits the total number of rows returned by the expression execution.
- params(Union[Mapping[Value, Any], None]): A mapping of scalar parameter expressions to their values.
- kwargs: Keyword arguments.
Return type: Union[pandas.DataFrame, pandas.Series, Any].
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds = conn.load_dataset("demo_table_8", database='my_schema')
>>> ds.execute()
a b
0 1 c
1 1 a
2 2 a
[3 rows x 2 columns]
filter(expr=None, fn=None, on=None, num_dpus=None, num_apus=None, apu_model=None, concurrency=None, **fn_constructor_kwargs)
Description: Filters out rows that do not meet specified conditions.
Input parameters:
- expr(Union[list[ir.BooleanValue], Sequence[ir.BooleanValue], IfAnyAll, None]): One or more boolean expressions; rows for which they evaluate to False are removed.
- on(Union[str, list[str]]): Column(s) to apply the function to.
- fn(Union[str, Callable, None]): A callable function or its name.
- num_dpus(Union[float, None]): Number of data processing units (DPUs), with a fixed CPU-to-memory ratio of 1:4.
- num_apus(Union[float, None]): Number of AI processing units (APUs), applicable for NPU/GPU configurations.
- apu_model(Union[str, None]): Model of the AI processing unit (e.g., NPU/910B, GPU/A100).
- concurrency(Union[int, Tuple[int, int], None]): Concurrency level during function execution.
- fn_constructor_kwargs: Constructor parameters, only applicable if fn is a callable class.
Return type: filtered dataset after removing non-compliant rows.
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds = conn.load_dataset("demo_table_3", database='my_schema')
>>> ds = ds.filter(expr=[ds.a < 2])
>>> ds
| a     | b      |
| ----- | ------ |
| int64 | string |
| ----- | ------ |
| 1     | c      |
| 1     | a      |
# Filter using a user-defined function
# my_schema.id_filter: takes an int value and returns False if the value is greater than 2.
>>> ds = conn.load_dataset("demo_table_2", database='my_schema')
>>> ds = ds.filter(
... fn='my_schema.id_filter',
... on='a',
... )
>>> ds
| a     | b      |
| ----- | ------ |
| int64 | string |
| ----- | ------ |
| 1     | c      |
| 1     | a      |
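The UDF filter keeps only rows for which the function returns True; a plain-Python sketch with a hypothetical stand-in for my_schema.id_filter:

```python
def id_filter(value: int) -> bool:
    # Hypothetical stand-in: returns False when the value is greater than 2.
    return value <= 2

rows = [{"a": 1, "b": "c"}, {"a": 1, "b": "a"}, {"a": 3, "b": "x"}]
# filter(fn=..., on='a') applies the predicate to column "a" of each row.
kept = [row for row in rows if id_filter(row["a"])]
```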
flat_map(fn, on, as_col=None, num_dpus=None, num_apus=None, apu_model=None, concurrency=None, **fn_constructor_kwargs)
Description: Applies a given function to each row of the dataset and flattens the results.
Input parameters:
- fn (Union[str, Callable]): A callable function or its name, for example, myschema.myfunction.
- on (Union[str, list[str]]): Column(s) to which the function is applied.
- as_col (Union[str, list[str], None]): Name(s) of the new output column(s). Its length must match the number of fields returned by fn.
- num_dpus(Union[float, None]): Number of data processing units (DPUs), with a fixed CPU-to-memory ratio of 1:4.
- num_apus(Union[float, None]): Number of AI processing units (APUs), applicable for NPU/GPU configurations.
- apu_model(Union[str, None]): Model of the AI processing unit (e.g., NPU/910B, GPU/A100).
- concurrency(Union[int, Tuple[int, int], None]): Concurrency level during function execution.
- fn_constructor_kwargs: Constructor parameters, only applicable if fn is a callable class.
Return type:
Dataset: A dataset containing the new column(s) named as_col.
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds = conn.load_dataset("demo_table_2", database='my_schema')
>>> ds.execute()
a b c
0 1 4 1
[1 rows x 3 columns]
>>> ds = ds.flat_map(
... fn='my_schema.py_generate_series',
... on=['a', 'b', 'c'],
... as_col=['outCol'])
>>> ds
| a | b | c | outCol |
| ----- | ----- | ----- | ------ |
| int64 | int64 | int64 | int64 |
| ----- | ----- | ----- | ------ |
| 1 | 4 | 1 | 1 |
| 1 | 4 | 1 | 2 |
| 1 | 4 | 1 | 3 |
| 1 | 4 | 1 | 4 |
# Access table columns using the ds.attribute (dot notation) style
>>> ds = ds.flat_map(
... fn='my_schema.py_generate_series',
... on=[ds.a, ds.b, ds.c],
... as_col='udtf_col')
>>> ds
| a | b | c | udtf_col |
| ----- | ----- | ----- | -------- |
| int64 | int64 | int64 | int64 |
| ----- | ----- | ----- | -------- |
| 1 | 4 | 1 | 1 |
| 1 | 4 | 1 | 2 |
| 1 | 4 | 1 | 3 |
| 1 | 4 | 1 | 4 |
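flat_map can emit several output rows per input row; the sketch below mimics the example above with a hypothetical stand-in for my_schema.py_generate_series (the real UDTF's semantics are assumed here, not documented):

```python
def generate_series(start: int, stop: int, step: int):
    # Hypothetical stand-in: yields start..stop inclusive, stepping by step.
    value = start
    while value <= stop:
        yield value
        value += step

row = {"a": 1, "b": 4, "c": 1}
# Each yielded value becomes one output row, with the input columns repeated.
flattened = [dict(row, outCol=v)
             for v in generate_series(row["a"], row["b"], row["c"])]
```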
map_batchs(fn, on, as_col=None, num_dpus=None, num_apus=None, apu_model=None, concurrency=None, **fn_constructor_kwargs)
Description: Applies a given function to batches of rows rather than to individual rows.
Input parameters:
- fn (Union[str, Callable]): A callable function or its name, for example, myschema.myfunction.
- on (Union[str, list[str]]): Column(s) to which the function is applied.
- as_col (Union[str, list[str], None]): Name(s) of the new output column(s). Its length must match the number of fields returned by fn.
- num_dpus(Union[float, None]): Number of data processing units (DPUs), with a fixed CPU-to-memory ratio of 1:4.
- num_apus(Union[float, None]): Number of AI processing units (APUs), applicable for NPU/GPU configurations.
- apu_model(Union[str, None]): Model of the AI processing unit (e.g., NPU/910B, GPU/A100).
- concurrency(Union[int, Tuple[int, int], None]): Concurrency level during function execution.
- fn_constructor_kwargs: Constructor parameters, only applicable if fn is a callable class.
Return type:
Dataset: A dataset containing the new column(s) named as_col.
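Unlike map, which is row-wise, this method hands fn a whole batch at a time; a plain-Python sketch of the batch-wise contract (all names hypothetical):

```python
from typing import Callable, Iterator

def iter_batches(rows: list, batch_size: int) -> Iterator[list]:
    # Split the input into consecutive batches of at most batch_size rows.
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]

def map_batches(rows: list, fn: Callable[[list], list],
                batch_size: int = 2) -> list:
    # Apply fn once per batch and concatenate the per-batch results.
    out = []
    for batch in iter_batches(rows, batch_size):
        out.extend(fn(batch))
    return out

doubled = map_batches([1, 2, 3, 4, 5], lambda batch: [x * 2 for x in batch])
```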
groupby(*by, **key_exprs)
Description: Creates a grouped dataset.
Input parameters:
- by (Union[str, Iterable[str], None]): Grouping expression.
- key_exprs (Union[str, Iterable[str]]): Named grouping expressions.
Return type: a grouped dataset.
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds = conn.load_dataset("demo_table_14", database='my_schema')
>>> ds
| fruit | price |
| ------ | ----- |
| string | float |
| ------ | ----- |
| apple | 0.50 |
| apple | 0.50 |
| banana | 0.25 |
| orange | 0.33 |
# Group the rows by fruit; the grouped dataset can then be aggregated
>>> grouped = ds.groupby("fruit")
join(ds, /, on=(), *, join_type='inner', left_suffix='left_', right_suffix='right_')
Description: Performs a join operation between two datasets.
Input parameters:
- ds (Dataset): Right-side dataset to join with.
- on (Union[str, Sequence[Union[str, BooleanColumn, Literal[True], Literal[False], tuple[str, str]]]]): Join condition (see the example for details).
- join_type (JoinKind): Type of join, such as inner or left.
- left_suffix (str): Suffix used to rename overlapping columns from the left table (e.g., "left_").
- right_suffix (str): Suffix used to rename overlapping columns from the right table (e.g., "right_").
Return type: joined dataset.
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds1 = conn.load_dataset("demo_table_15", database='my_schema')
>>> ds2 = conn.load_dataset("demo_table_16", database='my_schema')
# Equality inner join on the shared id column. Note the right_ suffix added to overlapping
# columns from the right table (in this case only the "id" column).
>>> ds1.join(ds2, "id", join_type="inner")
| id | fruit | price | color |
| ------ | ------- | ------- | ------------- |
| int64 | string | float64 | string |
| ------ | ------- | ------- | ------------- |
| 2 | apple | 0.25 | Yellow |
| 3 | banana | 0.33 | Orange |
# Explicit equality join using the default join_type value of "inner". No suffix is added to the
# id column since this is an inner join and the id column is part of the join condition.
>>> ds1.join(ds2, ds1.id == ds2.id)
| id | fruit | price | color |
|--------|---------|---------|---------------|
| int64 | string | float64 | string |
|--------|---------|---------|---------------|
| 2 | apple | 0.25 | Yellow |
| 3 | banana | 0.33 | Orange |
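The two joins above behave like a pandas inner merge on the key column; a sketch with hypothetical stand-ins for demo_table_15 and demo_table_16:

```python
import pandas as pd

left = pd.DataFrame({"id": [1, 2, 3],
                     "fruit": ["pear", "apple", "banana"],
                     "price": [0.10, 0.25, 0.33]})
right = pd.DataFrame({"id": [2, 3, 4],
                      "color": ["Yellow", "Orange", "Red"]})

# Inner join keeps only ids present in both tables (2 and 3).
joined = left.merge(right, on="id", how="inner")
```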
map(fn, on, as_col=None, num_dpus=None, num_apus=None, apu_model=None, concurrency=None, **fn_constructor_kwargs)
Description: Applies a given function to each row of the current dataset.
Input parameters:
- fn (Union[str, Callable]): A callable function or its name, for example, myschema.myfunction.
- on (Union[str, list[str]]): Column(s) to which the function is applied.
- as_col (Union[str, list[str], None]): Name(s) of the new output column(s). Its length must match the number of fields returned by fn.
- num_dpus(Union[float, None]): Number of data processing units (DPUs), with a fixed CPU-to-memory ratio of 1:4.
- num_apus(Union[float, None]): Number of AI processing units (APUs), applicable for NPU/GPU configurations.
- apu_model(Union[str, None]): Model of the AI processing unit (e.g., NPU/910B, GPU/A100).
- concurrency(Union[int, Tuple[int, int], None]): Concurrency level during function execution.
- fn_constructor_kwargs: Constructor parameters, only applicable if fn is a callable class.
Return type: dataset. Returns the dataset with a new column (as_col) containing the results of the applied function.
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds = conn.load_dataset("demo_table", database='my_schema')
>>> ds = ds.map(
... fn='my_schema.multiply_2',
... on=['col_a'],
... as_col='col_c'
... )
>>> ds
| col_a | col_b | col_c |
|-------|-------|-------|
| int64 | string | int64 |
|-------|-------|-------|
| 1 | c | 2 |
| 1 | a | 2 |
| 2 | a | 4 |
order_by(key, descending=False)
Description: Sorts a dataset based on one or more expressions.
Input parameters:
- key (Union[str, List[str], None]): Expression used for sorting.
- descending (Union[bool, List[bool]]): Whether to sort in descending order. The default value is False.
Return type: sorted dataset.
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds = conn.load_dataset("demo_table_15", database='my_schema')
>>> ds
| a | b | c |
|-------|--------|-------|
| 3 | a | 4 |
| 2 | B | 6 |
| 1 | c | 5 |
| 3 | D | 7 |
# Sort by b. Default is ascending. Note how capital letters come before lowercase.
>>> ds.order_by("b")
| a | b | c |
|-------|--------|-------|
| int64 | string | int64 |
|-------|--------|-------|
| 2 | B | 6 |
| 3 | D | 7 |
| 3 | a | 4 |
| 1 | c | 5 |
# Sort in descending order
>>> ds.order_by(ds.b.desc())
| a | b | c |
|-------|--------|-------|
| int64 | string | int64 |
|-------|--------|-------|
| 1 | c | 5 |
| 3 | a | 4 |
| 3 | D | 7 |
| 2 | B | 6 |
# Sort by multiple columns/expressions
>>> ds.order_by(["a", ds.c.desc()])
| a | b | c |
|-------|--------|-------|
| int64 | string | int64 |
|-------|--------|-------|
| 1 | c | 5 |
| 2 | B | 6 |
| 3 | D | 7 |
| 3 | a | 4 |
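The ordering in the examples follows byte-wise string comparison (uppercase before lowercase) and supports mixed sort directions; the same behavior in plain Python:

```python
rows = [{"a": 3, "b": "a", "c": 4}, {"a": 2, "b": "B", "c": 6},
        {"a": 1, "b": "c", "c": 5}, {"a": 3, "b": "D", "c": 7}]

# Ascending sort on b: "B" < "D" < "a" < "c" in code-point order.
by_b = sorted(rows, key=lambda r: r["b"])

# order_by(["a", ds.c.desc()]): ascending a, then descending c.
multi = sorted(rows, key=lambda r: (r["a"], -r["c"]))
```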
rename_columns(names, /, **substitutions)
Description: Renames columns in a dataset.
Input parameters:
- names(Mapping[str, str]): A mapping (dictionary) specifying renaming rules as {new-name: old-name}. Columns not included retain their original names.
- substitutions(str): Explicit column renamings specified as keyword arguments in the format new-name=old-name.
Return type: a new dataset with renamed columns.
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds = conn.load_dataset("demo_table_8", database='my_schema')
>>> ds
| a | b |
|-------|--------|
| int64 | string |
|-------|--------|
| 1 | c |
| 1 | a |
| 2 | a |
>>> ds.rename_columns({"rename_col": "a"})
| rename_col | b |
|------------|--------|
| int64 | string |
|------------|--------|
| 1 | c |
| 1 | a |
| 2 | a |
schema()
Description: Returns the schema of a dataset.
Input parameters: None.
Return type: schema object.
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds = conn.load_dataset("demo_table_7", database='my_schema')
>>> ds.schema()
ibis.Schema {
species string
island string
bill_length_mm float64
bill_depth_mm float64
flipper_length_mm int64
}
select_columns(*exprs, **named_exprs)
Description: Selects specified columns from a dataset.
Input parameters:
- exprs(Union[Value, str, Iterable[Union[Value, str]]]): Column expressions, column name strings, or a list of these.
- named_exprs(Union[Value, str]): Column expressions using named parameters.
Return type: a dataset containing the selected columns.
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds = conn.load_dataset("demo_table_5", database='my_schema')
>>> ds.select_columns("species", "bill_length_mm")
| species | bill_length_mm |
|------------|----------------|
| string | float64 |
|------------|----------------|
| Adelie | 40.0 |
| Adelie | 41.0 |
| Adelie | 42.0 |
| Adelie | 43.0 |
# Selection by zero-indexed column position
>>> ds.select_columns(ds[0], ds[4])
| species | flipper_length_mm |
|------------|-------------------|
| string | int64 |
|------------|-------------------|
| Adelie | 180 |
| Adelie | 183 |
| Adelie | 190 |
| Adelie | 185 |
show(limit=10)
Description: Displays the rows produced by the query result.
Input parameters:
limit(int): Limits the number of rows to display. Defaults to 10.
Return type: None.
take(limit)
Description: Returns up to limit rows from the dataset, ideal for data inspection and previewing.
Input parameters:
- limit (int): Maximum number of rows to return.
Return type: list[dict[str, Any]]. A list containing up to limit rows, with each row represented as a dictionary.
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds = conn.load_dataset("demo_table_9", database='my_schema')
>>> ds.take(3)
[{'a': 1}, {'a': 1}, {'a': 2}]
write_iceberg(target_name, database, create_if_not_exists=False, overwrite=False, is_external=False, schema=None)
Description: Writes data to a specified Iceberg table.
Input parameters:
- target_name (str): Name of the target table.
- database (str): Database the table belongs to.
- create_if_not_exists (bool): If True, creates the table if it does not already exist.
- overwrite (bool): If True, replaces existing data in the table.
- is_external (bool): If True, treats the table as an external table.
- schema (optional, any type): Optional parameter defining the table's schema.
Return type: None.
write_orc(target_name, database, create_if_not_exists=False, overwrite=False, is_external=False, schema=None)
Description: Writes data to a specified ORC table.
Input parameters:
- target_name (str): Name of the target table.
- database (str): Database the table belongs to.
- create_if_not_exists (bool): If True, creates the table if it does not already exist.
- overwrite (bool): If True, replaces existing data in the table.
- is_external (bool): If True, treats the table as an external table.
- schema (optional, any type): Optional parameter defining the table's schema.
Return type: None.
write_parquet(target_name, database, create_if_not_exists=False, overwrite=False, is_external=False, schema=None)
Description: Writes data to a specified Parquet table.
Input parameters:
- target_name (str): Name of the target table.
- database (str): Database the table belongs to.
- create_if_not_exists (bool): If True, creates the table if it does not already exist.
- overwrite (bool): If True, replaces existing data in the table.
- is_external (bool): If True, treats the table as an external table.
- schema (optional, any type): Optional parameter defining the table's schema.
Return type: None.
max(on=None)
Description: Calculates the maximum value of specified columns in a dataset.
Input parameters:
on (Union[str, list[str], None]): Column(s) to calculate the maximum for.
Return type: dataset.
min(on=None)
Description: Calculates the minimum value of specified columns in a dataset.
Input parameters:
on (Union[str, list[str], None]): Column(s) to calculate the minimum for.
Return type: dataset.
mean(on=None)
Description: Calculates the mean value of specified columns in a dataset.
Input parameters:
on (Union[str, list[str], None]): Column(s) to calculate the mean for.
Return type: dataset.
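These three aggregations reduce the selected column(s) to a single value each; a pandas sketch using hypothetical data:

```python
import pandas as pd

# Stand-in for a dataset with numeric columns.
df = pd.DataFrame({"bill_length_mm": [40.0, 41.0, 42.0, 43.0],
                   "flipper_length_mm": [180, 183, 190, 185]})

# on can name one column or several.
col_max = df["bill_length_mm"].max()
col_min = df["bill_length_mm"].min()
col_means = df[["bill_length_mm", "flipper_length_mm"]].mean()
```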
unique(col)
Description: Calculates a set of unique values from a specified column in a dataset.
Input parameters:
col (str): Column to extract unique values from.
Return type: list.
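unique reduces one column to its distinct values; a pandas sketch (this sketch preserves first-seen order, which the real method may or may not guarantee):

```python
import pandas as pd

df = pd.DataFrame({"fruit": ["apple", "apple", "banana", "orange"]})
# Distinct values of a single column, returned as a list.
unique_fruits = df["fruit"].drop_duplicates().tolist()
```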
limit(n, /, *, offset=0)
Description: Selects rows from the current dataset starting at a given offset.
Input parameters:
- n (Union[int, None]): Number of rows to include. If None, selects all rows from the offset.
- offset(int): Number of initial rows to skip.
Return type:
Dataset: N rows starting from the offset.
Example:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*',
... sk='*',
... ...
... )
>>> ds = conn.load_dataset("demo_table_4", database='my_schema')
>>> ds
| a | b |
|-------|--------|
| int64 | string |
|-------|--------|
| 1 | c |
| 1 | a |
| 2 | a |
>>> ds.limit(2)
| a     | b      |
|-------|--------|
| int64 | string |
|-------|--------|
| 1     | c      |
| 1     | a      |
# Use None with offset to slice starting from a particular row
>>> ds.limit(None, offset=1)
| a | b |
|--------|--------|
| int64 | string |
|--------|--------|
| 1 | a |
| 2 | a |
take_batch(batch_size=20, *, batch_format='default')
Description: Returns up to batch_size rows from a dataset.
Input parameters:
- batch_size (int): Number of rows in a batch (default is 20).
- batch_format (Optional[str]): Batch format.
- default: Returns a pyarrow.Table.
- pandas: Returns a pandas.DataFrame.
- numpy: Returns a dictionary of NumPy arrays.
Return type: a batch in the specified format (pyarrow.Table, pandas.DataFrame, or a dictionary of NumPy arrays).
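A sketch of the batch_format dispatch using pandas and NumPy; the default branch is represented here by a plain dict, since the documented default return type (pyarrow.Table) belongs to the real API:

```python
import numpy as np
import pandas as pd

def take_batch(data: dict, batch_size: int = 20, batch_format: str = "default"):
    # Keep at most batch_size rows per column, then convert to the
    # requested format.
    trimmed = {col: vals[:batch_size] for col, vals in data.items()}
    if batch_format == "pandas":
        return pd.DataFrame(trimmed)
    if batch_format == "numpy":
        return {col: np.asarray(vals) for col, vals in trimmed.items()}
    return trimmed  # "default" is a pyarrow.Table in the documented API

batch = take_batch({"a": [1, 1, 2], "b": ["c", "a", "a"]},
                   batch_size=2, batch_format="numpy")
```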