Updated on 2025-12-10 GMT+08:00

Example: Generating Video Frame Descriptions and Summaries

This example shows how to use the Huawei Fabric data lake API to:

  • Extract key video frames and generate a description for each frame with a UDTF (user-defined table function).
  • Combine the per-frame descriptions into a coherent video summary with a UDAF (user-defined aggregate function).

Preparing the Environment

This example requires the following Python packages (see the installation sketch after the list):

huawei-fabric-data
huawei-fabricsql-connectorapi
pipeline
torch
transformers
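
Assuming the names above correspond to installable distributions in your environment (note that the `pipeline` API used below is imported from transformers), they can typically be installed with pip, for example:

pip install huawei-fabric-data huawei-fabricsql-connectorapi torch transformers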

Procedure

  1. Create a server connection.
    import logging
    import os
    from fabric_data.multimodal import ai_lake
    # Set the target database name.
    target_database = "multimodal_lake"

    con = ai_lake.connect(
        fabric_endpoint=os.getenv("fabric_endpoint"),
        fabric_endpoint_id=os.getenv("fabric_endpoint_id"),
        fabric_workspace_id=os.getenv("fabric_workspace_id"),
        lf_catalog_name=os.getenv("lf_catalog_name"),
        lf_instance_id=os.getenv("lf_instance_id"),
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"),
        default_database=target_database,
        use_single_cn_mode=True,
        logging_level=logging.WARNING,
    )
    con.set_function_staging_workspace(
        obs_directory_base=os.getenv("obs_directory_base"),
        obs_bucket_name=os.getenv("obs_bucket_name"),
        obs_server=os.getenv("obs_server"),
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"))
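
    The connection above reads its settings from environment variables. A minimal sketch of supplying them from Python before calling ai_lake.connect (all values are placeholders, not real endpoints; in practice you would export them in your shell or job configuration):

    # Placeholder values for illustration only; set these before connecting.
    os.environ["fabric_endpoint"] = "https://<your-fabric-endpoint>"
    os.environ["fabric_endpoint_id"] = "<endpoint-id>"
    os.environ["fabric_workspace_id"] = "<workspace-id>"
    os.environ["lf_catalog_name"] = "<lakeformation-catalog>"
    os.environ["lf_instance_id"] = "<lakeformation-instance-id>"
    os.environ["access_key"] = "<access-key>"
    os.environ["secret_key"] = "<secret-key>"
    os.environ["obs_bucket_name"] = "<obs-bucket>"
    os.environ["obs_server"] = "<obs-endpoint>"
    os.environ["obs_directory_base"] = "<obs-staging-directory>"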
  2. Create a Fabric table.
    from fabric_data.multimodal.types import video
    # Create a video table where the data is stored in Parquet files on OBS.
    con.create_table("yt_vid_type", database=target_database, table_format="parquet", schema={"vio": video.Video},
                     external=True,
                     location="obs://mini-kernel/dataframe_test/yt_video_type", if_not_exists=True)
    print(con.describe_table("yt_video_type", database=target_database))
  3. Define the video frame extraction class.
    import io
    from PIL import Image
    from fabric_data.multimodal.types import video
    class VideoFrameExtractor:
        def __init__(self, target_frames=12):
            self.target_frames = target_frames
        def display_key_frames(self, vio: video.Video):
            from fabric_data.multimodal.types import image
            frames, timestamps, duration = self.extract_key_frames(vio)
            images = []
    
            for index, frame in enumerate(frames):
                pil_img = Image.fromarray(frame).convert("RGB")
                buffer = io.BytesIO()
                pil_img.save(buffer, format="JPEG")
                img = image.Image(filename=timestamps[index], data=buffer.getvalue())
                images.append(img.to_thumbnail((200, 200)))
            image.display_image(images)
        def extract_key_frames(self, vio: video.Video):
            try:
                vr = vio.video_reader
                total_frames = len(vr)
                fps = vr.get_avg_fps()
                duration = total_frames / fps
                # Select the sampling strategy based on video length.
                if duration <= 30:
                    frame_indices = self._uniform_sampling(total_frames, min(8, self.target_frames))
                elif duration <= 180:
                    frame_indices = self._keyframe_sampling(total_frames, self.target_frames)
                else:
                    frame_indices = self._adaptive_sampling(total_frames, fps, self.target_frames)
                frames = vr.get_batch(frame_indices).asnumpy()
                timestamps = [idx / fps for idx in frame_indices]
                return frames, timestamps, duration
            except Exception:
                # Frame extraction failed; return empty results.
                return [], [], 0
    
        def _uniform_sampling(self, total_frames, num_frames):
            """Sampling at uniform intervals."""
            step = max(1, total_frames // num_frames)
            return list(range(0, total_frames, step))[:num_frames]
        def _keyframe_sampling(self, total_frames, num_frames):
            """Key Frame Sampling - Focus on sampling the beginning, middle, and end."""
            key_points = [
                0,
                total_frames // 4,
                total_frames // 2,
                3 * total_frames // 4,
                total_frames - 1
            ]
            # Insert intermediate frames.
            additional = num_frames - len(key_points)
            if additional > 0:
                step = total_frames // (additional + 1)
                additional_indices = [i * step for i in range(1, additional + 1)]
                key_points.extend(additional_indices)
            return sorted(key_points)[:num_frames]
        def _adaptive_sampling(self, total_frames, fps, num_frames):
            """Adaptive Sampling - Sampling by Time Segments"""
            segments = 6
            frames_per_segment = max(1, num_frames // segments)
            indices = []
            for i in range(segments):
                start = (i * total_frames) // segments
                end = ((i + 1) * total_frames) // segments
                # Sample within each segment.
                if frames_per_segment > 0:
                    step = max(1, (end - start) // frames_per_segment)
                    segment_indices = [start + j * step for j in range(frames_per_segment)
                                       if start + j * step < end]
                    indices.extend(segment_indices)
            return indices[:num_frames]
    
    t = con.load_dataset("yt_video_type", database=target_database)
    df = t.limit(1).select_columns(t.vio).execute()
    frame_extractor = VideoFrameExtractor()
    for row in df.itertuples():
        frame_extractor.display_key_frames(row[1])
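
    The three sampling helpers are plain Python, so they can be inspected without Fabric or any video file. A small sketch with a hypothetical 900-frame, 30 fps clip showing which indices each strategy picks:

    # Illustrative only: the frame count and fps below are made up.
    extractor = VideoFrameExtractor(target_frames=12)
    total_frames, fps = 900, 30.0
    print(extractor._uniform_sampling(total_frames, 8))         # 8 evenly spaced indices
    print(extractor._keyframe_sampling(total_frames, 12))       # start/quarters/end plus fill-ins
    print(extractor._adaptive_sampling(total_frames, fps, 12))  # two samples from each of 6 segments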
  4. Define the video frame description class.

    This example uses the Salesforce/blip-image-captioning-base model from Hugging Face as the image-to-text generation model.

    Because Python UDFs in the database do not currently provide reliable network connectivity or download capabilities, download the original model to a local machine and upload it as a compressed file (for example, a .zip) through OBS (see the zipping sketch after the download script).

    You can download Salesforce/blip-image-captioning-base on a local machine with the following Python script:

    from huggingface_hub import snapshot_download
    local_dir = "blip-image-captioning-base"
    snapshot_download(
        repo_id="Salesforce/blip-image-captioning-base",
        local_dir=local_dir,
        local_dir_use_symlinks=False,
    )
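
    To produce the compressed file mentioned above, the downloaded directory can be zipped with the standard library before uploading it to OBS (a sketch; depending on how function imports expect the archive to be laid out, you may need base_dir=local_dir with root_dir="." instead):

    import shutil
    # Creates blip-image-captioning-base.zip from the downloaded directory's contents.
    shutil.make_archive("blip-image-captioning-base", "zip", root_dir=local_dir)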
    import json
    import torch
    from transformers import BlipProcessor, BlipForConditionalGeneration, pipeline
    class VideoContentAnalyzer:
        def __init__(self, model_name="./fabric_data/examples/blip-image-captioning-base", target_frames=12):
            self.extractor = VideoFrameExtractor(target_frames)
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
            self.processor = BlipProcessor.from_pretrained(model_name)
            self.model = BlipForConditionalGeneration.from_pretrained(
                model_name,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32
            ).to(self.device)
            self.model.eval()
        def __call__(self, vio: video.Video):
            frames, _, _ = self.extractor.extract_key_frames(vio)
            frame_analyses = self.analyze_frames(frames)
            for analysis in frame_analyses:
                yield {"description": json.dumps(analysis)}
        def analyze_frames(self, frames):
            """Analyze the video frame content."""
            frame_analyses = []
            for i, frame in enumerate(frames):
                try:
                    # Generate frame description.
                    frame_description = self._generate_frame_description(frame)
                    # Analyze content features.
                    content_features = self._analyze_content_features(frame_description)
                    frame_analyses.append({
                        'frame_index': i,
                        'description': frame_description,
                        'content_type': content_features['content_type'],
                        'key_objects': content_features['key_objects'],
                        'scene_context': content_features['scene_context']
                    })
                except Exception:
                    # Processing this frame failed; skip to the next frame.
                    continue
    
            return frame_analyses
        def _generate_frame_description(self, frame):
            """Generate a single-frame description."""
            pil_image = Image.fromarray(frame).convert("RGB")
            inputs = self.processor(images=pil_image, return_tensors="pt").to(self.device)
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_length=50,
                    num_beams=5,
                    early_stopping=True
                )
            description = self.processor.decode(outputs[0], skip_special_tokens=True)
            return description.strip()
        def _analyze_content_features(self, description):
            description_lower = description.lower()
            content_categories = {
                'educational': ['tutorial', 'lesson', 'explain', 'teach', 'learn', 'education'],
                'entertainment': ['funny', 'comedy', 'entertainment', 'show', 'performance'],
                'nature': ['outdoor', 'nature', 'landscape', 'mountain', 'forest', 'animal'],
                'sports': ['sport', 'game', 'player', 'team', 'match', 'competition'],
                'food': ['food', 'cooking', 'recipe', 'meal', 'restaurant', 'kitchen'],
                'technology': ['computer', 'tech', 'device', 'electronic', 'software'],
                'people': ['person', 'people', 'man', 'woman', 'child', 'group']
            }
            detected_categories = []
            for category, keywords in content_categories.items():
                if any(keyword in description_lower for keyword in keywords):
                    detected_categories.append(category)
            common_objects = [
                'person', 'people', 'man', 'woman', 'child', 'car', 'building',
                'tree', 'house', 'street', 'water', 'sky', 'food', 'animal'
            ]
            key_objects = [obj for obj in common_objects if obj in description_lower]
            if any(word in description_lower for word in ['indoors', 'inside', 'room']):
                scene_context = 'indoor'
            elif any(word in description_lower for word in ['outdoors', 'outside', 'park', 'street']):
                scene_context = 'outdoor'
            else:
                scene_context = 'unknown'
    
            return {
                'content_type': detected_categories[:2] if detected_categories else ['general'],
                'key_objects': key_objects[:5],
                'scene_context': scene_context
            }
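
    The _analyze_content_features helper is pure keyword matching and never touches the model, so it can be tried on a made-up caption without loading any weights (the method does not use self, so it is invoked unbound here purely as a quick check):

    # Hypothetical caption for illustration.
    sample_caption = "a man cooking food in a kitchen"
    print(VideoContentAnalyzer._analyze_content_features(None, sample_caption))
    # -> {'content_type': ['food', 'people'], 'key_objects': ['man', 'food'], 'scene_context': 'unknown'}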
  5. Create a UDTF to process the video data.
    import ibis.expr.datatypes as dt
    import fabric_data as fabric
    signature = fabric.Signature(
        parameters=[
            fabric.Parameter(name="vio", annotation=video.Video),
        ],
        return_annotation=dt.Struct({"description": str}),
    )
    try:
        # con.delete_function("VideoContentAnalyzer", database=target_database)
        video_content_analyzer = con.get_function("VideoContentAnalyzer", database=target_database)
    except Exception:
        con.create_table_function(VideoContentAnalyzer, database=target_database, signature=signature,
                                  imports=("blip-image-captioning-base",))
        video_content_analyzer = con.get_function("VideoContentAnalyzer", database=target_database)
  6. Run the UDTF to generate video frame descriptions.
    t = t.limit(1)
    descriptions = t.select_columns(video_content_analyzer(t.vio).with_arguments(model_name="./fabric_data/examples/blip-image-captioning-base", target_frames=12).name("descriptions"))
    df = descriptions.execute()
    import pandas as pd
    pd.set_option('display.width', None)
    pd.set_option('display.max_colwidth', None)
    print(df)
  7. Define a UDAF that generates a video summary from the frame descriptions.

    The following example uses the google-t5/t5-small model from Hugging Face as the text-to-text generation model.

    Because Python UDFs in the database do not currently provide reliable network connectivity or download capabilities, download the original model to a local machine and upload it as a compressed file (for example, a .zip) through OBS, as in the sketch below.
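
    As in step 4, the model can be fetched on a local machine with huggingface_hub, for example:

    from huggingface_hub import snapshot_download
    snapshot_download(
        repo_id="google-t5/t5-small",
        local_dir="t5-small",
        local_dir_use_symlinks=False,
    )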

    from collections import Counter
    import typing
    class VideoSummaryGenerator:
        """Video Summary Generator"""
    
        def __init__(self, model):
            # Initialize the text summarization model.
            self.summarizer = pipeline(
                "summarization",
                model=model,
                device=0 if torch.cuda.is_available() else -1
            )
        def generate_summary(self, frame_analyses):
            """Generate a video summary"""
            all_descriptions = [analysis['description'] for analysis in frame_analyses]
            if not all_descriptions:
                return "Unable to analyze the video content."
    
            # Analyze the characteristics of the video content.
            content_analysis = self._analyze_video_content(frame_analyses)
            # Generate a summary
            summary = self._generate_detailed_summary(all_descriptions, content_analysis)
            return summary
        def _analyze_video_content(self, frame_analyses):
            """Analyze the overall video content."""
            all_descriptions = ' '.join([analysis['description'] for analysis in frame_analyses]).lower()
            # Extract the key information
            content_types = []
            all_objects = []
            for analysis in frame_analyses:
                content_types.extend(analysis['content_type'])
                all_objects.extend(analysis['key_objects'])
            # Calculate frequency
            common_content_types = [item for item, count in Counter(content_types).most_common(2)]
            common_objects = [item for item, count in Counter(all_objects).most_common(5)]
            return {
                'primary_categories': common_content_types,
                'dominant_objects': common_objects,
                'total_frames_analyzed': len(frame_analyses)
            }
        def _create_description_text(self, descriptions, content_analysis):
            """Create descriptive text."""
            desc_counter = Counter(descriptions)
            representative_descs = [desc for desc, count in desc_counter.most_common(3)]
            text = "Video Content Analysis:\n"
            text += f"Primary Categories: {', '.join(content_analysis['primary_categories'])}\n"
            text += f"Key Objects: {', '.join(content_analysis['dominant_objects'][:3])}\n"
            text += "Representative Scenes:\n"
            for i, desc in enumerate(representative_descs, 1):
                text += f"{i}. {desc}\n"
    
            return text
        def _generate_concise_summary(self, description_text, content_analysis):
            """Generate a concise summary"""
            primary_category = content_analysis['primary_categories'][0] if content_analysis['primary_categories'] else "General content"
            main_objects = ', '.join(content_analysis['dominant_objects'][:2])
            templates = [
                f"This is a {primary_category} video that includes elements like {main_objects}.",
                f"The video presents a {primary_category} scenario, with its focus on {main_objects}.",
                f"The content centers on {primary_category}, highlighting elements such as {main_objects}."
            ]
            import random
            return random.choice(templates)
        def _generate_detailed_summary(self, description_text, content_analysis):
            # Use the summary model to generate a more detailed description.
            try:
                summary = self.summarizer(
                    description_text,
                    max_length=150,
                    min_length=50,
                    do_sample=False
                )[0]['summary_text']
                return summary
            except Exception as e:
                # Failed to generate detailed information, switched to producing concise summary.
                return self._generate_concise_summary(description_text, content_analysis)
        def _generate_engaging_summary(self, description_text, content_analysis):
            """Generate an engaging summar"""
            primary_category = content_analysis['primary_categories'][0] if content_analysis['primary_categories'] else "Awesome"
            main_objects = ', '.join(content_analysis['dominant_objects'][:2])
            engaging_templates = [
                f"Don't miss out! This {primary_category} video takes you deep into the fascinating world of {main_objects}!",
                f"A spectacular showcase! A visual feast exploring the unique charm of {main_objects} in a {primary_category} setting!",
                f"Watch now! This video perfectly captures the {primary_category} moment of {main_objects}!"
            ]
            import random
            return random.choice(engaging_templates)
    class VideoSummarySystem:
        """Video Summary Generation System"""
    
        def __init__(self, model="./fabric_data/examples/t5-small"):
            self.summary_generator = VideoSummaryGenerator(model)
            self.frame_descriptions = []
        def accumulate(self, description: str) -> None:
            self.frame_descriptions.append(json.loads(description))
        def finish(self) -> str:
            summary = self.summary_generator.generate_summary(self.frame_descriptions)
            result = {
                'video_summary': summary,
                'frames_analyzed': len(self.frame_descriptions),
                'content_categories': list(set(
                    cat for analysis in self.frame_descriptions
                    for cat in analysis['content_type']
                )),
                'key_objects': list(set(
                    obj for analysis in self.frame_descriptions
                    for obj in analysis['key_objects']
                ))[:8],
                'processing_details': {
                    'total_frames': len(self.frame_descriptions),
                    'successful_analyses': len(self.frame_descriptions)
                }
            }
            return json.dumps(result)
        @property
        def aggregate_state(self) -> typing.Dict[str, typing.Any]:
            return {
                "frame_descriptions": self.frame_descriptions,
            }
        def merge(self, other_state: typing.Dict[str, typing.Any]) -> None:
            self.frame_descriptions += other_state.get("frame_descriptions", [])
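
    Like the frame-level helper in step 4, _analyze_video_content is model-free, so the aggregation bookkeeping can be sanity-checked on hand-written frame analyses (both records below are made up; the method does not use self, so it is invoked unbound):

    fake_analyses = [
        {"description": "a man cooking food in a kitchen",
         "content_type": ["food"], "key_objects": ["man", "food"]},
        {"description": "people eating at a restaurant",
         "content_type": ["food", "people"], "key_objects": ["people", "food"]},
    ]
    print(VideoSummaryGenerator._analyze_video_content(None, fake_analyses))
    # -> {'primary_categories': ['food', 'people'], 'dominant_objects': ['food', 'man', 'people'], 'total_frames_analyzed': 2}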
  8. Run the UDAF to generate the video summary.
    from fabric_data.multimodal.function import AggregateFnBuilder
    con.delete_function("VideoSummarySystem", database=target_database)
    # Create a UDAF to generate video descriptions from key frame descriptions.
    con.create_agg_function(
        VideoSummarySystem,
        database=target_database,
        imports=("t5-small",))
    video_summary = con.get_function("VideoSummarySystem", database=target_database)
    agg_builder = AggregateFnBuilder(
        fn=video_summary,
        on=[descriptions.descriptions.description],
        as_col="describe",
        num_dpus=0.5,
        constructor_kwargs={"model": "./fabric_data/examples/t5-small"}
    )
    df = descriptions.aggregate(agg_builder).execute()
    for row in df.itertuples():
        print(row[1])
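
    Each aggregated row holds the JSON document produced by finish(), so it can be parsed back for programmatic use, for example:

    for row in df.itertuples():
        result = json.loads(row[1])
        print(result["video_summary"])
        print("frames analyzed:", result["frames_analyzed"])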
  9. Close the connection.
    con.close()
