Help Center/
DataArts Fabric/
Developer Guide/
Multimodal AI Data Lake/
Multimodal Data Processing/
Best Practices/
Speech Recognition Case Study
Updated on 2025-12-19 GMT+08:00
Speech Recognition Case Study
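This case study demonstrates how to define a vectorized scalar UDF that transcribes audio to text (speech recognition), and a user-defined aggregate function (UDAF) that computes per-chapter TF-IDF word statistics and visualizes them as word cloud images.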
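- Import high-level multimodal data types (such as images, audio, and video) from fabric_data. This example uses Image and Audio, together with PandasVector for the inputs and outputs of vectorized UDFs.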
from fabric_data.multimodal.types.image import Image, display_image from fabric_data.multimodal.types.audio import Audio from fabric_data.multimodal.types.vectorized import PandasVector
- Define a vectorized scalar UDF class for speech recognition using external packages like transformers and torch.
class SpeechRecognitionUDF:
    """Vectorized scalar UDF that turns a batch of audio clips into text.

    Each clip is transcribed by a Hugging Face ``transformers``
    automatic-speech-recognition pipeline loaded from a local model
    directory.
    """

    def __init__(self, model_path: str) -> None:
        """Load the ASR pipeline from the model directory *model_path*.

        The pipeline is placed on device 0 when CUDA is available and on the
        CPU (``device=-1``) otherwise.
        """
        # Heavy third-party imports happen at construction time rather than
        # when this class is defined.
        import os

        from transformers import pipeline
        import torch

        resolved_dir = os.path.abspath(model_path)
        target_device = 0 if torch.cuda.is_available() else -1
        self._asr = pipeline(
            "automatic-speech-recognition",
            model=resolved_dir,
            device=target_device,
        )

    def __call__(self, audios: PandasVector[Audio]) -> PandasVector[str]:
        """Transcribe every clip in *audios*; returns one string per clip."""

        def _transcribe(clip) -> str:
            # Read the clip's samples as float32 and keep only the "text"
            # field of the pipeline result.
            # NOTE(review): no sampling rate is passed with the samples, so
            # the pipeline presumably assumes they already match the model's
            # expected rate -- confirm for differently-sampled input.
            samples = clip.sound_file.read(dtype="float32")
            return self._asr(samples)["text"]

        return PandasVector[str](audios.map(_transcribe))


# - Define a UDAF for TF-IDF word frequency analysis and word cloud generation using external packages such as wordcloud and matplotlib.
class SpeakerWordsUDAF:
    """UDAF that renders a TF-IDF word cloud from the transcripts of a chapter.

    Rows are fed in through ``accumulate``. Partial states are exchanged via
    ``aggregate_state`` and combined with ``merge`` (the original comments
    suggest data nodes send partial states to a coordinator node). ``finish``
    returns the final PNG word cloud as an ``Image``, or None when there is
    nothing to plot.
    """

    # Tiny built-in stopword list (extend as needed). Tokens shorter than three
    # characters are dropped by ``_tokenize`` anyway, so very short entries are
    # redundant but harmless.
    _DEFAULT_STOPWORDS = frozenset({
        "the", "a", "an", "and", "or", "but", "also", "about",
        "this", "that", "these", "those",
        "his", "her", "its", "their", "our", "your",
        "of", "to", "in", "on", "for", "at", "by", "with", "from", "as",
        "is", "are", "was", "were", "be", "been",
        "it", "he", "she", "they", "we", "you", "i",
        "not", "do", "did", "does", "had", "have", "has",
        "how", "what", "when", "where", "which", "who", "whom",
        "would", "could", "should", "will", "can", "may",
    })

    def __init__(self, top_n: int = 40):
        """Create an empty aggregate that keeps the *top_n* highest-scoring terms."""
        from typing import List

        self._texts: List[str] = []
        self._top_n = top_n
        self._chapter_id = None
        # Per-instance mutable copy so callers can extend it without affecting
        # other instances.
        self._stopwords = set(self._DEFAULT_STOPWORDS)

    @property
    def aggregate_state(self):
        """Partial state (texts and chapter id) consumed by ``merge``."""
        return {
            "texts": self._texts,
            "chapter_id": self._chapter_id,
        }

    def accumulate(self, chapter_id: int, hyp_text: str):
        """Add one transcript row.

        Empty or None transcripts are ignored. A ``chapter_id`` of None is
        ignored; any other value (including 0) is recorded.

        Raises:
            ValueError: if *chapter_id* differs from one already recorded.
        """
        if hyp_text:
            self._texts.append(hyp_text)
        self._record_chapter_id(chapter_id)

    def merge(self, other_state):
        """Fold another instance's ``aggregate_state`` into this one.

        Raises:
            ValueError: if the two states carry different chapter ids.
        """
        other_texts = other_state.get("texts")
        if other_texts:
            self._texts.extend(other_texts)
        # The receiving side may not have seen any row yet, in which case it
        # adopts the chapter id from the incoming state.
        self._record_chapter_id(other_state.get("chapter_id"))

    def _record_chapter_id(self, chapter_id):
        """Remember *chapter_id*, checking that one instance sees a single chapter."""
        if chapter_id is None:
            return
        if self._chapter_id is None:
            self._chapter_id = chapter_id
        elif self._chapter_id != chapter_id:
            # One SpeakerWordsUDAF instance must only aggregate a single
            # chapter. Raise explicitly because ``assert`` is stripped under -O.
            raise ValueError(
                f"SpeakerWordsUDAF got mixed chapter ids: "
                f"{self._chapter_id!r} and {chapter_id!r}"
            )

    def _tokenize(self, text: str):
        """Split *text* on whitespace and return the lowercase tokens kept.

        A token is kept when it is at least 3 characters long, purely
        alphabetic, and not a stopword.
        """
        return [
            tok
            for tok in (raw.lower() for raw in text.split())
            if len(tok) >= 3 and tok.isalpha() and tok not in self._stopwords
        ]

    def _top_terms(self) -> dict:
        """Return ``{term: score}`` for the ``top_n`` best terms.

        Each term is scored by its TF-IDF averaged over the documents that
        contain it. Term frequency is count / document length, and
        idf = log(N / df) + 1, where N is the number of transcripts.
        """
        import math
        from collections import Counter

        docs = [self._tokenize(txt) for txt in self._texts]
        num_docs = len(docs)

        # Document frequency: number of transcripts containing each term.
        doc_freq = Counter()
        for tokens in docs:
            doc_freq.update(set(tokens))

        tfidf_sums = {}
        for tokens in docs:
            if not tokens:
                continue
            doc_len = len(tokens)
            for tok, count in Counter(tokens).items():
                idf = math.log(num_docs / doc_freq[tok]) + 1.0
                tfidf_sums[tok] = tfidf_sums.get(tok, 0.0) + (count / doc_len) * idf

        # Every document containing a term contributes exactly one TF-IDF
        # value, so doc_freq is also the divisor for the average.
        avg_tfidf = {tok: total / doc_freq[tok] for tok, total in tfidf_sums.items()}
        ranked = sorted(avg_tfidf.items(), key=lambda kv: kv[1], reverse=True)
        return {word: float(score) for word, score in ranked[: self._top_n]}

    def _generate_wordcloud(self, frequencies: dict) -> Image:
        """Render *frequencies* as a titled PNG word cloud ``Image``.

        *frequencies* must be non-empty; WordCloud rejects an empty table.
        """
        from io import BytesIO

        import matplotlib.pyplot as plt
        from wordcloud import WordCloud

        # Bind before ``try`` so ``finally`` cannot raise UnboundLocalError
        # (masking the real error) when plt.subplots itself fails.
        fig = None
        try:
            fig, ax = plt.subplots(figsize=(10, 5))
            wc = WordCloud(
                width=800, height=400, background_color="white"
            ).generate_from_frequencies(frequencies)
            ax.imshow(wc, interpolation="bilinear")
            ax.axis("off")
            ax.set_title(f"WordCloud for Chapter {self._chapter_id}", fontsize=18, pad=20)

            # Save to in-memory bytes; getvalue() ignores the stream position.
            buf = BytesIO()
            fig.savefig(buf, format="png", bbox_inches="tight")
            return Image(data=buf.getvalue(), format="PNG")
        finally:
            # Always release the figure so long-running workers don't leak them.
            if fig is not None:
                plt.close(fig)

    def finish(self) -> Image:
        """Return the chapter's word cloud, or None if there is nothing to plot.

        None is returned when no transcripts were accumulated, or when every
        token was filtered out (too short, non-alphabetic, or a stopword).
        The ``-> Image`` annotation is left as-is in case the framework reads it.
        """
        if not self._texts:
            return None
        frequencies = self._top_terms()
        if not frequencies:
            return None
        return self._generate_wordcloud(frequencies)


# - Establish a remote connection using the huawei-fabricsql-connectorapi package.
import os
from contextlib import contextmanager

from fabric_data.multimodal import ai_lake


@contextmanager
def create_connect():
    """Yield an ``ai_lake`` connection and close it when the ``with`` block exits.

    The endpoint, workspace, and catalog identifiers are hard-coded for this
    example. The access key and secret key are read from the ``access_key`` /
    ``secret_key`` environment variables.
    """
    connect_options = dict(
        fabric_endpoint="172.16.31.57:60001",
        fabric_endpoint_id="8a708bbf-f862-4c53-9622-4c59b44524a8",
        fabric_workspace_id="ca319048-b07c-498c-97df-8f20b8ce0a52",
        lf_catalog_name="mini_kernel2",
        lf_instance_id="e662709e-6223-44a8-b254-d84cf71842de",
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"),
        default_database="multimodal_lake",
        use_single_cn_mode=True,
    )
    conn = ai_lake.connect(**connect_options)
    print(f"Establish ai_lake.connect: {conn}")
    try:
        yield conn
    finally:
        # Release the connection even when the with-body raised.
        conn.close()


# - Prepare a Parquet table.
with create_connect() as conn:
    from fabric_data.ibis.backends.sql.datatype import advance_dtype

    # Drop any previous definition, then recreate the table.
    # NOTE(review): the table is external (data lives at the OBS location);
    # presumably drop_table removes only the metadata -- confirm.
    conn.drop_table("librispeech_asr", database="multimodal_lake")

    # Column layout; the "audio" column uses the multimodal Audio type.
    librispeech_schema = {
        "file": str,
        "audio": advance_dtype(Audio),
        "text": str,
        "speaker_id": int,
        "chapter_id": int,
        "id": str,
    }
    table = conn.create_table(
        "librispeech_asr",
        schema=librispeech_schema,
        table_format="parquet",
        external=True,
        if_not_exists=True,
        location="obs://mini-kernel/mini_kernel2/multimodal-test/librispeech_asr",
        database="multimodal_lake",
    )
    print(table)

# - Register the scalar UDF and UDAF.
with create_connect() as conn:
    # OBS location used to stage function code and dependencies; credentials
    # come from environment variables.
    staging_options = dict(
        obs_server=os.getenv("obs_server"),
        obs_bucket_name="mini-kernel",
        obs_directory_base="mini_kernel2/multimodal-test",
        access_key=os.getenv("access_key"),
        secret_key=os.getenv("secret_key"),
    )
    conn.set_function_staging_workspace(**staging_options)
    print("finish set_function_staging_workspace")

    # Scalar UDF: delete any earlier registration, then register it with its
    # pip packages and the local model directory.
    conn.delete_function("SpeechRecognitionUDF", database="multimodal_lake")
    conn.create_scalar_function(
        SpeechRecognitionUDF,
        database="multimodal_lake",
        packages=["transformers", "torch"],
        imports=["./facebook_wav2vec2_base_960h"],
    )

    # UDAF: same delete-then-register pattern.
    conn.delete_function("SpeakerWordsUDAF", database="multimodal_lake")
    conn.create_agg_function(
        SpeakerWordsUDAF,
        database="multimodal_lake",
        packages=["wordcloud", "matplotlib"],
    )

# - Use the scalar UDF and UDAF via the Fabric Data API to generate per-chapter word cloud images.
with create_connect() as conn:
    # Load the table as a dataset, preview one row, then keep a single row so
    # the demo stays cheap.
    ds = conn.load_dataset("librispeech_asr", database="multimodal_lake")
    ds.show(limit=1)
    ds = ds.limit(1)

    # Scalar UDF: transcribe each audio clip into a new "hyp_text" column.
    asr_fn = conn.get_function("SpeechRecognitionUDF", database="multimodal_lake")
    ds = ds.map_batches(
        fn=asr_fn,
        on=[ds.audio],
        as_col="hyp_text",
        num_dpus=0.5,
        concurrency=1,
        # NOTE(review): presumably forwarded to SpeechRecognitionUDF(model_path=...).
        # It differs from the "./facebook_wav2vec2_base_960h" path passed as
        # `imports` at registration -- confirm where the model lands at runtime.
        model_path="./fabric_data/examples/facebook_wav2vec2_base_960h",
    )
    ds.show(limit=1)

    # UDAF: group transcripts by chapter and build one word cloud per chapter.
    wordcloud_fn = conn.get_function("SpeakerWordsUDAF", database="multimodal_lake")
    from fabric_data.multimodal.function import AggregateFnBuilder

    wordcloud_agg = AggregateFnBuilder(
        fn=wordcloud_fn,
        on=[ds.chapter_id, ds.hyp_text],
        as_col="tf_idf_image",
        num_dpus=0.5,
    )
    summary = ds.aggregate(wordcloud_agg, by=ds.chapter_id)
    summary.show(limit=1)

    result_df = summary.execute()
    print(result_df)
    display_image(result_df["tf_idf_image"].tolist())
Parent topic: Best Practices
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
The system is busy. Please try again later.
For any further questions, feel free to contact us through the chatbot.
Chatbot