更新时间:2025-12-10 GMT+08:00
分享

多模态数据类型使用示例

图片使用示例

  1. 准备图片类型数据。
    1. 将图片数据读出后写入parquet文件中。
      import pyarrow as pa
      import pandas as pd
      data = {"img": [
          {'filename': "image.png", 'format': 'png', 'height': 1, 'width': 2},
          ]
      }
      with open("image.png", 'rb') as file:
          data["img"][0]["data"] = file.read()
      df = pd.DataFrame(data)
      schema = pa.schema([('img', pa.struct([('filename', pa.string()), ('format', pa.string()), ('height', pa.int64()), ('width', pa.int64()), ('data', pa.binary())]))])
      table = pa.Table.from_pandas(df, schema=schema)pyarrow.parquet.write_table(table, "image_type.parquet")
    2. 将写入数据后的parquet文件上传到OBS中。
  2. 创建包含图片类型的表,指定location为上一步的OBS路径。
    import os
    from fabric_data.multimodal import ai_lake
    from fabric_data.multimodal.types import image
    # Set the target database name
    target_database = "multimodal_lake"
    
    import logging
    con = ai_lake.connect(
        fabric_endpoint=os.getenv("fabric_endpoint"),
        fabric_endpoint_id=os.getenv("fabric_endpoint_id"),
        fabric_workspace_id=os.getenv("fabric_workspace_id"),
        lf_catalog_name=os.getenv("lf_catalog_name"),
        lf_instance_id=os.getenv("lf_instance_id"),
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"),
        default_database=target_database,
        use_single_cn_mode=True,
        logging_level=logging.WARNING,
    )
    con.set_function_staging_workspace(
        obs_directory_base=os.getenv("obs_directory_base"),
        obs_bucket_name=os.getenv("obs_bucket_name"),
        obs_server=os.getenv("obs_server"),
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"))
    con.create_table("image_table", schema={"img": image.Image}, external=True, location="obs://image_type")
  3. 使用udf处理图片。
    from fabric_data.multimodal.types import image
    from fabric_data.ibis.expr.operations.udf import RegisterType
    @fabric_data.udf.python(register_type=RegisterType.STAGED, database="database")
    def udf(img: image.Image) -> image.Image:
        new_img = img.resize((10, 10))
        return new_img
    t = con.load_dataset("image_table", database=target_database)
    df = t.select(img=udf(t.img)).execute()
    df["img"][0].show()

音频使用示例

  1. 准备音频类型数据。
    1. 将音频数据读出后写入parquet文件中。
      import pyarrow as pa
      import pandas as pd
      data = {"audio": [
          {'filename': "audio.mp3", 'format': 'mp3', 'data': "", 'subtype': None},
      ]
      }
      import base64
      with open("audio.mp3", 'rb') as file:
          data["audio"][0]["data"] = file.read()
      df = pd.DataFrame(data)
      schema = pa.schema([('audio', pa.struct([('filename', pa.string()), ('format', pa.string()), ('subtype', pa.int64()), ('data', pa.binary())]))])
      table = pa.Table.from_pandas(df, schema=schema)
      pyarrow.parquet.write_table(table, "audio_type.parquet")
    2. 将写入数据后的parquet文件上传到OBS中。
  1. 创建包含音频数据的表,指定location为上一步的OBS路径。
    import os
    from fabric_data.multimodal import ai_lake
    from fabric_data.multimodal.types import audio
    # Set the target database name
    target_database = "multimodal_lake"
    
    import logging
    con = ai_lake.connect(
        fabric_endpoint=os.getenv("fabric_endpoint"),
        fabric_endpoint_id=os.getenv("fabric_endpoint_id"),
        fabric_workspace_id=os.getenv("fabric_workspace_id"),
        lf_catalog_name=os.getenv("lf_catalog_name"),
        lf_instance_id=os.getenv("lf_instance_id"),
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"),
        default_database=target_database,
        use_single_cn_mode=True,
        logging_level=logging.WARNING,
    )
    con.set_function_staging_workspace(
        obs_directory_base=os.getenv("obs_directory_base"),
        obs_bucket_name=os.getenv("obs_bucket_name"),
        obs_server=os.getenv("obs_server"),
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"))
    con.create_table("audio_table", schema={"audio": audio.Audio}, external=True, location="obs://audio_type")
  1. 使用udf处理音频。
    from fabric_data.multimodal.types import audio
    from fabric_data.ibis.expr.operations.udf import RegisterType
    @fabric_data.udf.python(register_type=RegisterType.STAGED, database="database")
    def udf(aio: audio.Audio) -> audio.Audio:
        new_aio = aio.truncate(start_time=0, end_time=10)
        return new_aio
    t = con.load_dataset("audio_table", database=target_database)
    df = t.select(aio=udf(t.audio)).execute()

视频使用示例

  1. 准备视频类型数据。
    1. 将视频数据读出后写入parquet文件中。
      import pyarrow as pa
      import pandas as pd
      data = {"video": [
          {'filename': "video.mp4", 'format': 'mp4'},
      ]
      }
      import base64
      with open("video.mp4", 'rb') as file:
          data["video"][0]["data"] = file.read()
      df = pd.DataFrame(data)
      schema = pa.schema([('video', pa.struct([('filename', pa.string()), ('format', pa.string()), ('data', pa.binary())]))])
      table = pa.Table.from_pandas(df, schema=schema)
      pyarrow.parquet.write_table(table, "video_type.parquet")
    2. 将写入数据后的parquet文件上传到OBS中。
  1. 创建包含视频数据的表,指定location为上一步的OBS路径。
    import os
    from fabric_data.multimodal import ai_lake
    from fabric_data.multimodal.types import video
    # Set the target database name
    target_database = "multimodal_lake"
    
    import logging
    con = ai_lake.connect(
        fabric_endpoint=os.getenv("fabric_endpoint"),
        fabric_endpoint_id=os.getenv("fabric_endpoint_id"),
        fabric_workspace_id=os.getenv("fabric_workspace_id"),
        lf_catalog_name=os.getenv("lf_catalog_name"),
        lf_instance_id=os.getenv("lf_instance_id"),
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"),
        default_database=target_database,
        use_single_cn_mode=True,
        logging_level=logging.WARNING,
    )
    con.set_function_staging_workspace(
        obs_directory_base=os.getenv("obs_directory_base"),
        obs_bucket_name=os.getenv("obs_bucket_name"),
        obs_server=os.getenv("obs_server"),
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"))
    con.create_table("video_table", schema={"video": video.Video}, external=True, location="obs://video_type")
  1. 使用udf处理视频:
    from fabric_data.multimodal.types import video
    from fabric_data.ibis.expr.operations.udf import RegisterType
    
    @fabric_data.udf.python(register_type=RegisterType.STAGED, database="database")
    def udf(vio: video.Video) -> video.Video:
        new_vio = vio.truncate(0, 5, (500,500))
        return new_vio
    t = con.table("video_table")
    df = t.select(vio=udf(t.video)).execute()

相关文档