Updated on 2025-12-19 GMT+08:00

Examples of Multimodal Data Type Usage

Image Example

  1. Prepare image data.
    1. Read the image data and write it into a Parquet file.
      import pyarrow as pa
      import pyarrow.parquet
      import pandas as pd
      data = {"img": [
          {'filename': "image.png", 'format': 'png', 'height': 1, 'width': 2},
          ]
      }
      with open("image.png", 'rb') as file:
          data["img"][0]["data"] = file.read()
      df = pd.DataFrame(data)
      schema = pa.schema([('img', pa.struct([('filename', pa.string()), ('format', pa.string()), ('height', pa.int64()), ('width', pa.int64()), ('data', pa.binary())]))])
      table = pa.Table.from_pandas(df, schema=schema)
      pyarrow.parquet.write_table(table, "image_type.parquet")
    2. Upload the generated Parquet file to OBS.
  2. Create a table containing the image type, specifying the location as the OBS path from the previous step.
    import os
    from fabric_data.multimodal import ai_lake
    from fabric_data.multimodal.types import image
    # Set the target database name
    target_database = "multimodal_lake"
    
    import logging
    con = ai_lake.connect(
        fabric_endpoint=os.getenv("fabric_endpoint"),
        fabric_endpoint_id=os.getenv("fabric_endpoint_id"),
        fabric_workspace_id=os.getenv("fabric_workspace_id"),
        lf_catalog_name=os.getenv("lf_catalog_name"),
        lf_instance_id=os.getenv("lf_instance_id"),
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"),
        default_database=target_database,
        use_single_cn_mode=True,
        logging_level=logging.WARNING,
    )
    con.set_function_staging_workspace(
        obs_directory_base=os.getenv("obs_directory_base"),
        obs_bucket_name=os.getenv("obs_bucket_name"),
        obs_server=os.getenv("obs_server"),
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"))
    con.create_table("image_table", schema={"img": image.Image}, external=True, location="obs://image_type")
  3. Use UDFs to process the images.
    import fabric_data
    from fabric_data.multimodal.types import image
    from fabric_data.ibis.expr.operations.udf import RegisterType
    @fabric_data.udf.python(register_type=RegisterType.STAGED, database="database")
    def udf(img: image.Image) -> image.Image:
        new_img = img.resize((10, 10))
        return new_img
    t = con.load_dataset("image_table", database=target_database)
    df = t.select(img=udf(t.img)).execute()
    df["img"][0].show()

Audio Example

  1. Prepare audio data.
    1. Read the audio data and write it into a Parquet file.
      import pyarrow as pa
      import pyarrow.parquet
      import pandas as pd
      data = {"audio": [
          {'filename': "audio.mp3", 'format': 'mp3', 'data': "", 'subtype': None},
      ]
      }
      import base64
      with open("audio.mp3", 'rb') as file:
          data["audio"][0]["data"] = file.read()
      df = pd.DataFrame(data)
      schema = pa.schema([('audio', pa.struct([('filename', pa.string()), ('format', pa.string()), ('subtype', pa.int64()), ('data', pa.binary())]))])
      table = pa.Table.from_pandas(df, schema=schema)
      pyarrow.parquet.write_table(table, "audio_type.parquet")
    2. Upload the generated Parquet file to OBS.
  2. Create a table containing the audio type, specifying the location as the OBS path from the previous step.
    import os
    from fabric_data.multimodal import ai_lake
    from fabric_data.multimodal.types import audio
    # Set the target database name
    target_database = "multimodal_lake"
    
    import logging
    con = ai_lake.connect(
        fabric_endpoint=os.getenv("fabric_endpoint"),
        fabric_endpoint_id=os.getenv("fabric_endpoint_id"),
        fabric_workspace_id=os.getenv("fabric_workspace_id"),
        lf_catalog_name=os.getenv("lf_catalog_name"),
        lf_instance_id=os.getenv("lf_instance_id"),
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"),
        default_database=target_database,
        use_single_cn_mode=True,
        logging_level=logging.WARNING,
    )
    con.set_function_staging_workspace(
        obs_directory_base=os.getenv("obs_directory_base"),
        obs_bucket_name=os.getenv("obs_bucket_name"),
        obs_server=os.getenv("obs_server"),
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"))
    con.create_table("audio_table", schema={"audio": audio.Audio}, external=True, location="obs://audio_type")
  3. Use UDFs to process the audio.
    import fabric_data
    from fabric_data.multimodal.types import audio
    from fabric_data.ibis.expr.operations.udf import RegisterType
    @fabric_data.udf.python(register_type=RegisterType.STAGED, database="database")
    def udf(aio: audio.Audio) -> audio.Audio:
        new_aio = aio.truncate(start_time=0, end_time=10)
        return new_aio
    t = con.load_dataset("audio_table", database=target_database)
    df = t.select(aio=udf(t.audio)).execute()

Video Example

  1. Prepare video data.
    1. Read the video data and write it into a Parquet file.
      import pyarrow as pa
      import pyarrow.parquet
      import pandas as pd
      data = {"video": [
          {'filename': "video.mp4", 'format': 'mp4'},
      ]
      }
      import base64
      with open("video.mp4", 'rb') as file:
          data["video"][0]["data"] = file.read()
      df = pd.DataFrame(data)
      schema = pa.schema([('video', pa.struct([('filename', pa.string()), ('format', pa.string()), ('data', pa.binary())]))])
      table = pa.Table.from_pandas(df, schema=schema)
      pyarrow.parquet.write_table(table, "video_type.parquet")
    2. Upload the generated Parquet file to OBS.
  2. Create a table containing the video type, specifying the location as the OBS path from the previous step.
    import os
    from fabric_data.multimodal import ai_lake
    from fabric_data.multimodal.types import video
    # Set the target database name
    target_database = "multimodal_lake"
    
    import logging
    con = ai_lake.connect(
        fabric_endpoint=os.getenv("fabric_endpoint"),
        fabric_endpoint_id=os.getenv("fabric_endpoint_id"),
        fabric_workspace_id=os.getenv("fabric_workspace_id"),
        lf_catalog_name=os.getenv("lf_catalog_name"),
        lf_instance_id=os.getenv("lf_instance_id"),
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"),
        default_database=target_database,
        use_single_cn_mode=True,
        logging_level=logging.WARNING,
    )
    con.set_function_staging_workspace(
        obs_directory_base=os.getenv("obs_directory_base"),
        obs_bucket_name=os.getenv("obs_bucket_name"),
        obs_server=os.getenv("obs_server"),
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"))
    con.create_table("video_table", schema={"video": video.Video}, external=True, location="obs://video_type")
  3. Use UDFs to process the video.
    import fabric_data
    from fabric_data.multimodal.types import video
    from fabric_data.ibis.expr.operations.udf import RegisterType
    
    @fabric_data.udf.python(register_type=RegisterType.STAGED, database="database")
    def udf(vio: video.Video) -> video.Video:
        new_vio = vio.truncate(0, 5, (500,500))
        return new_vio
    t = con.load_dataset("video_table", database=target_database)
    df = t.select(vio=udf(t.video)).execute()