Video Frame Description and Summarization Example
This example illustrates how to use Huawei Cloud DataArts Fabric APIs to:
- Extract key video frames and generate corresponding frame descriptions using a UDTF.
- Combine multiple frame descriptions into a coherent video summary using a UDAF.
Environment Setup
Required Python packages include:
huawei-fabric-data huawei-fabricsql-connectorapi pipeline torch transformers
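For example, assuming these are the exact pip package names, the environment could be prepared with a command like the following (the model download script later in this example additionally uses huggingface_hub):

pip install huawei-fabric-data huawei-fabricsql-connectorapi pipeline torch transformers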
Procedure
- Create a server connection.
import logging
import os

from fabric_data.multimodal import ai_lake

# Set the target database name.
target_database = "multimodal_lake"

con = ai_lake.connect(
    fabric_endpoint=os.getenv("fabric_endpoint"),
    fabric_endpoint_id=os.getenv("fabric_endpoint_id"),
    fabric_workspace_id=os.getenv("fabric_workspace_id"),
    lf_catalog_name=os.getenv("lf_catalog_name"),
    lf_instance_id=os.getenv("lf_instance_id"),
    access_key=os.getenv("access_key"),
    secret_key=os.getenv("secret_key"),
    default_database=target_database,
    use_single_cn_mode=True,
    logging_level=logging.WARNING,
)

con.set_function_staging_workspace(
    obs_directory_base=os.getenv("obs_directory_base"),
    obs_bucket_name=os.getenv("obs_bucket_name"),
    obs_server=os.getenv("obs_server"),
    access_key=os.getenv("access_key"),
    secret_key=os.getenv("secret_key"))

- Create a Fabric table.
from fabric_data.multimodal.types import video

# Create a video table whose data is stored in Parquet files on OBS.
con.create_table("yt_video_type",
                 database=target_database,
                 table_format="parquet",
                 schema={"vio": video.Video},
                 external=True,
                 location="obs://mini-kernel/dataframe_test/yt_video_type",
                 if_not_exists=True)

print(con.describe_table("yt_video_type", database=target_database))

- Define classes for video frame extraction.
import io

from PIL import Image

from fabric_data.multimodal.types import video


class VideoFrameExtractor:
    def __init__(self, target_frames=12):
        self.target_frames = target_frames

    def display_key_frames(self, vio: video.Video):
        from fabric_data.multimodal.types import image
        frames, timestamps, duration = self.extract_key_frames(vio)
        images = []
        for index, frame in enumerate(frames):
            pil_img = Image.fromarray(frame).convert("RGB")
            buffer = io.BytesIO()
            pil_img.save(buffer, format="JPEG")
            img = image.Image(filename=timestamps[index], data=buffer.getvalue())
            images.append(img.to_thumbnail((200, 200)))
        image.display_image(images)

    def extract_key_frames(self, vio: video.Video):
        try:
            vr = vio.video_reader
            total_frames = len(vr)
            fps = vr.get_avg_fps()
            duration = total_frames / fps
            # Select the sampling strategy based on video length.
            if duration <= 30:
                frame_indices = self._uniform_sampling(total_frames, min(8, self.target_frames))
            elif duration <= 180:
                frame_indices = self._keyframe_sampling(total_frames, self.target_frames)
            else:
                frame_indices = self._adaptive_sampling(total_frames, fps, self.target_frames)
            frames = vr.get_batch(frame_indices).asnumpy()
            timestamps = [idx / fps for idx in frame_indices]
            return frames, timestamps, duration
        except Exception as e:
            return [], [], 0

    def _uniform_sampling(self, total_frames, num_frames):
        """Sample at uniform intervals."""
        step = max(1, total_frames // num_frames)
        return list(range(0, total_frames, step))[:num_frames]

    def _keyframe_sampling(self, total_frames, num_frames):
        """Key frame sampling - focus on the beginning, middle, and end."""
        key_points = [
            0,
            total_frames // 4,
            total_frames // 2,
            3 * total_frames // 4,
            total_frames - 1
        ]
        # Insert intermediate frames.
        additional = num_frames - len(key_points)
        if additional > 0:
            step = total_frames // (additional + 1)
            additional_indices = [i * step for i in range(1, additional + 1)]
            key_points.extend(additional_indices)
        return sorted(key_points)[:num_frames]

    def _adaptive_sampling(self, total_frames, fps, num_frames):
        """Adaptive sampling - sample by time segment."""
        segments = 6
        frames_per_segment = max(1, num_frames // segments)
        indices = []
        for i in range(segments):
            start = (i * total_frames) // segments
            end = ((i + 1) * total_frames) // segments
            # Sample within each segment.
            if frames_per_segment > 0:
                step = max(1, (end - start) // frames_per_segment)
                segment_indices = [start + j * step
                                   for j in range(frames_per_segment)
                                   if start + j * step < end]
                indices.extend(segment_indices)
        return indices[:num_frames]


t = con.load_dataset("yt_video_type", database=target_database)
df = t.limit(1).select_columns(t.vio).execute()
frame_extractor = VideoFrameExtractor()
for row in df.itertuples():
    print(frame_extractor.display_key_frames(row[1]))

- Define classes for video description processing.
This example uses Hugging Face's Salesforce/blip-image-captioning-base model as the image-to-text generation model.
Because Python UDFs running in the database currently do not have reliable network connectivity or download capability, you are advised to download the model locally and upload it to OBS as a compressed file (for example, a .zip archive); a packaging and upload sketch is shown in the UDTF creation step below.
Download the Salesforce/blip-image-captioning-base model locally using a Python script.
from huggingface_hub import snapshot_download

local_dir = "blip-image-captioning-base"
snapshot_download(
    repo_id="Salesforce/blip-image-captioning-base",
    local_dir=local_dir,
    local_dir_use_symlinks=False,
)

# Define the analyzer class that will be registered as the UDTF.
import json

import torch
from transformers import BlipProcessor, BlipForConditionalGeneration, pipeline


class VideoContentAnalyzer:
    def __init__(self, model_name="./fabric_data/examples/blip-image-captioning-base", target_frames=12):
        self.extractor = VideoFrameExtractor(target_frames)
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.processor = BlipProcessor.from_pretrained(model_name)
        self.model = BlipForConditionalGeneration.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if self.device == "cuda" else torch.float32
        ).to(self.device)
        self.model.eval()

    def __call__(self, vio: video.Video):
        frames, _, _ = self.extractor.extract_key_frames(vio)
        frame_analyses = self.analyze_frames(frames)
        for analyse in frame_analyses:
            yield {"description": json.dumps(analyse)}

    def analyze_frames(self, frames):
        """Analyze the content of the video frames."""
        frame_analyses = []
        for i, frame in enumerate(frames):
            try:
                # Generate a frame description.
                frame_description = self._generate_frame_description(frame)
                # Analyze content features.
                content_features = self._analyze_content_features(frame_description)
                frame_analyses.append({
                    'frame_index': i,
                    'description': frame_description,
                    'content_type': content_features['content_type'],
                    'key_objects': content_features['key_objects'],
                    'scene_context': content_features['scene_context']
                })
            except Exception as e:
                # Processing of the current frame failed; skip to the next frame.
                continue
        return frame_analyses

    def _generate_frame_description(self, frame):
        """Generate a single-frame description."""
        pil_image = Image.fromarray(frame).convert("RGB")
        inputs = self.processor(images=pil_image, return_tensors="pt").to(self.device)
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=50,
                num_beams=5,
                early_stopping=True
            )
        description = self.processor.decode(outputs[0], skip_special_tokens=True)
        return description.strip()

    def _analyze_content_features(self, description):
        description_lower = description.lower()
        content_categories = {
            'educational': ['tutorial', 'lesson', 'explain', 'teach', 'learn', 'education'],
            'entertainment': ['funny', 'comedy', 'entertainment', 'show', 'performance'],
            'nature': ['outdoor', 'nature', 'landscape', 'mountain', 'forest', 'animal'],
            'sports': ['sport', 'game', 'player', 'team', 'match', 'competition'],
            'food': ['food', 'cooking', 'recipe', 'meal', 'restaurant', 'kitchen'],
            'technology': ['computer', 'tech', 'device', 'electronic', 'software'],
            'people': ['person', 'people', 'man', 'woman', 'child', 'group']
        }
        detected_categories = []
        for category, keywords in content_categories.items():
            if any(keyword in description_lower for keyword in keywords):
                detected_categories.append(category)
        common_objects = [
            'person', 'people', 'man', 'woman', 'child', 'car', 'building',
            'tree', 'house', 'street', 'water', 'sky', 'food', 'animal'
        ]
        key_objects = [obj for obj in common_objects if obj in description_lower]
        if any(word in description_lower for word in ['indoors', 'inside', 'room']):
            scene_context = 'indoor'
        elif any(word in description_lower for word in ['outdoors', 'outside', 'park', 'street']):
            scene_context = 'outdoor'
        else:
            scene_context = 'unknown'
        return {
            'content_type': detected_categories[:2] if detected_categories else ['general'],
            'key_objects': key_objects[:5],
            'scene_context': scene_context
        }

- Create a UDTF to process video data.
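As noted above, the downloaded model must be available to the database as a compressed file on OBS before the UDTF is registered. The following is a minimal sketch (not part of the original example) that packages the model directory and uploads the archive with the OBS Python SDK (esdk-obs-python); the bucket name and object key are placeholders, and the exact target path depends on your OBS setup.

# Sketch only: package the downloaded model directory and upload it to OBS.
# The bucket name and object key below are placeholders.
import os
import shutil

from obs import ObsClient  # esdk-obs-python

# Creates blip-image-captioning-base.zip from the downloaded model directory.
archive_path = shutil.make_archive("blip-image-captioning-base", "zip",
                                   root_dir="blip-image-captioning-base")

obs_client = ObsClient(
    access_key_id=os.getenv("access_key"),
    secret_access_key=os.getenv("secret_key"),
    server=os.getenv("obs_server"),
)
resp = obs_client.putFile("your-obs-bucket", "models/blip-image-captioning-base.zip", archive_path)
print(resp.status)  # A 2xx status indicates a successful upload.
obs_client.close()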
import ibis.expr.datatypes as dt

import fabric_data as fabric

signature = fabric.Signature(
    parameters=[
        fabric.Parameter(name="vio", annotation=video.Video),
    ],
    return_annotation=dt.Struct({"description": str}),
)

try:
    # con.delete_function("VideoContentAnalyzer", database=target_database)
    video_content_analyzer = con.get_function("VideoContentAnalyzer", database=target_database)
except Exception as e:
    con.create_table_function(VideoContentAnalyzer,
                              database=target_database,
                              signature=signature,
                              imports=("blip-image-captioning-base",))
    video_content_analyzer = con.get_function("VideoContentAnalyzer", database=target_database)

- Execute the UDTF to generate video frame descriptions.
t = t.limit(1)
descriptions = t.select_columns(
    video_content_analyzer(t.vio)
    .with_arguments(model_name="./fabric_data/examples/blip-image-captioning-base", target_frames=12)
    .name("descriptions"))
df = descriptions.execute()

import pandas as pd
pd.set_option('display.width', None)
pd.set_option('max_colwidth', None)
print(df)

- Define a UDAF to create a video summary from frame descriptions.
The following example uses Hugging Face's google-t5/t5-small model as the text-to-text generation model.
Because Python UDFs running in the database currently do not have reliable network connectivity or download capability, you are advised to download the model locally and upload it to OBS as a compressed file (for example, a .zip archive), as described for the BLIP model above.
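For instance, mirroring the BLIP download script above, the model could be fetched locally with huggingface_hub; the local directory name t5-small is an assumption chosen to match the name referenced when the UDAF is created below.

# Sketch only: download google-t5/t5-small locally before uploading it to OBS.
from huggingface_hub import snapshot_download

snapshot_download(
    repo_id="google-t5/t5-small",
    local_dir="t5-small",
    local_dir_use_symlinks=False,
)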
import json
import typing
from collections import Counter


class VideoSummaryGenerator:
    """Video summary generator."""

    def __init__(self, model):
        # Initialize the text summarization model.
        self.summarizer = pipeline(
            "summarization",
            model=model,
            device=0 if torch.cuda.is_available() else -1
        )

    def generate_summary(self, frame_analyses):
        """Generate a video summary."""
        all_descriptions = [analysis['description'] for analysis in frame_analyses]
        if not all_descriptions:
            return "Unable to analyze the video content."
        # Analyze the characteristics of the video content.
        content_analysis = self._analyze_video_content(frame_analyses)
        # Generate a summary.
        summary = self._generate_detailed_summary(all_descriptions, content_analysis)
        return summary

    def _analyze_video_content(self, frame_analyses):
        """Analyze the overall video content."""
        all_descriptions = ' '.join([analysis['description'] for analysis in frame_analyses]).lower()
        # Extract the key information.
        content_types = []
        all_objects = []
        for analysis in frame_analyses:
            content_types.extend(analysis['content_type'])
            all_objects.extend(analysis['key_objects'])
        # Calculate frequencies.
        common_content_types = [item for item, count in Counter(content_types).most_common(2)]
        common_objects = [item for item, count in Counter(all_objects).most_common(5)]
        return {
            'primary_categories': common_content_types,
            'dominant_objects': common_objects,
            'total_frames_analyzed': len(frame_analyses)
        }

    def _create_description_text(self, descriptions, content_analysis):
        """Create descriptive text."""
        desc_counter = Counter(descriptions)
        representative_descs = [desc for desc, count in desc_counter.most_common(3)]
        text = "Video Content Analysis:\n"
        text += f"Primary Categories: {', '.join(content_analysis['primary_categories'])}\n"
        text += f"Key Objects: {', '.join(content_analysis['dominant_objects'][:3])}\n"
        text += "Representative Scenes:\n"
        for i, desc in enumerate(representative_descs, 1):
            text += f"{i}. {desc}\n"
        return text

    def _generate_concise_summary(self, description_text, content_analysis):
        """Generate a concise summary."""
        primary_category = content_analysis['primary_categories'][0] if content_analysis['primary_categories'] else "General content"
        main_objects = ', '.join(content_analysis['dominant_objects'][:2])
        templates = [
            f"This is a {primary_category} video that includes elements like {main_objects}.",
            f"The video presents a {primary_category} scenario, with its focus on {main_objects}.",
            f"The content centers on {primary_category}, highlighting elements such as {main_objects}."
        ]
        import random
        return random.choice(templates)

    def _generate_detailed_summary(self, description_text, content_analysis):
        # Use the summarization model to generate a more detailed description.
        try:
            summary = self.summarizer(
                description_text,
                max_length=150,
                min_length=50,
                do_sample=False
            )[0]['summary_text']
            return summary
        except Exception as e:
            # Detailed summarization failed; fall back to a concise summary.
            return self._generate_concise_summary(description_text, content_analysis)

    def _generate_engaging_summary(self, description_text, content_analysis):
        """Generate an engaging summary."""
        primary_category = content_analysis['primary_categories'][0] if content_analysis['primary_categories'] else "Awesome"
        main_objects = ','.join(content_analysis['dominant_objects'][:2])
        engaging_templates = [
            f"Don't miss out! This {primary_category} video takes you deep into the fascinating world of {main_objects}!",
            f"A spectacular showcase! A visual feast exploring the unique charm of {main_objects} in a {primary_category} setting!",
            f"Watch now! This video perfectly captures the {primary_category} moment of {main_objects}!"
        ]
        import random
        return random.choice(engaging_templates)


class VideoSummarySystem:
    """Video summary generation system."""

    def __init__(self, model="./fabric_data/examples/t5-small"):
        self.summary_generator = VideoSummaryGenerator(model)
        self.frame_descriptions = []

    def accumulate(self, description: str) -> None:
        self.frame_descriptions.append(json.loads(description))

    def finish(self) -> str:
        summary = self.summary_generator.generate_summary(self.frame_descriptions)
        result = {
            'video_summary': summary,
            'frames_analyzed': len(self.frame_descriptions),
            'content_categories': list(set(
                cat for analysis in self.frame_descriptions
                for cat in analysis['content_type']
            )),
            'key_objects': list(set(
                obj for analysis in self.frame_descriptions
                for obj in analysis['key_objects']
            ))[:8],
            'processing_details': {
                'total_frames': len(self.frame_descriptions),
                'successful_analyses': len(self.frame_descriptions)
            }
        }
        return json.dumps(result)

    @property
    def aggregate_state(self) -> typing.Dict[str, typing.Any]:
        return {
            "frame_descriptions": self.frame_descriptions,
        }

    def merge(self, other_state: typing.Dict[str, typing.Any]) -> None:
        self.frame_descriptions += other_state.get("frame_descriptions", [])

- Execute the UDAF to produce the final video summary.
from fabric_data.multimodal.function import AggregateFnBuilder

con.delete_function("VideoSummarySystem", database=target_database)
# Create a UDAF to generate video summaries from key frame descriptions.
con.create_agg_function(
    VideoSummarySystem,
    database=target_database,
    imports=("t5-small",))
video_summary = con.get_function("VideoSummarySystem", database=target_database)

agg_builder = AggregateFnBuilder(
    fn=video_summary,
    on=[descriptions.descriptions.description],
    as_col="describe",
    num_dpus=0.5,
    constructor_kwargs={"model": "./fabric_data/examples/t5-small"}
)
df = descriptions.aggregate(agg_builder).execute()
for row in df.itertuples():
    print(row[1])

- Close the connection.
con.close()