Dataset APIs
columns
描述:获取Dataset的所有列的列名集合。
输入参数:无
返回值类型:tuple[str, ...]
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
...
...
... )
>>> ds = conn.load_dataset("demo_table_10", database='my_schema')
>>> ds.columns
('species',
'island',
'bill_length_mm',
'bill_depth_mm', 'flipper_length_mm')
add_column(*exprs, **mutations)
描述:给一个Dataset添加列。
输入参数:
- exprs (Union[Sequence[Expr], None]) :要添加为列的命名表达式序列。
- mutations(Value) :使用关键字参数的命名表达式。
返回值类型:已添加新列的Dataset。
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
... ...
... )
>>> ds = conn.load_dataset("demo_table_6", database='my_schema')
>>> ds
| species | bill_length_mm |
| ------- | -------------- |
| string | float64 |
| ------- | -------------- |
| Adelie | 40.0 |
| Adelie | 41.0 |
| Adelie | 42.0 |
| Adelie | 43.0 |
#Add a new column from a per-element expression
>>> ds.add_column(new_col=ds.bill_length_mm + 1)
| species | bill_length_mm | new_col |
| ------- | -------------- | -------- |
| string | float64 | float64 |
| ------- | -------------- | -------- |
| Adelie | 40.0 | 41.0 |
| Adelie | 41.0 | 42.0 |
| Adelie | 42.0 | 43.0 |
| Adelie | 43.0 | 44.0 |
drop_columns(*fields)
描述:删除Dataset的列。
输入参数:
fields(str) :被删除列的列名。
返回值类型:删除列后的Dataset。
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
... ...
... )
>>> ds = conn.load_dataset("demo_table_12", database='my_schema')
>>> ds
| species | island | bill_length_mm | bill_depth_mm | flipper_length_mm |
| ------- | --------- | -------------- | ------------- | ----------------- |
| string | string | float64 | float64 | int64 |
| ------- | --------- | -------------- | ------------- | ----------------- |
| Adelie | Torgersen | 40.0 | 22.0 | 180 |
| Adelie | Torgersen | 41.0 | 19.0 | 183 |
| Adelie | Torgersen | 42.0 | 20.0 | 190 |
| Adelie | Torgersen | 43.0 | 23.0 | 185 |
# Drop one or more columns
>>> ds.drop_columns("species")
| island | bill_length_mm | bill_depth_mm | flipper_length_mm |
| --------- | -------------- | ------------- | ----------------- |
| string | float64 | float64 | int64 |
| --------- | -------------- | ------------- | ----------------- |
| Torgersen | 40.0 | 22.0 | 180 |
| Torgersen | 41.0 | 19.0 | 183 |
| Torgersen | 42.0 | 20.0 | 190 |
| Torgersen | 43.0 | 23.0 | 185 |
aggregate(metrics=(), /, *, by=(), having=(), **kwargs)
描述:通过指定分组字段,用一组聚合函数对数据集进行聚合。
输入参数:
- metrics(Union[Sequence[AggregateFnBuilder], None] ) :聚合表达式,这类表达式可以是任何能生成标量值的表达式,其中包括聚合函数,例如sum。
- by(Union[Sequence[Value], None]) :分组表达式,一般设为Dataset的列名集合。
- having(Union[Sequence[BooleanValue], None]) :聚合后过滤器。其数据维度应该与metrics相同,且having(子句)的输出类型为布尔型。
- kwargs:命名的聚合表达式。
返回值类型:聚合后的Dataset。
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
... ...
... )
>>> ds = conn.load_dataset("demo_table_13", database='my_schema')
>>> ds
| fruit | price |
| ------ | ----- |
| apple | 0.50 |
| apple | 0.50 |
| banana | 0.25 |
| orange | 0.33 |
>>> ds.aggregate(
... by=["fruit"],
... total_cost=ds.price.sum(),
... avg_cost=ds.price.mean(),
... having=ds.price.sum() < 0.5,
... ).order_by("fruit")
| fruit | total_cost | avg_cost |
| ------ | ---------- | -------- |
| string | float64 | float64 |
| ------ | ---------- | -------- |
| banana | 0.25 | 0.25 |
| orange | 0.33 | 0.33 |
count()
描述:计算该Dataset的总行数。
输入参数:无
返回值类型:int
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
... ...
... )
>>> ds = conn.load_dataset("demo_table_11", database='my_schema')
>>> ds
| a |
| ----- |
| string|
| ----- |
| foo |
| bar |
| baz |
>>> ds.count()
┌─┐
│ 3│
└─┘
execute(*, limit='default', params=None, **kwargs)
描述:执行表达式。
输入参数:
- limit(Union[int, str, None]) :限定表达式执行结果的总行数。
- params(Union[Mapping[Value, Any], None]) :标量参数表达式到值的映射。
- kwargs:关键字参数
返回值类型:Union[pandas.DataFrame, pandas.Series, Any]
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
... ...
... )
>>> ds = conn.load_dataset("demo_table_8", database='my_schema')
>>> ds.execute()
a b
0 1 c
1 1 a
2 2 a
[3 rows x 2 columns]
filter(expr=None, fn=None, on=None, num_dpus=None, num_apus=None, apu_model=None, concurrency=None, **fn_constructor_kwargs)
描述:过滤掉不满足条件的行。
输入参数:
- expr(Union[list[ir.BooleanValue], Sequence[ir.BooleanValue], IfAnyAll, None]):表达式字符串,需为有效的Python表达式。
- on(Union[str, list[str]]) :用于应用函数的列。
- fn(Union[str, Callable, None]) :可调用的函数,或则函数名称。
- num_dpus(Union[float, None]):数据处理单元的数量,CPU与内存之间存在固定比例1:4。
- num_apus(Union[float, None]):AI处理单元的数量,这是针对NPU/GPU的配置选项。
- apu_model(Union[str, None]):AI处理单元型号,例如NPU/910B、GPU/A100等。
- concurrency(Union[int, Tuple[int, int], None]) :函数执行时的并发度。
- fn_constructor_kwargs:构造参数,只有当fn为可调用类时,才能提供此项。
返回值类型:过滤掉不满足条件的行后的Dataset。
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
... ...
... )
>>> ds = conn.load_dataset("demo_table_3", database='my_schema')
>>> ds = ds.filter(expr=[ds.a < 2])
>>> ds
| a | b |
| ----- | ----- |
| int64 | string|
| ----- | ----- |
| 1 | c |
| 1 | a |
# Filtered by User-defined functions
# my_schema.id_filter:inputting an int value, returning false if value is larger than 2.
>>> ds = conn.load_dataset("demo_table_2", database='my_schema')
>>> ds = ds.filter(
... fn='my_schema.id_filter',
... on='a',
... )
>>> ds
| a | b |
| ----- | ----- |
| int64 | string|
| ----- | ----- |
| 1 | c |
| 1 | a |
flat_map(fn, on, as_col=None, num_dpus=None, num_apus=None, apu_model=None, concurrency=None, **fn_constructor_kwargs)
描述:将给定的函数应用于每一行,并对结果进行展平。
输入参数:
- fn (Union[str, Callable]):可调用的函数或其名称,例如myschema.myfunction。
- on (Union[str, list[str], ]) :要应用函数的列或列的集合。
- as_col (Union[str, list[str], None]):输出新列的名称。as_col的长度必须与fn返回的结构体中字段数量一致。
- num_dpus(Union[float, None]):数据处理单元的数量,CPU与内存之间存在固定比例1:4。
- num_apus(Union[float, None]):AI处理单元的数量,这是针对NPU/GPU的配置选项。
- apu_model(Union[str, None]):AI处理单元型号,例如NPU/910B、GPU/A100等。
- concurrency(Union[int, Tuple[int, int], None]) :函数执行时的并发度。
- fn_constructor_kwargs:构造参数,只有当fn为可调用类时,才能提供此项。
返回值类型:
Dataset :调用函数后包含名为as_col的新列的数据集。
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
... ...
... )
>>> ds = conn.load_dataset("demo_table_2", database='my_schema')
>>> ds.execute()
a b c
0 1 4 1
[1 rows x 3 columns]
>>> ds = ds.flat_map(
... fn='my_schema.py_generate_series',
... on=['a', 'b', 'c'],
... as_col=['outCol'])
>>> ds
| a | b | c | outCol |
| ----- | ----- | ----- | ------ |
| int64 | int64 | int64 | int64 |
| ----- | ----- | ----- | ------ |
| 1 | 4 | 1 | 1 |
| 1 | 4 | 1 | 2 |
| 1 | 4 | 1 | 3 |
| 1 | 4 | 1 | 4 |
# Access table columns using the ds.attribute (dot notation) style
>>> ds = ds.flat_map(
... fn='my_schema.py_generate_series',
... on=[ds.a, ds.b, ds.c],
... as_col='udtf_col')
>>> ds
| a | b | c | udtf_col |
| ----- | ----- | ----- | -------- |
| int64 | int64 | int64 | int64 |
| ----- | ----- | ----- | -------- |
| 1 | 4 | 1 | 1 |
| 1 | 4 | 1 | 2 |
| 1 | 4 | 1 | 3 |
| 1 | 4 | 1 | 4 |
map_batchs(fn, on, as_col=None, num_dpus=None, num_apus=None, apu_model=None, concurrency=None, **fn_constructor_kwargs)
描述:将给定的函数应用于批量数据。
输入参数:
- fn (Union[str, Callable]) :可调用的函数或其名称,例如myschema.myfunction。
- on (Union[str, list[str], ]) :要应用函数的列或列的集合。
- as_col (Union[str, list[str], None]):输出新列的名称。as_col的长度必须与fn返回的结构体中字段数量一致。
- num_dpus(Union[float, None]) :数据处理单元的数量,CPU与内存之间存在固定比例1:4。
- num_apus(Union[float, None]):AI处理单元的数量,这是针对NPU/GPU的配置选项。
- apu_model(Union[str, None]):AI处理单元型号,例如NPU/910B、GPU/A100等。
- concurrency(Union[int, Tuple[int, int], None]):函数执行时的并发度。
- fn_constructor_kwargs:构造参数,只有当fn为可调用类时,才能提供此项。
返回值类型:
Dataset:调用函数后包含名为as_col的新列的数据集。
groupby(*by, **key_exprs)
描述:创建一个分组的Dataset。
输入参数:
- by (Union[str, Iterable[str], None]) :分组表达式。
- key_exprs (Union[str, Iterable[str]]):命名的分组表达式。
返回值类型:一个分组后的Dataset。
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
... ...
... )
>>> ds = conn.load_dataset("demo_table_14", database='my_schema')
>>> ds
| fruit | price |
| ------ | ----- |
| string | float |
| ------ | ----- |
| apple | 0.50 |
| apple | 0.50 |
| banana | 0.25 |
| orange | 0.33 |
join(ds, /, on=(), *, join_type='inner', left_suffix='left_', right_suffix='right_')
描述:在两个Dataset之间执行join操作。
输入参数:
- ds (Dataset):右侧要连接的数据集。
- on (Union[str, Sequence[Union[str, BooleanColumn, Literal[True], Literal[False], tuple[str, str]]]]) :连接条件,详见示例。
- join_type (JoinKind) :连接方法,如 "inner" 或 "left"。
- left_suffix (str) :用于重命名左侧表中重叠列的格式字符串后缀(例如 "left_")。
- right_suffix (str) :用于重命名右侧表中重叠列的格式字符串后缀(例如 "right_")。
返回值类型:join后的Dataset。
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
... ...
... )
>>> ds1 = conn.load_dataset("demo_table_15", database='my_schema')
>>> ds2 = conn.load_dataset("demo_table_16", database='my_schema')
# Equality inner join on the shared id column. Note the _right suffix added to all overlapping columns from the right # table (in this case only the "id" column).
>>> ds1.join(ds2, "id", join_type="inner")
| id | fruit | price | color |
| ------ | ------- | ------- | ------------- |
| int64 | string | float64 | string |
| ------ | ------- | ------- | ------------- |
| 2 | apple | 0.25 | Yellow |
| 3 | banana | 0.33 | Orange |
# Explicit equality join using the default join_type value of "inner". Note join_type there is no _right suffix added # to the movieId column since this is an inner join and the movieId column is part of the join condition.
>>> ds1.join(ds2, ds1.id == ds2.id)
| id | fruit | price | color |
|--------|---------|---------|---------------|
| int64 | string | float64 | string |
|--------|---------|---------|---------------|
| 2 | apple | 0.25 | Yellow |
| 3 | banana | 0.33 | Orange |
map(fn, on, as_col=None, num_dpus=None, num_apus=None, apu_model=None, concurrency=None, **fn_constructor_kwargs)
描述:对当前Dataset的每一行应用给定函数。
输入参数:
- fn (Union[str, Callable]):可调用函数或其名称,如myschema.myfunction。
- on (Union[str, list[str], ]) :要应用函数的列或列的集合。
- as_col (Union[str, list[str], None]) :输出新列的名称。as_col的长度必须与fn返回的结构体中字段数量一致。
- num_dpus(Union[float, None]):数据处理单元的数量,CPU与内存之间存在固定比例1:4。
- num_apus(Union[float, None]) :AI处理单元的数量,这是针对NPU/GPU的配置选项。
- apu_model(Union[str, None]) :AI处理单元型号,例如NPU/910B、GPU/A100等。
- concurrency(Union[int, Tuple[int, int], None]):函数执行时的并发度。
- fn_constructor_kwargs :构造参数,只有当fn为可调用类时,才能提供此项。
返回值类型: Dataset :调用函数后包含新列(列名为as_col)的数据集。
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
... ...
... )
>>> ds = conn.load_dataset("demo_table", database='my_schema')
>>> ds = ds.map(
... fn='my_schema.multiply_2',
... on=['col_a'],
... as_col='col_c'
... )
>>> ds
| col_a | col_b | col_c |
|-------|-------|-------|
| int64 | string | int64 |
|-------|-------|-------|
| 1 | c | 2 |
| 1 | a | 2 |
| 2 | a | 4 |
order_by(key, descending=False)
描述:根据一个或多个表达式对Dataset进行排序。
输入参数:
- key (Union[str, List[str], None]) :用于排序的表达式。
- descending (Union[bool, List[bool]]) :是否按降序排序(DESC),默认为False。
返回值类型:排序后的Dataset。
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
... ...
... )
>>> ds = conn.load_dataset("demo_table_15", database='my_schema')
>>> ds
| a | b | c |
|-------|--------|-------|
| 3 | a | 4 |
| 2 | B | 6 |
| 1 | c | 5 |
| 3 | D | 7 |
# Sort by b. Default is ascending. Note how capital letters come before lowercase
>>> ds.order_by("b")
| a | b | c |
|-------|--------|-------|
| int64 | string | int64 |
|-------|--------|-------|
| 2 | B | 6 |
| 3 | D | 7 |
| 3 | a | 4 |
| 1 | c | 5 |
# Sort in descending order
>>> ds.order_by(ds.b.desc())
| a | b | c |
|-------|--------|-------|
| int64 | string | int64 |
|-------|--------|-------|
| 1 | c | 5 |
| 3 | a | 4 |
| 3 | D | 7 |
| 2 | B | 6 |
# Sort by multiple columns/expressions
>>> ds.order_by(["a", ds.c.desc()])
| a | b | c |
|-------|--------|-------|
| int64 | string | int64 |
|-------|--------|-------|
| 1 | c | 5 |
| 2 | B | 6 |
| 3 | D | 7 |
| 3 | a | 4 |
rename_columns(names, /, **substitutions)
描述:重命名Dataset中的列。
输入参数:
- names(Mapping[str, str]) :映射(字典),以 {新列名: 旧列名} 的形式指定重命名规则,未在映射中出现的列将保留原名。
- substitutions(str) :显式重命名的列,以关键字参数形式指定,格式为新名=旧名。
返回值类型:重命名后的新Dataset。
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
... ...
... )
>>> ds = conn.load_dataset("demo_table_8", database='my_schema')
>>> ds
| a | b |
|-------|--------|
| int64 | string |
|-------|--------|
| 1 | c |
| 1 | a |
| 2 | a |
>>> ds.rename_columns({"a": "rename_col"})
| rename_col | b |
|------------|--------|
| int64 | string |
|------------|--------|
| 1 | c |
| 1 | a |
| 2 | a |
schema()
描述:返回Dataset的schema。
输入参数:无
返回值类型:Schema对象。
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
... ...
... )
>>> ds = conn.load_dataset("demo_table_7", database='my_schema')
>>> ds.schema()
ibis.Schema {
species string
island string
bill_length_mm float64
bill_depth_mm float64
flipper_length_mm int64
}
select_columns(*exprs, **named_exprs)
描述:从Dataset的列中选取指定的列。
输入参数:
- exprs(Union[Value, str, Iterable[Union[Value, str]]]) :列表达式、列名字符串,或由列表达式和列名字符串组成的列表。
- named_exprs(Union[Value, str]) :列表达式(支持命名参数形式)。
返回值类型:包含选定列的Dataset。
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
... ...
... )
>>> ds = conn.load_dataset("demo_table_5", database='my_schema')
>>> ds.select_columns("species", "bill_length_mm")
| species | bill_length_mm |
|------------|----------------|
| string | float64 |
|------------|----------------|
| Adelie | 40.0 |
| Adelie | 41.0 |
| Adelie | 42.0 |
| Adelie | 43.0 |
# Selection by zero-indexed column position
>>> ds.select_columns(ds[0], ds[4])
| species | flipper_length_mm |
|------------|-------------------|
| string | int64 |
|------------|-------------------|
| Adelie | 180 |
| Adelie | 183 |
| Adelie | 190 |
| Adelie | 185 |
take(limit)
描述:从Dataset中返回最多limit行数据,该方法适用于数据检查和预览。
输入参数:
- limit (int) :要返回的最大行数。
返回值类型: list[dict[str, Any]] :包含数据集中最多limit行的列表,每行以字典形式表示。
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
... ...
... )
>>> ds = conn.load_dataset("demo_table_9", database='my_schema')
>>> ds.take(3)
[{'a': 1}, {'a': 1}, {'a': 2}]
write_iceberg(target_name, database, create_if_not_exists=False, overwrite=False, is_external=False, schema=None)
描述:将数据写入指定的Iceberg表中。
输入参数:
- target_name (str) :目标表的名称。
- database (str) :表所属的数据库。
- create_if_not_exists (bool) :如果为True,当表不存在时将自动创建该表。
- overwrite (bool):如果为True,将覆盖表中现有的数据。
- is_external (bool) :如果为True,将表视为外部表。
- schema(可选,任意类型):表的结构定义,可选参数。
返回值类型:None
write_orc(target_name, database, create_if_not_exists=False, overwrite=False, is_external=False, schema=None)
描述:将数据写入指定的ORC表中。
输入参数:
- target_name (str) :目标表的名称。
- database (str) :表所属的数据库。
- create_if_not_exists (bool) :如果为True,当表不存在时将自动创建该表。
- overwrite (bool) :如果为True,将覆盖表中现有的数据。
- is_external (bool) :如果为True,将表视为外部表。
- schema (可选,任意类型) :表的结构定义,可选参数。
返回值类型:None
write_parquet(target_name, database, create_if_not_exists=False, overwrite=False, is_external=False, schema=None)
描述:将数据写入指定的parquet表中。
输入参数:
- target_name (str) :目标表的名称。
- database (str) :表所属的数据库。
- create_if_not_exists (bool) :如果为True,当表不存在时将自动创建该表。
- overwrite (bool) :如果为True,将覆盖表中现有的数据。
- is_external (bool) :如果为True,将表视为外部表。
- schema(可选,任意类型):表的结构定义,可选参数。
返回值类型:None
max(on=None)
描述:计算Dataset中指定列的最大值。
输入参数:
on (Union[str, list[str], None]) :指定要计算最大值的列。
返回值类型:Dataset
limit(n, /, *, offset=0)
描述:从当前Dataset中选择行,起始位置为偏移量。
输入参数:
- n(Union[int, None]) :要包含的行数。如果为None,则从偏移量开始选择整个数据集。
- offset(int) :起始跳过的行数。
返回值类型:
Dataset :从偏移量开始的n行数据。
示例:
>>> from fabric_data.multimodal import ai_lake
>>> conn = ai_lake.connect(
... endpoint='*',
... ak='*'
... sk='*'
... ...
... )
>>> ds = conn.load_dataset("demo_table_4", database='my_schema')
>>> ds
| a | b |
|-------|--------|
| int64 | string |
|-------|--------|
| 1 | c |
| 1 | a |
| 2 | a |
>>> ds.limit(2)
| a | b |
|-------|--------|
| int64 | string |
|-------|--------|
| 1 | c |
| 1 | a |
| 2 | a |
# use None with offset to slice starting from a particular row
>>> ds.limit(None, offset=1)
| a | b |
|--------|--------|
| int64 | string |
|--------|--------|
| 1 | a |
| 2 | a |