
Video Frame Description and Summarization Example

This example illustrates how to use Huawei Cloud DataArts Fabric APIs to:

  • Extract key video frames and generate corresponding frame descriptions using a UDTF.
  • Combine multiple frame descriptions into a coherent video summary using a UDAF.

Environment Setup

Required Python packages include:

huawei-fabric-data
huawei-fabricsql-connectorapi
pipeline
torch
transformers

Procedure

  1. Create a server connection.
    import logging
    import os
    from fabric_data.multimodal import ai_lake
    # Set the target database name.
    target_database = "multimodal_lake"

    con = ai_lake.connect(
        fabric_endpoint=os.getenv("fabric_endpoint"),
        fabric_endpoint_id=os.getenv("fabric_endpoint_id"),
        fabric_workspace_id=os.getenv("fabric_workspace_id"),
        lf_catalog_name=os.getenv("lf_catalog_name"),
        lf_instance_id=os.getenv("lf_instance_id"),
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"),
        default_database=target_database,
        use_single_cn_mode=True,
        logging_level=logging.WARNING,
    )
    con.set_function_staging_workspace(
        obs_directory_base=os.getenv("obs_directory_base"),
        obs_bucket_name=os.getenv("obs_bucket_name"),
        obs_server=os.getenv("obs_server"),
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"))
  2. Create a Fabric table.
    from fabric_data.multimodal.types import video
    # Create a video table where the data is stored in Parquet files on OBS.
    con.create_table("yt_vid_type", database=target_database, table_format="parquet", schema={"vio": video.Video},
                     external=True,
                     location="obs://mini-kernel/dataframe_test/yt_video_type", if_not_exists=True)
    print(con.describe_table("yt_video_type", database=target_database))
  3. Define classes for video frame extraction.
    import io
    from PIL import Image
    from fabric_data.multimodal.types import video
    class VideoFrameExtractor:
        def __init__(self, target_frames=12):
            self.target_frames = target_frames
        def display_key_frames(self, vio: video.Video):
            from fabric_data.multimodal.types import image
            frames, timestamps, duration = self.extract_key_frames(vio)
            images = []
    
            for index, frame in enumerate(frames):
                pil_img = Image.fromarray(frame).convert("RGB")
                buffer = io.BytesIO()
                pil_img.save(buffer, format="JPEG")
                img = image.Image(filename=timestamps[index], data=buffer.getvalue())
                images.append(img.to_thumbnail((200, 200)))
            image.display_image(images)
        def extract_key_frames(self, vio: video.Video):
            try:
                vr = vio.video_reader
                total_frames = len(vr)
                fps = vr.get_avg_fps()
                duration = total_frames / fps
                # Select the sampling strategy based on video length.
                if duration <= 30:
                    frame_indices = self._uniform_sampling(total_frames, min(8, self.target_frames))
                elif duration <= 180:
                    frame_indices = self._keyframe_sampling(total_frames, self.target_frames)
                else:
                    frame_indices = self._adaptive_sampling(total_frames, fps, self.target_frames)
                frames = vr.get_batch(frame_indices).asnumpy()
                timestamps = [idx / fps for idx in frame_indices]
                return frames, timestamps, duration
            except Exception:
                # On failure, return empty frames and timestamps and a zero duration.
                return [], [], 0
    
        def _uniform_sampling(self, total_frames, num_frames):
            """Sampling at uniform intervals."""
            step = max(1, total_frames // num_frames)
            return list(range(0, total_frames, step))[:num_frames]
        def _keyframe_sampling(self, total_frames, num_frames):
            """Key Frame Sampling - Focus on sampling the beginning, middle, and end."""
            key_points = [
                0,
                total_frames // 4,
                total_frames // 2,
                3 * total_frames // 4,
                total_frames - 1
            ]
            # Insert intermediate frames.
            additional = num_frames - len(key_points)
            if additional > 0:
                step = total_frames // (additional + 1)
                additional_indices = [i * step for i in range(1, additional + 1)]
                key_points.extend(additional_indices)
            return sorted(key_points)[:num_frames]
        def _adaptive_sampling(self, total_frames, fps, num_frames):
            """Adaptive Sampling - Sampling by Time Segments"""
            segments = 6
            frames_per_segment = max(1, num_frames // segments)
            indices = []
            for i in range(segments):
                start = (i * total_frames) // segments
                end = ((i + 1) * total_frames) // segments
                # Sample within each segment.
                if frames_per_segment > 0:
                    step = max(1, (end - start) // frames_per_segment)
                    segment_indices = [start + j * step for j in range(frames_per_segment)
                                       if start + j * step < end]
                    indices.extend(segment_indices)
            return indices[:num_frames]
    
    t = con.load_dataset("yt_video_type", database=target_database)
    df = t.limit(1).select_columns(t.vio).execute()
    frame_extractor = VideoFrameExtractor()
    for row in df.itertuples():
        frame_extractor.display_key_frames(row[1])
  4. Define classes for video description processing.

    This example uses Hugging Face's Salesforce/blip-image-captioning-base model as the image-to-text generation model.

    Since Python UDFs in the database currently do not provide reliable network connectivity and download functionality, you are advised to download the original model locally and upload the compressed file (e.g., .zip) via OBS.

    Download the Salesforce/blip-image-captioning-base model locally using a Python script.

    from huggingface_hub import snapshot_download
    local_dir = "blip-image-captioning-base"
    snapshot_download(
        repo_id="Salesforce/blip-image-captioning-base",
        local_dir=local_dir,
        local_dir_use_symlinks=False,
    )
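
    After the download completes, compress the model directory and upload the archive to OBS. The following is a minimal sketch, assuming the esdk-obs-python SDK (ObsClient); the object key "models/blip-image-captioning-base.zip" is a placeholder that you should replace with your own path.

    import os
    import shutil
    from obs import ObsClient  # esdk-obs-python SDK (assumed to be installed locally)
    # Compress the downloaded model directory into a .zip archive.
    archive_path = shutil.make_archive("blip-image-captioning-base", "zip", root_dir=local_dir)
    # Upload the archive to OBS; the object key below is a placeholder.
    obs_client = ObsClient(
        access_key_id=os.getenv("access_key"),
        secret_access_key=os.getenv("secret_key"),
        server=os.getenv("obs_server"),
    )
    resp = obs_client.putFile(os.getenv("obs_bucket_name"), "models/blip-image-captioning-base.zip", archive_path)
    print(resp.status)
    obs_client.close()

    Then define the analyzer class that wraps the BLIP model.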
    import json
    import torch
    from transformers import BlipProcessor, BlipForConditionalGeneration, pipeline
    class VideoContentAnalyzer:
        def __init__(self, model_name="./fabric_data/examples/blip-image-captioning-base", target_frames=12):
            self.extractor = VideoFrameExtractor(target_frames)
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
            self.processor = BlipProcessor.from_pretrained(model_name)
            self.model = BlipForConditionalGeneration.from_pretrained(
                model_name,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32
            ).to(self.device)
            self.model.eval()
        def __call__(self, vio: video.Video):
            frames, _, _ = self.extractor.extract_key_frames(vio)
            frame_analyses = self.analyze_frames(frames)
            for analyse in frame_analyses:
                yield {"description": json.dumps(analyse)}
        def analyze_frames(self, frames):
            """Analyze the video frame content."""
            frame_analyses = []
            for i, frame in enumerate(frames):
                try:
                    # Generate frame description.
                    frame_description = self._generate_frame_description(frame)
                    # Analyze content features.
                    content_features = self._analyze_content_features(frame_description)
                    frame_analyses.append({
                        'frame_index': i,
                        'description': frame_description,
                        'content_type': content_features['content_type'],
                        'key_objects': content_features['key_objects'],
                        'scene_context': content_features['scene_context']
                    })
                except Exception:
                    # Processing of the current frame failed; skip to the next frame.
                    continue
    
            return frame_analyses
        def _generate_frame_description(self, frame):
            """Generate a single-frame description."""
            pil_image = Image.fromarray(frame).convert("RGB")
            inputs = self.processor(images=pil_image, return_tensors="pt").to(self.device)
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_length=50,
                    num_beams=5,
                    early_stopping=True
                )
            description = self.processor.decode(outputs[0], skip_special_tokens=True)
            return description.strip()
        def _analyze_content_features(self, description):
            description_lower = description.lower()
            content_categories = {
                'educational': ['tutorial', 'lesson', 'explain', 'teach', 'learn', 'education'],
                'entertainment': ['funny', 'comedy', 'entertainment', 'show', 'performance'],
                'nature': ['outdoor', 'nature', 'landscape', 'mountain', 'forest', 'animal'],
                'sports': ['sport', 'game', 'player', 'team', 'match', 'competition'],
                'food': ['food', 'cooking', 'recipe', 'meal', 'restaurant', 'kitchen'],
                'technology': ['computer', 'tech', 'device', 'electronic', 'software'],
                'people': ['person', 'people', 'man', 'woman', 'child', 'group']
            }
            detected_categories = []
            for category, keywords in content_categories.items():
                if any(keyword in description_lower for keyword in keywords):
                    detected_categories.append(category)
            common_objects = [
                'person', 'people', 'man', 'woman', 'child', 'car', 'building',
                'tree', 'house', 'street', 'water', 'sky', 'food', 'animal'
            ]
            key_objects = [obj for obj in common_objects if obj in description_lower]
            if any(word in description_lower for word in ['indoors', 'inside', 'room']):
                scene_context = 'indoor'
            elif any(word in description_lower for word in ['outdoors', 'outside', 'park', 'street']):
                scene_context = 'outdoor'
            else:
                scene_context = 'unknown'
    
            return {
                'content_type': detected_categories[:2] if detected_categories else ['general'],
                'key_objects': key_objects[:5],
                'scene_context': scene_context
            }
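
    Optionally, before registering the class as a UDTF, you can exercise it locally. The snippet below is a minimal sketch; it assumes the downloaded BLIP model files are available at the path shown and that the session from step 3 is still active, so that row holds a fetched video value.

    # Local sanity check of the analyzer (assumes the model path below exists
    # and that `row` was fetched in step 3).
    analyzer = VideoContentAnalyzer(model_name="./fabric_data/examples/blip-image-captioning-base")
    for result in analyzer(row[1]):
        print(result["description"])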
  5. Create a UDTF to process video data.
    import ibis.expr.datatypes as dt
    import fabric_data as fabric
    signature = fabric.Signature(
        parameters=[
            fabric.Parameter(name="vio", annotation=video.Video),
        ],
        return_annotation=dt.Struct({"description": str}),
    )
    try:
        #con.delete_function("VideoContentAnalyzer", database=target_database)
        video_content_analyzer = con.get_function("VideoContentAnalyzer", database=target_database)
    except Exception:
        # The UDTF is not registered yet; create it, then fetch the handle.
        con.create_table_function(VideoContentAnalyzer, database=target_database, signature=signature, imports=("blip-image-captioning-base",))
        video_content_analyzer = con.get_function("VideoContentAnalyzer", database=target_database)
  6. Execute the UDTF to generate video frame descriptions.
    t = t.limit(1)
    descriptions = t.select_columns(video_content_analyzer(t.vio).with_arguments(model_name="./fabric_data/examples/blip-image-captioning-base", target_frames=12).name("descriptions"))
    df = descriptions.execute()
    import pandas as pd
    pd.set_option('display.width', None)
    pd.set_option('max_colwidth', None)
    print(df)
  7. Define a UDAF to create a video summary from frame descriptions.

    The following example uses Hugging Face's google-t5/t5-small model as the text-to-text generation model.

    Since Python UDFs in the database currently do not provide reliable network connectivity and download functionality, you are advised to download the original model locally and upload the compressed file (e.g., .zip) via OBS.
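
    As in step 4, the model can be downloaded locally with snapshot_download and then compressed and uploaded in the same way. The sketch below mirrors the earlier BLIP download; the t5-small directory name matches the imports argument used when the UDAF is registered in step 8.

    from huggingface_hub import snapshot_download
    snapshot_download(
        repo_id="google-t5/t5-small",
        local_dir="t5-small",
        local_dir_use_symlinks=False,
    )

    Then define the summary generator and the UDAF class.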

    from collections import Counter
    import typing
    class VideoSummaryGenerator:
        """Video Summary Generator"""
    
        def __init__(self, model):
            # Initialize the text summarization model.
            self.summarizer = pipeline(
                "summarization",
                model=model,
                device=0 if torch.cuda.is_available() else -1
            )
        def generate_summary(self, frame_analyses):
            """Generate a video summary"""
            all_descriptions = [analysis['description'] for analysis in frame_analyses]
            if not all_descriptions:
                return "Unable to analyze the video content."
    
            # Analyze the characteristics of the video content.
            content_analysis = self._analyze_video_content(frame_analyses)
            # Combine the frame descriptions into descriptive text and generate a summary from it.
            description_text = self._create_description_text(all_descriptions, content_analysis)
            summary = self._generate_detailed_summary(description_text, content_analysis)
            return summary
        def _analyze_video_content(self, frame_analyses):
            """Analyze the overall video content."""
            all_descriptions = ' '.join([analysis['description'] for analysis in frame_analyses]).lower()
            # Extract the key information
            content_types = []
            all_objects = []
            for analysis in frame_analyses:
                content_types.extend(analysis['content_type'])
                all_objects.extend(analysis['key_objects'])
            # Calculate frequency
            common_content_types = [item for item, count in Counter(content_types).most_common(2)]
            common_objects = [item for item, count in Counter(all_objects).most_common(5)]
            return {
                'primary_categories': common_content_types,
                'dominant_objects': common_objects,
                'total_frames_analyzed': len(frame_analyses)
            }
        def _create_description_text(self, descriptions, content_analysis):
            """Create descriptive text."""
            desc_counter = Counter(descriptions)
            representative_descs = [desc for desc, count in desc_counter.most_common(3)]
            text = "Video Content Analysis:\n"
            text += f"Primary Categories: {', '.join(content_analysis['primary_categories'])}\n"
            text += f"Key Objects: {', '.join(content_analysis['dominant_objects'][:3])}\n"
            text += "Representative Scenes:\n"
            for i, desc in enumerate(representative_descs, 1):
                text += f"{i}. {desc}\n"
    
            return text
        def _generate_concise_summary(self, description_text, content_analysis):
            """Generate a concise summary"""
            primary_category = content_analysis['primary_categories'][0] if content_analysis['primary_categories'] else "General content"
            main_objects = ', '.join(content_analysis['dominant_objects'][:2])
            templates = [
                f"This is a {primary_category} video that includes elements like {main_objects}.",
                f"The video presents a {primary_category} scenario, with its focus on {main_objects}.",
                f"The content centers on {primary_category}, highlighting elements such as {main_objects}."
            ]
            import random
            return random.choice(templates)
        def _generate_detailed_summary(self, description_text, content_analysis):
            # Use the summary model to generate a more detailed description.
            try:
                summary = self.summarizer(
                    description_text,
                    max_length=150,
                    min_length=50,
                    do_sample=False
                )[0]['summary_text']
                return summary
            except Exception as e:
                # Failed to generate detailed information, switched to producing concise summary.
                return self._generate_concise_summary(description_text, content_analysis)
        def _generate_engaging_summary(self, description_text, content_analysis):
            """Generate an engaging summar"""
            primary_category = content_analysis['primary_categories'][0] if content_analysis['primary_categories'] else "Awesome"
            main_objects = ','.join(content_analysis['dominant_objects'][:2])
            engaging_templates = [
                f" Don't miss out! This {primary_category} video takes you deep into the fascinating world of {main_objects}!",
                f" A spectacular showcase! A visual feast exploring the unique charm of {main_objects} in a {primary_category} setting!",
                f" Watch now! This video perfectly captures the {primary_category} moment of {main_objects}!"
            ]
            import random
            return random.choice(engaging_templates)
    class VideoSummarySystem:
        """Video Summary Generation System"""
    
        def __init__(self, model="./fabric_data/examples/t5-small"):
            self.summary_generator = VideoSummaryGenerator(model)
            self.frame_descriptions = []
        def accumulate(self, description: str) -> None:
            self.frame_descriptions.append(json.loads(description))
        def finish(self) -> str:
            summary = self.summary_generator.generate_summary(self.frame_descriptions)
            result = {
                'video_summary': summary,
                'frames_analyzed': len(self.frame_descriptions),
                'content_categories': list(set(
                    cat for analysis in self.frame_descriptions
                    for cat in analysis['content_type']
                )),
                'key_objects': list(set(
                    obj for analysis in self.frame_descriptions
                    for obj in analysis['key_objects']
                ))[:8],
                'processing_details': {
                    'total_frames': len(self.frame_descriptions),
                    'successful_analyses': len(self.frame_descriptions)
                }
            }
            return json.dumps(result)
        @property
        def aggregate_state(self) -> typing.Dict[str, typing.Any]:
            return {
                "frame_descriptions": self.frame_descriptions,
            }
        def merge(self, other_state: typing.Dict[str, typing.Any]) -> None:
            self.frame_descriptions += other_state.get("frame_descriptions", [])
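
    The accumulate/merge/finish protocol can be checked locally before the UDAF is registered. The snippet below is a minimal sketch: the sample frame description is purely illustrative, and it assumes the t5-small files are available at the path used above.

    import json
    # Local check of the aggregation protocol (illustrative sample input).
    system = VideoSummarySystem(model="./fabric_data/examples/t5-small")
    sample = {"description": "a man riding a bicycle down a city street",
              "content_type": ["people"], "key_objects": ["man", "street"],
              "scene_context": "outdoor"}
    system.accumulate(json.dumps(sample))
    print(system.finish())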
  8. Execute the UDAF to produce the final video summary.
    from fabric_data.multimodal.function import AggregateFnBuilder
    con.delete_function("VideoSummarySystem", database=target_database)
    # Create a UDAF to generate video descriptions from key frame descriptions.
    con.create_agg_function(
        VideoSummarySystem,
        database=target_database,
        imports=("t5-small",))
    video_summary = con.get_function("VideoSummarySystem", database=target_database)
    agg_builder = AggregateFnBuilder(
        fn=video_summary,
        on=[descriptions.descriptions.description],
        as_col="describe",
        num_dpus=0.5,
        constructor_kwargs={"model": "./fabric_data/examples/t5-small"}
    )
    df = descriptions.aggregate(agg_builder).execute()
    for row in df.itertuples():
        print(row[1])
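
    Because finish() returns a JSON string, the aggregated column can also be parsed to access individual fields, for example:

    import json
    for row in df.itertuples():
        result = json.loads(row[1])
        print(result["video_summary"])
        print(result["frames_analyzed"])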
  9. Close the connection.
    con.close()