Video Frame Description and Summary Generation Example

This example shows how to use the Huawei Fabric data lake API to do the following:

- Use a UDTF (user-defined table function) to extract key video frames and generate a description for each frame.
- Use a UDAF (user-defined aggregate function) to combine the individual frame descriptions into a coherent video summary.

Environment Preparation

This example requires the following Python packages:

```
huawei-fabric-data huawei-fabricsql-connectorapi pipeline torch transformers
```

Procedure

- Create a server connection.
```python
import logging
import os

from fabric_data.multimodal import ai_lake

# Set the target database name.
target_database = "multimodal_lake"

con = ai_lake.connect(
    fabric_endpoint=os.getenv("fabric_endpoint"),
    fabric_endpoint_id=os.getenv("fabric_endpoint_id"),
    fabric_workspace_id=os.getenv("fabric_workspace_id"),
    lf_catalog_name=os.getenv("lf_catalog_name"),
    lf_instance_id=os.getenv("lf_instance_id"),
    access_key=os.getenv("access_key"),
    secret_key=os.getenv("secret_key"),
    default_database=target_database,
    use_single_cn_mode=True,
    logging_level=logging.WARNING,
)
con.set_function_staging_workspace(
    obs_directory_base=os.getenv("obs_directory_base"),
    obs_bucket_name=os.getenv("obs_bucket_name"),
    obs_server=os.getenv("obs_server"),
    access_key=os.getenv("access_key"),
    secret_key=os.getenv("secret_key"),
)
```
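The snippet above reads every setting from environment variables and does not validate them. A minimal sketch of the variables it expects (the values below are placeholders, not real credentials):

```python
import os

# Placeholder values only; substitute your own endpoint and credentials.
os.environ["fabric_endpoint"] = "https://<fabric-endpoint>"
os.environ["fabric_endpoint_id"] = "<endpoint-id>"
os.environ["fabric_workspace_id"] = "<workspace-id>"
os.environ["lf_catalog_name"] = "<catalog-name>"
os.environ["lf_instance_id"] = "<instance-id>"
os.environ["access_key"] = "<access-key>"
os.environ["secret_key"] = "<secret-key>"
# Used by set_function_staging_workspace:
os.environ["obs_directory_base"] = "<obs-directory-base>"
os.environ["obs_bucket_name"] = "<bucket-name>"
os.environ["obs_server"] = "<obs-server>"
```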
- Create a Fabric table.

```python
from fabric_data.multimodal.types import video

# Create a video table whose data is stored in Parquet files on OBS.
con.create_table(
    "yt_video_type",
    database=target_database,
    table_format="parquet",
    schema={"vio": video.Video},
    external=True,
    location="obs://mini-kernel/dataframe_test/yt_video_type",
    if_not_exists=True,
)
print(con.describe_table("yt_video_type", database=target_database))
```

- Define the video frame extraction class.
```python
import io

from PIL import Image

from fabric_data.multimodal.types import video


class VideoFrameExtractor:
    def __init__(self, target_frames=12):
        self.target_frames = target_frames

    def display_key_frames(self, vio: video.Video):
        from fabric_data.multimodal.types import image

        frames, timestamps, duration = self.extract_key_frames(vio)
        images = []
        for index, frame in enumerate(frames):
            pil_img = Image.fromarray(frame).convert("RGB")
            buffer = io.BytesIO()
            pil_img.save(buffer, format="JPEG")
            img = image.Image(filename=timestamps[index], data=buffer.getvalue())
            images.append(img.to_thumbnail((200, 200)))
        image.display_image(images)

    def extract_key_frames(self, vio: video.Video):
        try:
            vr = vio.video_reader
            total_frames = len(vr)
            fps = vr.get_avg_fps()
            duration = total_frames / fps
            # Select the sampling strategy based on the video length.
            if duration <= 30:
                frame_indices = self._uniform_sampling(total_frames, min(8, self.target_frames))
            elif duration <= 180:
                frame_indices = self._keyframe_sampling(total_frames, self.target_frames)
            else:
                frame_indices = self._adaptive_sampling(total_frames, fps, self.target_frames)
            frames = vr.get_batch(frame_indices).asnumpy()
            timestamps = [idx / fps for idx in frame_indices]
            return frames, timestamps, duration
        except Exception:
            return [], [], 0

    def _uniform_sampling(self, total_frames, num_frames):
        """Sample at uniform intervals."""
        step = max(1, total_frames // num_frames)
        return list(range(0, total_frames, step))[:num_frames]

    def _keyframe_sampling(self, total_frames, num_frames):
        """Key-frame sampling: focus on the beginning, middle, and end."""
        key_points = [
            0,
            total_frames // 4,
            total_frames // 2,
            3 * total_frames // 4,
            total_frames - 1,
        ]
        # Insert intermediate frames.
        additional = num_frames - len(key_points)
        if additional > 0:
            step = total_frames // (additional + 1)
            additional_indices = [i * step for i in range(1, additional + 1)]
            key_points.extend(additional_indices)
        return sorted(key_points)[:num_frames]

    def _adaptive_sampling(self, total_frames, fps, num_frames):
        """Adaptive sampling: sample by time segment."""
        segments = 6
        frames_per_segment = max(1, num_frames // segments)
        indices = []
        for i in range(segments):
            start = (i * total_frames) // segments
            end = ((i + 1) * total_frames) // segments
            # Sample within each segment.
            if frames_per_segment > 0:
                step = max(1, (end - start) // frames_per_segment)
                segment_indices = [
                    start + j * step
                    for j in range(frames_per_segment)
                    if start + j * step < end
                ]
                indices.extend(segment_indices)
        return indices[:num_frames]


t = con.load_dataset("yt_video_type", database=target_database)
df = t.limit(1).select_columns(t.vio).execute()
frame_extractor = VideoFrameExtractor()
for row in df.itertuples():
    frame_extractor.display_key_frames(row[1])
```
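The three sampling helpers are pure functions of the frame count, so they can be sanity-checked without decoding any video. A quick illustration (the frame counts and fps below are made up):

```python
# A 10 s clip at 30 fps: uniform sampling of at most 8 frames.
print(frame_extractor._uniform_sampling(total_frames=300, num_frames=8))
# A 3 min clip at 25 fps: beginning/middle/end plus intermediate frames.
print(frame_extractor._keyframe_sampling(total_frames=4500, num_frames=12))
# A 60 min clip at 25 fps: per-segment sampling across 6 time segments.
print(frame_extractor._adaptive_sampling(total_frames=90000, fps=25, num_frames=12))
```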
- Define the video frame description class.

This example uses the Hugging Face Salesforce/blip-image-captioning-base model as the image-to-text generation model.
Because Python UDFs inside the database do not currently provide reliable network connectivity or download capability, download the original model to a local machine and upload it to OBS as a compressed file (for example, a .zip archive).
You can download Salesforce/blip-image-captioning-base on a local machine with the following Python script:
```python
from huggingface_hub import snapshot_download

local_dir = "blip-image-captioning-base"
snapshot_download(
    repo_id="Salesforce/blip-image-captioning-base",
    local_dir=local_dir,
    local_dir_use_symlinks=False,
)
```

With the model available locally, define the frame description class:

```python
import json

import torch
from transformers import BlipProcessor, BlipForConditionalGeneration, pipeline


class VideoContentAnalyzer:
    def __init__(self, model_name="./fabric_data/examples/blip-image-captioning-base", target_frames=12):
        self.extractor = VideoFrameExtractor(target_frames)
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.processor = BlipProcessor.from_pretrained(model_name)
        self.model = BlipForConditionalGeneration.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
        ).to(self.device)
        self.model.eval()

    def __call__(self, vio: video.Video):
        frames, _, _ = self.extractor.extract_key_frames(vio)
        frame_analyses = self.analyze_frames(frames)
        for analyse in frame_analyses:
            yield {"description": json.dumps(analyse)}

    def analyze_frames(self, frames):
        """Analyze the video frame content."""
        frame_analyses = []
        for i, frame in enumerate(frames):
            try:
                # Generate the frame description.
                frame_description = self._generate_frame_description(frame)
                # Analyze content features.
                content_features = self._analyze_content_features(frame_description)
                frame_analyses.append({
                    'frame_index': i,
                    'description': frame_description,
                    'content_type': content_features['content_type'],
                    'key_objects': content_features['key_objects'],
                    'scene_context': content_features['scene_context'],
                })
            except Exception:
                # Processing the current frame failed; skip to the next frame.
                continue
        return frame_analyses

    def _generate_frame_description(self, frame):
        """Generate a single-frame description."""
        pil_image = Image.fromarray(frame).convert("RGB")
        inputs = self.processor(images=pil_image, return_tensors="pt").to(self.device)
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=50,
                num_beams=5,
                early_stopping=True,
            )
        description = self.processor.decode(outputs[0], skip_special_tokens=True)
        return description.strip()

    def _analyze_content_features(self, description):
        description_lower = description.lower()
        content_categories = {
            'educational': ['tutorial', 'lesson', 'explain', 'teach', 'learn', 'education'],
            'entertainment': ['funny', 'comedy', 'entertainment', 'show', 'performance'],
            'nature': ['outdoor', 'nature', 'landscape', 'mountain', 'forest', 'animal'],
            'sports': ['sport', 'game', 'player', 'team', 'match', 'competition'],
            'food': ['food', 'cooking', 'recipe', 'meal', 'restaurant', 'kitchen'],
            'technology': ['computer', 'tech', 'device', 'electronic', 'software'],
            'people': ['person', 'people', 'man', 'woman', 'child', 'group'],
        }
        detected_categories = []
        for category, keywords in content_categories.items():
            if any(keyword in description_lower for keyword in keywords):
                detected_categories.append(category)
        common_objects = [
            'person', 'people', 'man', 'woman', 'child', 'car', 'building',
            'tree', 'house', 'street', 'water', 'sky', 'food', 'animal',
        ]
        key_objects = [obj for obj in common_objects if obj in description_lower]
        if any(word in description_lower for word in ['indoors', 'inside', 'room']):
            scene_context = 'indoor'
        elif any(word in description_lower for word in ['outdoors', 'outside', 'park', 'street']):
            scene_context = 'outdoor'
        else:
            scene_context = 'unknown'
        return {
            'content_type': detected_categories[:2] if detected_categories else ['general'],
            'key_objects': key_objects[:5],
            'scene_context': scene_context,
        }
```
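The `imports=("blip-image-captioning-base",)` argument used in the next step expects the model to have been staged through the OBS workspace configured earlier. The exact archive layout it expects depends on your deployment; as one hedged sketch using only the standard library, the downloaded directory can be zipped like this (paths are illustrative):

```python
import shutil

# Create blip-image-captioning-base.zip with the model directory as its
# top-level folder (an assumption about the expected archive layout).
shutil.make_archive(
    "blip-image-captioning-base",  # base name of the resulting .zip
    "zip",
    root_dir=".",
    base_dir="blip-image-captioning-base",
)
```

The archive can then be uploaded to the staging bucket with your usual OBS tooling.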
- Create a UDTF to process the video data.

```python
import ibis.expr.datatypes as dt

import fabric_data as fabric

signature = fabric.Signature(
    parameters=[
        fabric.Parameter(name="vio", annotation=video.Video),
    ],
    return_annotation=dt.Struct({"description": str}),
)
try:
    # con.delete_function("VideoContentAnalyzer", database=target_database)
    video_content_analyzer = con.get_function("VideoContentAnalyzer", database=target_database)
except Exception:
    con.create_table_function(
        VideoContentAnalyzer,
        database=target_database,
        signature=signature,
        imports=("blip-image-captioning-base",),
    )
    video_content_analyzer = con.get_function("VideoContentAnalyzer", database=target_database)
```

- Run the UDTF to generate video frame descriptions.
```python
import pandas as pd

t = t.limit(1)
descriptions = t.select_columns(
    video_content_analyzer(t.vio)
    .with_arguments(
        model_name="./fabric_data/examples/blip-image-captioning-base",
        target_frames=12,
    )
    .name("descriptions")
)
df = descriptions.execute()

pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)
print(df)
```
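Each row returned by the UDTF carries the per-frame analysis as a JSON string (that is how `__call__` yields it). A small sketch for decoding it back into fields, assuming each cell arrives as a struct-like dict:

```python
import json

for row in df.itertuples():
    cell = row[1]
    # The struct has a single "description" field holding the JSON payload.
    payload = cell["description"] if isinstance(cell, dict) else cell
    analysis = json.loads(payload)
    print(analysis["frame_index"], analysis["description"], analysis["content_type"])
```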
- Define a UDAF to generate a video summary from the frame descriptions.

The following example uses the Hugging Face google-t5/t5-small model as the text-to-text generation model.
As before, because Python UDFs inside the database do not currently provide reliable network connectivity or download capability, download the original model locally and upload it to OBS as a compressed file (for example, a .zip archive).
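google-t5/t5-small can be fetched on a local machine with the same download pattern as the earlier script (a sketch mirroring it):

```python
from huggingface_hub import snapshot_download

local_dir = "t5-small"
snapshot_download(
    repo_id="google-t5/t5-small",
    local_dir=local_dir,
    local_dir_use_symlinks=False,
)
```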
```python
import random
import typing
from collections import Counter


class VideoSummaryGenerator:
    """Video summary generator."""

    def __init__(self, model):
        # Initialize the text summarization model.
        self.summarizer = pipeline(
            "summarization",
            model=model,
            device=0 if torch.cuda.is_available() else -1,
        )

    def generate_summary(self, frame_analyses):
        """Generate a video summary."""
        all_descriptions = [analysis['description'] for analysis in frame_analyses]
        if not all_descriptions:
            return "Unable to analyze the video content."
        # Analyze the characteristics of the video content.
        content_analysis = self._analyze_video_content(frame_analyses)
        # Merge the frame descriptions into one text, then generate a summary.
        description_text = self._create_description_text(all_descriptions, content_analysis)
        summary = self._generate_detailed_summary(description_text, content_analysis)
        return summary

    def _analyze_video_content(self, frame_analyses):
        """Analyze the overall video content."""
        # Extract the key information.
        content_types = []
        all_objects = []
        for analysis in frame_analyses:
            content_types.extend(analysis['content_type'])
            all_objects.extend(analysis['key_objects'])
        # Calculate frequencies.
        common_content_types = [item for item, count in Counter(content_types).most_common(2)]
        common_objects = [item for item, count in Counter(all_objects).most_common(5)]
        return {
            'primary_categories': common_content_types,
            'dominant_objects': common_objects,
            'total_frames_analyzed': len(frame_analyses),
        }

    def _create_description_text(self, descriptions, content_analysis):
        """Create descriptive text."""
        desc_counter = Counter(descriptions)
        representative_descs = [desc for desc, count in desc_counter.most_common(3)]
        text = "Video Content Analysis:\n"
        text += f"Primary Categories: {', '.join(content_analysis['primary_categories'])}\n"
        text += f"Key Objects: {', '.join(content_analysis['dominant_objects'][:3])}\n"
        text += "Representative Scenes:\n"
        for i, desc in enumerate(representative_descs, 1):
            text += f"{i}. {desc}\n"
        return text

    def _generate_concise_summary(self, description_text, content_analysis):
        """Generate a concise summary."""
        primary_category = content_analysis['primary_categories'][0] if content_analysis['primary_categories'] else "General content"
        main_objects = ', '.join(content_analysis['dominant_objects'][:2])
        templates = [
            f"This is a {primary_category} video that includes elements like {main_objects}.",
            f"The video presents a {primary_category} scenario, with its focus on {main_objects}.",
            f"The content centers on {primary_category}, highlighting elements such as {main_objects}.",
        ]
        return random.choice(templates)

    def _generate_detailed_summary(self, description_text, content_analysis):
        # Use the summarization model to generate a more detailed description.
        try:
            summary = self.summarizer(
                description_text,
                max_length=150,
                min_length=50,
                do_sample=False,
            )[0]['summary_text']
            return summary
        except Exception:
            # Detailed generation failed; fall back to a concise summary.
            return self._generate_concise_summary(description_text, content_analysis)

    def _generate_engaging_summary(self, description_text, content_analysis):
        """Generate an engaging summary."""
        primary_category = content_analysis['primary_categories'][0] if content_analysis['primary_categories'] else "Awesome"
        main_objects = ', '.join(content_analysis['dominant_objects'][:2])
        engaging_templates = [
            f"Don't miss out! This {primary_category} video takes you deep into the fascinating world of {main_objects}!",
            f"A spectacular showcase! A visual feast exploring the unique charm of {main_objects} in a {primary_category} setting!",
            f"Watch now! This video perfectly captures the {primary_category} moment of {main_objects}!",
        ]
        return random.choice(engaging_templates)


class VideoSummarySystem:
    """Video summary generation system."""

    def __init__(self, model="./fabric_data/examples/t5-small"):
        self.summary_generator = VideoSummaryGenerator(model)
        self.frame_descriptions = []

    def accumulate(self, description: str) -> None:
        self.frame_descriptions.append(json.loads(description))

    def finish(self) -> str:
        summary = self.summary_generator.generate_summary(self.frame_descriptions)
        result = {
            'video_summary': summary,
            'frames_analyzed': len(self.frame_descriptions),
            'content_categories': list(set(
                cat
                for analysis in self.frame_descriptions
                for cat in analysis['content_type']
            )),
            'key_objects': list(set(
                obj
                for analysis in self.frame_descriptions
                for obj in analysis['key_objects']
            ))[:8],
            'processing_details': {
                'total_frames': len(self.frame_descriptions),
                'successful_analyses': len(self.frame_descriptions),
            },
        }
        return json.dumps(result)

    @property
    def aggregate_state(self) -> typing.Dict[str, typing.Any]:
        return {
            "frame_descriptions": self.frame_descriptions,
        }

    def merge(self, other_state: typing.Dict[str, typing.Any]) -> None:
        self.frame_descriptions += other_state.get("frame_descriptions", [])
```
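Before registering the class as a UDAF, its accumulate/finish protocol can be exercised locally, assuming the t5-small snapshot is available at the default path. The record below is hypothetical and merely mimics the shape the UDTF emits:

```python
import json

system = VideoSummarySystem(model="./fabric_data/examples/t5-small")
# A made-up frame analysis with the same keys the UDTF produces.
sample = {
    "frame_index": 0,
    "description": "a man riding a bike down a street",
    "content_type": ["people"],
    "key_objects": ["man", "street"],
    "scene_context": "outdoor",
}
system.accumulate(json.dumps(sample))
print(json.loads(system.finish())["video_summary"])
```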
- Run the UDAF to generate the video description.

```python
from fabric_data.multimodal.function import AggregateFnBuilder

con.delete_function("VideoSummarySystem", database=target_database)
# Create a UDAF that generates a video description from the key-frame descriptions.
con.create_agg_function(
    VideoSummarySystem,
    database=target_database,
    imports=("t5-small",),
)
video_summary = con.get_function("VideoSummarySystem", database=target_database)

agg_builder = AggregateFnBuilder(
    fn=video_summary,
    on=[descriptions.descriptions.description],
    as_col="describe",
    num_dpus=0.5,
    constructor_kwargs={"model": "./fabric_data/examples/t5-small"},
)
df = descriptions.aggregate(agg_builder).execute()
for row in df.itertuples():
    print(row[1])
```
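Each printed row is one JSON document produced by `finish()`; decoding it exposes the summary and the bookkeeping fields (a sketch, assuming `row[1]` holds that JSON string):

```python
import json

for row in df.itertuples():
    result = json.loads(row[1])
    print(result["video_summary"])
    print("categories:", result["content_categories"])
    print("objects:", result["key_objects"])
```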
- Close the connection.

```python
con.close()
```