Updated on 2025-12-10 GMT+08:00
Speech Recognition Case
This case describes how to define a vectorized scalar UDF for speech recognition and how to define a UDAF for aggregate statistics and visualization.
- Import the advanced types (image, audio, and video) from fabric_data:
from fabric_data.multimodal.types.image import Image, display_image
from fabric_data.multimodal.types.audio import Audio
from fabric_data.multimodal.types.vectorized import PandasVector
- Define a class-based vectorized scalar UDF for speech recognition. The external packages transformers and torch are used:
class SpeechRecognitionUDF:
    def __init__(self, model_path: str) -> None:
        import os
        from transformers import pipeline
        import torch

        # Load the ASR pipeline once per UDF instance; use the first GPU if available.
        model_dir = os.path.abspath(model_path)
        self._asr = pipeline(
            "automatic-speech-recognition",
            model=model_dir,
            device=0 if torch.cuda.is_available() else -1,
        )

    def __call__(self, audios: PandasVector[Audio]) -> PandasVector[str]:
        # Transcribe each audio value in the batch and return the recognized text.
        return PandasVector[str](audios.map(
            lambda audio: self._asr(audio.sound_file.read(dtype="float32"))["text"]
        ))
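Before registering the UDF, it can help to sanity-check the pipeline it wraps on a single local file. The following is a minimal sketch, assuming a local copy of the model at ./facebook_wav2vec2_base_960h and a sample recording named sample.flac (both are placeholder paths; decoding a file path with the transformers pipeline also requires ffmpeg):

from transformers import pipeline

# Standalone sanity check of the ASR pipeline wrapped by SpeechRecognitionUDF.
# The model directory and the audio file name are placeholders for this sketch.
asr = pipeline(
    "automatic-speech-recognition",
    model="./facebook_wav2vec2_base_960h",
    device=-1,  # CPU; use device=0 for the first GPU
)
print(asr("sample.flac")["text"])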
- Define a UDAF for TF-IDF term statistics and word cloud image generation. The external packages wordcloud and matplotlib are used:

class SpeakerWordsUDAF:
    def __init__(self, top_n: int = 40):
        from typing import List

        self._texts: List[str] = []
        self._top_n = top_n
        self._chapter_id = None
        # tiny built-in stopword list (extend as needed)
        self._stopwords = {
            "the", "a", "an", "and", "or", "but", "also", "about",
            "this", "that", "these", "those", "his", "her", "its",
            "their", "our", "your", "of", "to", "in", "on", "for",
            "at", "by", "with", "from", "as", "is", "are", "was",
            "were", "be", "been", "it", "he", "she", "they", "we",
            "you", "i", "not", "do", "did", "does", "had", "have",
            "has", "how", "what", "when", "where", "which", "who",
            "whom", "would", "could", "should", "will", "can", "may",
        }

    @property
    def aggregate_state(self):
        return {
            "texts": self._texts,
            "chapter_id": self._chapter_id,
        }

    def accumulate(self, chapter_id: int, hyp_text: str):
        if hyp_text:
            self._texts.append(hyp_text)
        if chapter_id:
            self._chapter_id = chapter_id

    def merge(self, other_state):
        if other_texts := other_state.get("texts"):
            self._texts.extend(other_texts)
        if other_chapterid := other_state.get("chapter_id"):
            if self._chapter_id:
                # for one SpeakerWordsUDAF instance, it should process texts with same chapter_id
                assert self._chapter_id == other_chapterid
            else:
                # CN get chapter_id from DN
                self._chapter_id = other_chapterid

    def _tokenize(self, text: str):
        # simple whitespace tokenizer + basic cleaning
        tokens = []
        for t in text.split():
            t = t.strip().lower()
            # drop short tokens and non-alpha tokens
            if len(t) < 3:
                continue
            if not t.isalpha():
                continue
            if t in self._stopwords:
                continue
            tokens.append(t)
        return tokens

    def _generate_wordcloud(self, frequencies: dict) -> Image:
        from io import BytesIO
        import matplotlib.pyplot as plt
        from wordcloud import WordCloud

        fig = None  # keep the finally block safe even if subplots() fails
        try:
            # Create figure
            fig, ax = plt.subplots(figsize=(10, 5))
            # Generate WordCloud
            wc = WordCloud(width=800, height=400, background_color="white") \
                .generate_from_frequencies(frequencies)
            # Draw
            ax.imshow(wc, interpolation="bilinear")
            ax.axis("off")
            ax.set_title(f"WordCloud for Chapter {self._chapter_id}", fontsize=18, pad=20)
            # Save to in-memory bytes
            buf = BytesIO()
            fig.savefig(buf, format="png", bbox_inches="tight")
            buf.seek(0)
            return Image(data=buf.getvalue(), format="PNG")
        except Exception:
            # You may log or re-raise here
            raise
        finally:
            # Cleanup: ALWAYS close the figure
            if fig is not None:
                plt.close(fig)

    def finish(self) -> Image:
        import math

        if not self._texts:
            return None

        # 1) tokenize all docs
        docs = [self._tokenize(txt) for txt in self._texts]
        num_docs = len(docs)

        # 2) document frequency
        df = {}
        for tokens in docs:
            for tok in set(tokens):
                df[tok] = df.get(tok, 0) + 1

        # 3) tf-idf per doc → aggregate
        tfidf_sums = {}
        tfidf_counts = {}
        for tokens in docs:
            if not tokens:
                continue
            # term freq in this doc
            tf = {}
            for tok in tokens:
                tf[tok] = tf.get(tok, 0) + 1
            doc_len = len(tokens)
            for tok, f in tf.items():
                tf_val = f / doc_len
                # idf: log(N/df) + 1
                idf_val = math.log(num_docs / df[tok]) + 1.0
                tfidf = tf_val * idf_val
                tfidf_sums[tok] = tfidf_sums.get(tok, 0.0) + tfidf
                tfidf_counts[tok] = tfidf_counts.get(tok, 0) + 1

        # 4) average tf-idf per term
        avg_tfidf = {
            tok: (tfidf_sums[tok] / tfidf_counts[tok]) for tok in tfidf_sums
        }

        # 5) sort & top-N
        sorted_terms = sorted(
            avg_tfidf.items(), key=lambda kv: kv[1], reverse=True
        )[: self._top_n]
        out = {word: float(score) for word, score in sorted_terms}
        return self._generate_wordcloud(out)
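For reference, finish() scores each term with tf(t, d) = count(t, d) / len(d) and idf(t) = log(N / df(t)) + 1, then averages the per-document scores over the documents that contain the term. The toy sketch below (plain Python, no fabric_data dependencies, made-up tokens) reproduces that weighting on a two-document corpus:

import math

# Toy "transcripts" for one chapter; real input comes from the ASR UDF.
docs = [
    ["mister", "quilter", "apostle", "middle", "classes"],
    ["mister", "quilter", "manner", "less", "interesting"],
]
num_docs = len(docs)

# document frequency: number of documents containing each term
df = {}
for tokens in docs:
    for tok in set(tokens):
        df[tok] = df.get(tok, 0) + 1

# average TF-IDF per term, mirroring SpeakerWordsUDAF.finish()
sums, counts = {}, {}
for tokens in docs:
    tf = {}
    for tok in tokens:
        tf[tok] = tf.get(tok, 0) + 1
    for tok, freq in tf.items():
        score = (freq / len(tokens)) * (math.log(num_docs / df[tok]) + 1.0)
        sums[tok] = sums.get(tok, 0.0) + score
        counts[tok] = counts.get(tok, 0) + 1

avg_tfidf = {tok: sums[tok] / counts[tok] for tok in sums}
print(sorted(avg_tfidf.items(), key=lambda kv: kv[1], reverse=True)[:5])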
- Establish a remote connection. The external package huawei-fabricsql-connectorapi is used:

from contextlib import contextmanager
from fabric_data.multimodal import ai_lake
import os


@contextmanager
def create_connect():
    conn = ai_lake.connect(
        fabric_endpoint="172.16.31.57:60001",
        fabric_endpoint_id="8a708bbf-f862-4c53-9622-4c59b44524a8",
        fabric_workspace_id="ca319048-b07c-498c-97df-8f20b8ce0a52",
        lf_catalog_name="mini_kernel2",
        lf_instance_id="e662709e-6223-44a8-b254-d84cf71842de",
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"),
        default_database="multimodal_lake",
        use_single_cn_mode=True,
    )
    print(f"Establish ai_lake.connect: {conn}")
    try:
        yield conn
    finally:
        # release resource
        conn.close()
- Prepare the Parquet table:

with create_connect() as conn:
    from fabric_data.ibis.backends.sql.datatype import advance_dtype

    conn.drop_table("librispeech_asr", database="multimodal_lake")
    table = conn.create_table(
        "librispeech_asr",
        schema={
            "file": str,
            "audio": advance_dtype(Audio),
            "text": str,
            "speaker_id": int,
            "chapter_id": int,
            "id": str,
        },
        table_format="parquet",
        external=True,
        if_not_exists=True,
        location="obs://mini-kernel/mini_kernel2/multimodal-test/librispeech_asr",
        database="multimodal_lake",
    )
    print(table)
- Register the scalar UDF and the UDAF:

with create_connect() as conn:
    conn.set_function_staging_workspace(
        obs_server=os.getenv("obs_server"),
        obs_bucket_name="mini-kernel",
        obs_directory_base="mini_kernel2/multimodal-test",
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"),
    )
    print("finish set_function_staging_workspace")

    conn.delete_function("SpeechRecognitionUDF", database="multimodal_lake")
    conn.create_scalar_function(
        SpeechRecognitionUDF,
        database="multimodal_lake",
        packages=["transformers", "torch"],
        imports=["./facebook_wav2vec2_base_960h"],
    )

    conn.delete_function("SpeakerWordsUDAF", database="multimodal_lake")
    conn.create_agg_function(
        SpeakerWordsUDAF,
        database="multimodal_lake",
        packages=["wordcloud", "matplotlib"],
    )
- Use the scalar UDF and the UDAF through the Fabric Data API to obtain the final grouped word cloud images:

with create_connect() as conn:
    # load dataset
    ds = conn.load_dataset("librispeech_asr", database="multimodal_lake")
    ds.show(limit=1)
    ds = ds.limit(1)

    # use Scalar UDF for ASR
    udf = conn.get_function("SpeechRecognitionUDF", database="multimodal_lake")
    ds = ds.map_batches(
        fn=udf,
        on=[ds.audio],
        as_col="hyp_text",
        num_dpus=0.5,
        concurrency=1,
        model_path="./fabric_data/examples/facebook_wav2vec2_base_960h",
    )
    ds.show(limit=1)

    # use UDAF for summary
    udaf = conn.get_function("SpeakerWordsUDAF", database="multimodal_lake")
    from fabric_data.multimodal.function import AggregateFnBuilder

    udaf_builder = AggregateFnBuilder(
        fn=udaf,
        on=[ds.chapter_id, ds.hyp_text],
        as_col="tf_idf_image",
        num_dpus=0.5,
    )
    summary = ds.aggregate(
        udaf_builder,
        by=ds.chapter_id,
    )
    summary.show(limit=1)

    df = summary.execute()
    print(df)
    display_image(df["tf_idf_image"].tolist())
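As an optional follow-up, the per-chapter word cloud images can also be written to disk for offline inspection. This sketch continues from the df produced above and assumes the returned Image objects expose their raw PNG bytes through a data attribute, matching the Image(data=..., format="PNG") constructor used in the UDAF; check the actual accessor in your fabric_data version:

# Hypothetical: persist each per-chapter word cloud as a PNG file.
# `img.data` is an assumption based on the Image(data=..., format="PNG")
# constructor used in SpeakerWordsUDAF._generate_wordcloud().
for i, img in enumerate(df["tf_idf_image"].tolist()):
    with open(f"chapter_wordcloud_{i}.png", "wb") as f:
        f.write(img.data)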
Parent topic: Best Practices