Updated: 2025-12-10 GMT+08:00

Speech Recognition Example

This example shows how to define a vectorized scalar UDF for speech recognition, and how to define a UDAF for aggregate statistics and visualization.

  1. Import the high-level multimodal types (image, audio) and the vectorized container from fabric_data:

    from fabric_data.multimodal.types.image import Image, display_image
    from fabric_data.multimodal.types.audio import Audio
    from fabric_data.multimodal.types.vectorized import PandasVector

  2. Define a class-based vectorized scalar UDF for speech recognition; it relies on the external packages transformers and torch (a standalone smoke test of the pipeline is sketched after the class definition):

    class SpeechRecognitionUDF:
        def __init__(self, model_path: str) -> None:
            import os
            from transformers import pipeline
            import torch
    
            model_dir = os.path.abspath(model_path)
            self._asr = pipeline(
                "automatic-speech-recognition",
                model=model_dir,
                device=0 if torch.cuda.is_available() else -1,
            )
    
        def __call__(self, audios: PandasVector[Audio]) -> PandasVector[str]:
            return PandasVector[str](audios.map(
                lambda audio: self._asr(audio.sound_file.read(dtype="float32"))["text"]
            ))
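
     For a quick sanity check outside the UDF framework, the sketch below runs the same transformers pipeline directly on a local audio file. The model directory and the file name sample.wav are placeholders, and the file is assumed to be mono 16 kHz audio.

    # Standalone smoke test of the ASR pipeline used inside SpeechRecognitionUDF.
    # "./facebook_wav2vec2_base_960h" and "sample.wav" are placeholder paths.
    import soundfile as sf
    import torch
    from transformers import pipeline

    asr = pipeline(
        "automatic-speech-recognition",
        model="./facebook_wav2vec2_base_960h",
        device=0 if torch.cuda.is_available() else -1,
    )

    # Pass the raw waveform together with its sampling rate, similar to what the UDF
    # feeds the pipeline after reading the Audio column as float32 samples.
    waveform, sample_rate = sf.read("sample.wav", dtype="float32")
    print(asr({"raw": waveform, "sampling_rate": sample_rate})["text"])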

  3. Define a UDAF that computes TF-IDF term statistics and generates a word cloud image; it relies on the external packages wordcloud and matplotlib (a plain-Python walkthrough of the TF-IDF aggregation follows the class definition):

    class SpeakerWordsUDAF:
        def __init__(self, top_n: int = 40):
            from typing import List
            self._texts: List[str] = []
            self._top_n = top_n
            self._chapter_id = None
    
            # tiny built-in stopword list (extend as needed)
            self._stopwords = {
                "the", "a", "an", "and", "or", "but",
                "also", "about", "this", "that", "these", "those",
                "his", "her", "its", "their", "our", "your",
                "of", "to", "in", "on", "for", "at", "by", "with",
                "from", "as", "is", "are", "was", "were", "be", "been",
                "it", "he", "she", "they", "we", "you", "i",
                "not", "do", "did", "does", "had", "have", "has",
                "how", "what", "when", "where", "which", "who", "whom",
                "would", "could", "should", "will", "can", "may",
            }
    
        @property
        def aggregate_state(self):
            return {
                "texts": self._texts,
                "chapter_id": self._chapter_id,
            }
    
        def accumulate(self, chapter_id: int, hyp_text: str):
            if hyp_text:
                self._texts.append(hyp_text)
            if chapter_id:
                self._chapter_id = chapter_id
    
        def merge(self, other_state):
            if other_texts := other_state.get("texts"):
                self._texts.extend(other_texts)
            if other_chapterid := other_state.get("chapter_id"):
                if self._chapter_id:
                    # a single SpeakerWordsUDAF instance should only see texts with the same chapter_id
                    assert self._chapter_id == other_chapterid
                else:
                    # the coordinator node (CN) takes the chapter_id from the data node (DN) partial state
                    self._chapter_id = other_chapterid
    
        def _tokenize(self, text: str):
            # simple whitespace tokenizer + basic cleaning
            tokens = []
            for t in text.split():
                t = t.strip().lower()
                # drop short tokens and non-alpha tokens
                if len(t) < 3:
                    continue
                if not t.isalpha():
                    continue
                if t in self._stopwords:
                    continue
                tokens.append(t)
            return tokens
    
        def _generate_wordcloud(self, frequencies: dict) -> Image:
            from io import BytesIO
            import matplotlib.pyplot as plt
            from wordcloud import WordCloud
            fig = None  # ensure `fig` exists for the finally block even if subplots() fails
            try:
                # Create figure
                fig, ax = plt.subplots(figsize=(10, 5))
    
                # Generate WordCloud
                wc = WordCloud(width=800, height=400, background_color="white") \
                    .generate_from_frequencies(frequencies)
    
                # Draw
                ax.imshow(wc, interpolation='bilinear')
                ax.axis("off")
                ax.set_title(f"WordCloud for Chapter {self._chapter_id}", fontsize=18, pad=20)
    
                # Save to in-memory bytes
                buf = BytesIO()
                fig.savefig(buf, format="png", bbox_inches="tight")
                buf.seek(0)
    
                return Image(data=buf.getvalue(), format="PNG")
            except Exception:
                # Log here if needed, then re-raise so the failure surfaces to the caller
                raise
            finally:
                # Cleanup: ALWAYS close the figure
                if fig is not None:
                    plt.close(fig)
    
        def finish(self) -> Image:
            import math
            if not self._texts:
                return None
    
            # 1) tokenize all docs
            docs = [self._tokenize(txt) for txt in self._texts]
            num_docs = len(docs)
    
            # 2) document frequency
            df = {}
            for tokens in docs:
                for tok in set(tokens):
                    df[tok] = df.get(tok, 0) + 1
    
            # 3) tf-idf per doc → aggregate
            tfidf_sums = {}
            tfidf_counts = {}
    
            for tokens in docs:
                if not tokens:
                    continue
                # term freq in this doc
                tf = {}
                for tok in tokens:
                    tf[tok] = tf.get(tok, 0) + 1
    
                doc_len = len(tokens)
                for tok, f in tf.items():
                    tf_val = f / doc_len
                    # idf: log(N/df) + 1
                    idf_val = math.log(num_docs / df[tok]) + 1.0
                    tfidf = tf_val * idf_val
    
                    tfidf_sums[tok] = tfidf_sums.get(tok, 0.0) + tfidf
                    tfidf_counts[tok] = tfidf_counts.get(tok, 0) + 1
    
            # 4) average tf-idf per term
            avg_tfidf = {
                tok: (tfidf_sums[tok] / tfidf_counts[tok])
                for tok in tfidf_sums
            }
    
            # 5) sort & top-N
            sorted_terms = sorted(
                avg_tfidf.items(),
                key=lambda kv: kv[1],
                reverse=True
            )[: self._top_n]
    
            out = {word: float(score) for word, score in sorted_terms}
            return self._generate_wordcloud(out)
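
     For reference, the following plain-Python walkthrough reproduces the TF-IDF aggregation that finish() performs (tf = count / document length, idf = log(N / df) + 1, averaged over the documents that contain the term), using two toy token lists in place of real transcripts.

    import math

    # Two toy "documents" standing in for tokenized transcripts.
    docs = [["speech", "recognition", "speech"], ["recognition", "cloud"]]
    num_docs = len(docs)

    # Document frequency: number of documents containing each term.
    df = {}
    for tokens in docs:
        for tok in set(tokens):
            df[tok] = df.get(tok, 0) + 1

    # Per-document TF-IDF, accumulated the same way as in finish().
    sums, counts = {}, {}
    for tokens in docs:
        for tok in set(tokens):
            tf_val = tokens.count(tok) / len(tokens)
            idf_val = math.log(num_docs / df[tok]) + 1.0
            sums[tok] = sums.get(tok, 0.0) + tf_val * idf_val
            counts[tok] = counts.get(tok, 0) + 1

    # "speech" ranks first (~1.13): frequent in one document, absent from the other.
    avg_tfidf = {tok: sums[tok] / counts[tok] for tok in sums}
    print(sorted(avg_tfidf.items(), key=lambda kv: kv[1], reverse=True))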

  4. Establish the remote connection; this relies on the external package huawei-fabricsql-connectorapi (a note on supplying the credentials follows the helper):

    from contextlib import contextmanager
    from fabric_data.multimodal import ai_lake
    import os
    
    @contextmanager
    def create_connect():
        conn = ai_lake.connect(
            fabric_endpoint="172.16.31.57:60001",
            fabric_endpoint_id="8a708bbf-f862-4c53-9622-4c59b44524a8",
            fabric_workspace_id="ca319048-b07c-498c-97df-8f20b8ce0a52",
            lf_catalog_name="mini_kernel2",
            lf_instance_id="e662709e-6223-44a8-b254-d84cf71842de",
            access_key=os.getenv("access_key"),
            secret_key=os.getenv("secret_key"),
            default_database="multimodal_lake",
            use_single_cn_mode=True,
        )
        print(f"Establish ai_lake.connect: {conn}")
        try:
            yield conn
        finally:
            # release resource
            conn.close()
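
     create_connect() reads the credentials from environment variables. One minimal way to provide them for a local run is sketched below; the values are placeholders, not real keys, and in practice they would come from the shell environment or a secrets store.

    import os

    # Placeholder values only; never hard-code real credentials in source files.
    os.environ.setdefault("access_key", "<your-access-key>")
    os.environ.setdefault("secret_key", "<your-secret-key>")
    os.environ.setdefault("obs_server", "<your-obs-endpoint>")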

  5. Prepare the Parquet table:

    with create_connect() as conn:
        from fabric_data.ibis.backends.sql.datatype import advance_dtype
        conn.drop_table("librispeech_asr", database="multimodal_lake")
        table = conn.create_table(
            "librispeech_asr",
            schema={
                "file": str,
                "audio": advance_dtype(Audio),
                "text": str,
                "speaker_id": int,
                "chapter_id": int,
                "id": str,
            },
            table_format="parquet",
            external=True,
            if_not_exists=True,
            location="obs://mini-kernel/mini_kernel2/multimodal-test/librispeech_asr",
            database="multimodal_lake",
        )
        print(table)

  6. Register the scalar UDF and the UDAF:

    with create_connect() as conn:
        conn.set_function_staging_workspace(
            obs_server=os.getenv("obs_server"),
            obs_bucket_name="mini-kernel",
            obs_directory_base="mini_kernel2/multimodal-test",
            access_key=os.getenv("access_key"),
            secret_key=os.getenv("secret_key"),
        )
        print("finish set_function_staging_workspace")
    
        conn.delete_function("SpeechRecognitionUDF", database="multimodal_lake")
        conn.create_scalar_function(
            SpeechRecognitionUDF,
            database="multimodal_lake",
            packages=["transformers", "torch"],
            imports=["./facebook_wav2vec2_base_960h"]
        )
    
        conn.delete_function("SpeakerWordsUDAF", database="multimodal_lake")
        conn.create_agg_function(
            SpeakerWordsUDAF,
            database="multimodal_lake",
            packages=["wordcloud", "matplotlib"]
        )

  7. Invoke the scalar UDF and the UDAF through the Fabric Data API to obtain the final per-group word cloud images (a sketch for saving the images locally follows the code):

    with create_connect() as conn:
        # load dataset
        ds = conn.load_dataset("librispeech_asr", database="multimodal_lake")
        ds.show(limit=1)
        ds = ds.limit(1)
    
        # use Scalar UDF for ASR
        udf = conn.get_function("SpeechRecognitionUDF", database="multimodal_lake")
    
        ds = ds.map_batches(
            fn=udf,
            on=[ds.audio],
            as_col="hyp_text",
            num_dpus=0.5,
            concurrency=1,
            model_path="./fabric_data/examples/facebook_wav2vec2_base_960h",
        )
        ds.show(limit=1)
    
        # use UDAF for summary
        udaf = conn.get_function("SpeakerWordsUDAF", database="multimodal_lake")
    
        from fabric_data.multimodal.function import AggregateFnBuilder
        udaf_builder = AggregateFnBuilder(
            fn=udaf,
            on=[ds.chapter_id, ds.hyp_text],
            as_col="tf_idf_image",
            num_dpus=0.5,
        )
    
        summary = ds.aggregate(
            udaf_builder,
            by=ds.chapter_id,
        )
        summary.show(limit=1)
    
        df = summary.execute()
        print(df)
        display_image(df["tf_idf_image"].tolist())
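
     To keep the generated word clouds, a sketch along the following lines could write them to local PNG files. It assumes the aggregated DataFrame still carries the chapter_id grouping column and that the returned Image objects expose the PNG bytes via a data attribute (mirroring the Image(data=...) construction in SpeakerWordsUDAF); check the Image class for the actual accessor names.

    # Hypothetical sketch: persist each per-chapter word cloud as a PNG file.
    # Assumes df has a "chapter_id" column and Image stores its bytes in `data`.
    for chapter_id, img in zip(df["chapter_id"], df["tf_idf_image"]):
        with open(f"wordcloud_chapter_{chapter_id}.png", "wb") as f:
            f.write(img.data)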
