Multimodal Data Type Usage Examples
Image Usage Example
- Prepare the image data.
- Read the image data and write it to a Parquet file.
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd

data = {
    "img": [
        {'filename': "image.png", 'format': 'png', 'height': 1, 'width': 2},
    ]
}

# Read the raw image bytes into the struct's data field.
with open("image.png", 'rb') as file:
    data["img"][0]["data"] = file.read()

df = pd.DataFrame(data)
schema = pa.schema([
    ('img', pa.struct([
        ('filename', pa.string()),
        ('format', pa.string()),
        ('height', pa.int64()),
        ('width', pa.int64()),
        ('data', pa.binary()),
    ]))
])
table = pa.Table.from_pandas(df, schema=schema)
pq.write_table(table, "image_type.parquet")
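Before uploading, you can optionally verify the file locally. The following is a minimal sketch using only standard pyarrow calls; the field names match the schema written above:

import pyarrow.parquet as pq

# Read the file back and confirm the img struct round-trips.
table = pq.read_table("image_type.parquet")
print(table.schema)  # expect: img struct<filename, format, height, width, data>
row = table.to_pylist()[0]["img"]
print(row["filename"], row["height"], row["width"], len(row["data"]))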
- Upload the Parquet file containing the written data to OBS.
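Any OBS upload method works for this step. As one option, the sketch below uses the OBS Python SDK (esdk-obs-python); the bucket name and object key are placeholders, and the prefix you upload to must correspond to the location used when creating the table in the next step:

import os
from obs import ObsClient

# Reuse the same credentials and endpoint environment variables as the steps below.
obs_client = ObsClient(
    access_key_id=os.getenv("access_key"),
    secret_access_key=os.getenv("secret_key"),
    server=os.getenv("obs_server"),
)
# Hypothetical bucket and object key; the table location must point at this prefix.
resp = obs_client.putFile("your-bucket", "image_type/image_type.parquet",
                          file_path="image_type.parquet")
if resp.status >= 300:
    raise RuntimeError(f"upload failed: {resp.errorMessage}")
obs_client.close()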
- Create a table containing the image type, setting location to the OBS path from the previous step.
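The connection code below reads all of its settings from environment variables. As an optional, minimal guard (standard library only; the names simply mirror the os.getenv calls that follow), you can check them up front:

import os

required = [
    "fabric_endpoint", "fabric_endpoint_id", "fabric_workspace_id",
    "lf_catalog_name", "lf_instance_id", "access_key", "secret_key",
]
# Fail fast with a clear message instead of a confusing connection error.
missing = [name for name in required if not os.getenv(name)]
if missing:
    raise EnvironmentError(f"missing environment variables: {missing}")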
import os
import logging

from fabric_data.multimodal import ai_lake
from fabric_data.multimodal.types import image

# Set the target database name
target_database = "multimodal_lake"

con = ai_lake.connect(
    fabric_endpoint=os.getenv("fabric_endpoint"),
    fabric_endpoint_id=os.getenv("fabric_endpoint_id"),
    fabric_workspace_id=os.getenv("fabric_workspace_id"),
    lf_catalog_name=os.getenv("lf_catalog_name"),
    lf_instance_id=os.getenv("lf_instance_id"),
    access_key=os.getenv("access_key"),
    secret_key=os.getenv("secret_key"),
    default_database=target_database,
    use_single_cn_mode=True,
    logging_level=logging.WARNING,
)

# Staging workspace on OBS used when registering staged UDFs.
con.set_function_staging_workspace(
    obs_directory_base=os.getenv("obs_directory_base"),
    obs_bucket_name=os.getenv("obs_bucket_name"),
    obs_server=os.getenv("obs_server"),
    access_key=os.getenv("access_key"),
    secret_key=os.getenv("secret_key"),
)

# External table whose img column uses the multimodal Image type.
con.create_table(
    "image_table",
    schema={"img": image.Image},
    external=True,
    location="obs://image_type",
)
- Use a UDF to process the image.
import fabric_data
from fabric_data.multimodal.types import image
from fabric_data.ibis.expr.operations.udf import RegisterType

# Register a staged Python UDF in the specified database.
@fabric_data.udf.python(register_type=RegisterType.STAGED, database="database")
def udf(img: image.Image) -> image.Image:
    # Resize the image to 10 x 10 pixels.
    new_img = img.resize((10, 10))
    return new_img

t = con.load_dataset("image_table", database=target_database)
df = t.select(img=udf(t.img)).execute()
df["img"][0].show()
Audio Usage Example
- Prepare the audio data.
- Read the audio data and write it to a Parquet file.
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd

data = {
    "audio": [
        {'filename': "audio.mp3", 'format': 'mp3', 'data': "", 'subtype': None},
    ]
}

# Read the raw audio bytes into the struct's data field.
with open("audio.mp3", 'rb') as file:
    data["audio"][0]["data"] = file.read()

df = pd.DataFrame(data)
schema = pa.schema([
    ('audio', pa.struct([
        ('filename', pa.string()),
        ('format', pa.string()),
        ('subtype', pa.int64()),
        ('data', pa.binary()),
    ]))
])
table = pa.Table.from_pandas(df, schema=schema)
pq.write_table(table, "audio_type.parquet")
- Upload the Parquet file containing the written data to OBS (the upload sketch shown in the image example applies here as well).
- Create a table containing the audio data, setting location to the OBS path from the previous step.
import os
import logging

from fabric_data.multimodal import ai_lake
from fabric_data.multimodal.types import audio

# Set the target database name
target_database = "multimodal_lake"

con = ai_lake.connect(
    fabric_endpoint=os.getenv("fabric_endpoint"),
    fabric_endpoint_id=os.getenv("fabric_endpoint_id"),
    fabric_workspace_id=os.getenv("fabric_workspace_id"),
    lf_catalog_name=os.getenv("lf_catalog_name"),
    lf_instance_id=os.getenv("lf_instance_id"),
    access_key=os.getenv("access_key"),
    secret_key=os.getenv("secret_key"),
    default_database=target_database,
    use_single_cn_mode=True,
    logging_level=logging.WARNING,
)

# Staging workspace on OBS used when registering staged UDFs.
con.set_function_staging_workspace(
    obs_directory_base=os.getenv("obs_directory_base"),
    obs_bucket_name=os.getenv("obs_bucket_name"),
    obs_server=os.getenv("obs_server"),
    access_key=os.getenv("access_key"),
    secret_key=os.getenv("secret_key"),
)

# External table whose audio column uses the multimodal Audio type.
con.create_table(
    "audio_table",
    schema={"audio": audio.Audio},
    external=True,
    location="obs://audio_type",
)
- Use a UDF to process the audio.
import fabric_data
from fabric_data.multimodal.types import audio
from fabric_data.ibis.expr.operations.udf import RegisterType

# Register a staged Python UDF in the specified database.
@fabric_data.udf.python(register_type=RegisterType.STAGED, database="database")
def udf(aio: audio.Audio) -> audio.Audio:
    # Keep only the first 10 seconds of the audio.
    new_aio = aio.truncate(start_time=0, end_time=10)
    return new_aio

t = con.load_dataset("audio_table", database=target_database)
df = t.select(aio=udf(t.audio)).execute()
Video Usage Example
- Prepare the video data.
- Read the video data and write it to a Parquet file.
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd

data = {
    "video": [
        {'filename': "video.mp4", 'format': 'mp4'},
    ]
}

# Read the raw video bytes into the struct's data field.
with open("video.mp4", 'rb') as file:
    data["video"][0]["data"] = file.read()

df = pd.DataFrame(data)
schema = pa.schema([
    ('video', pa.struct([
        ('filename', pa.string()),
        ('format', pa.string()),
        ('data', pa.binary()),
    ]))
])
table = pa.Table.from_pandas(df, schema=schema)
pq.write_table(table, "video_type.parquet")
- Upload the Parquet file containing the written data to OBS (again, the upload sketch from the image example applies).
- Create a table containing the video data, setting location to the OBS path from the previous step.
import os
import logging

from fabric_data.multimodal import ai_lake
from fabric_data.multimodal.types import video

# Set the target database name
target_database = "multimodal_lake"

con = ai_lake.connect(
    fabric_endpoint=os.getenv("fabric_endpoint"),
    fabric_endpoint_id=os.getenv("fabric_endpoint_id"),
    fabric_workspace_id=os.getenv("fabric_workspace_id"),
    lf_catalog_name=os.getenv("lf_catalog_name"),
    lf_instance_id=os.getenv("lf_instance_id"),
    access_key=os.getenv("access_key"),
    secret_key=os.getenv("secret_key"),
    default_database=target_database,
    use_single_cn_mode=True,
    logging_level=logging.WARNING,
)

# Staging workspace on OBS used when registering staged UDFs.
con.set_function_staging_workspace(
    obs_directory_base=os.getenv("obs_directory_base"),
    obs_bucket_name=os.getenv("obs_bucket_name"),
    obs_server=os.getenv("obs_server"),
    access_key=os.getenv("access_key"),
    secret_key=os.getenv("secret_key"),
)

# External table whose video column uses the multimodal Video type.
con.create_table(
    "video_table",
    schema={"video": video.Video},
    external=True,
    location="obs://video_type",
)
- Use a UDF to process the video:
import fabric_data
from fabric_data.multimodal.types import video
from fabric_data.ibis.expr.operations.udf import RegisterType

# Register a staged Python UDF in the specified database.
@fabric_data.udf.python(register_type=RegisterType.STAGED, database="database")
def udf(vio: video.Video) -> video.Video:
    # Truncate to the 0-5 s range and scale frames to 500 x 500.
    new_vio = vio.truncate(0, 5, (500, 500))
    return new_vio

t = con.table("video_table")
df = t.select(vio=udf(t.video)).execute()