Updated on 2025-07-30 GMT+08:00
Typical Operator Development Samples
Sample 1: Audio Duration Filter Operator
Operator package directory structure:
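A plausible layout for the package, inferred from the three files listed below (the top-level directory name is only an assumption), is:

audio_duration_filter/
├── manifest.yml        # operator metadata and argument definitions
├── process.py          # entry point declared in manifest.yml
└── requirements.txt    # third-party dependencies (pydub)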
Content of manifest.yml:
id: audio_duration_filter
name: 音频时长过滤
description: 保留音频时长在指定范围内的样本。
author: "Huawei Cloud/EI"
tags:
  language:
    - zh
  format:
    - wav
    - mp3
  category: 数据过滤
  modal:
    - AUDIO
runtime:
  cpu-arch:
    - ARM
  resources:
    - cpu: 4
      memory: 8192
  environment: PYTHON
entrypoint: process.py
auto-data-loading: true
arguments:
  - key: duration_range
    name: 保留时长(秒)
    type: int
    between: true
    min: 0
    tips: 保留时长在指定范围内的音频文件。
    required: true
    visible: true
    default: 5;3600
Content of process.py:
import argparse
import logging

import numpy as np
import pandas as pd
from pydub import AudioSegment

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def get_audio_duration(file_path):
    """Return the audio duration in seconds, or None if the file cannot be decoded."""
    try:
        audio = AudioSegment.from_file(file_path)
        duration = len(audio) / 1000  # pydub reports length in milliseconds; convert to seconds
    except Exception:
        duration = None
    return duration


class Process:
    def __init__(self, args: argparse.Namespace):
        logger.info(f'self.args is {args}, type is {type(args)}')
        # duration_range is passed as "min;max", e.g. "5;3600"
        min_duration, max_duration = args.duration_range.split(';')
        self.min_duration = int(min_duration) if min_duration else 0
        self.max_duration = int(max_duration) if max_duration else np.inf

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        duration_list = []
        for row in df.itertuples():
            duration_list.append(get_audio_duration(row.file_path))
        # Unreadable files yield None, which becomes NaN here and is filtered out below.
        df['duration'] = np.array(duration_list, dtype=float)
        # Flag each file by duration: 0 means keep, 1 means drop. The actual removal
        # is performed by the operator framework that calls this class.
        df['filter'] = np.where((df['duration'] >= self.min_duration) & (df['duration'] <= self.max_duration), 0, 1)
        return df
Content of requirements.txt:
pydub
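For a quick local check of the Process class above, a minimal driver script such as the following can be used. It assumes pydub is installed and that the listed audio files exist locally; the argparse.Namespace construction and the file names are illustrative only, since in production the operator framework builds the arguments from manifest.yml and, with auto-data-loading: true, supplies the DataFrame (including the file_path column) automatically.

import argparse

import pandas as pd

from process import Process

# Hypothetical local test: the real framework supplies both args and the DataFrame.
args = argparse.Namespace(duration_range='5;3600')
df = pd.DataFrame({'file_path': ['sample_a.wav', 'sample_b.mp3']})  # assumed local test files

result = Process(args)(df)
print(result[['file_path', 'duration', 'filter']])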
Sample 2: Extracting JSON Objects from Text
Operator package directory structure:
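Inferred from the two files listed below (the top-level directory name is an assumption; no requirements.txt is shown for this sample), the package presumably contains:

extract_json_data/
├── manifest.yml    # operator metadata
└── process.py      # entry point; downloads from and uploads to OBS itself (auto-data-loading: false)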
Content of manifest.yml:
id: extract_json_data
name: 提取文本中的JSON对象
description: 提取文本中的JSON对象
author: "Huawei Cloud/EI"
version: 1.0.0
tags:
  language:
    - zh
  format:
    - jsonl
    - txt
    - json
  category: 数据转换
  modal:
    - TEXT
    - OTHER
runtime:
  cpu-arch:
    - ARM
  resources:
    - cpu: 4
      memory: 8192
  environment: PYTHON
entrypoint: process.py
auto-data-loading: false
Content of process.py:
import argparse
import json
import logging
import os

from moxing.file import list_directory, copy

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def extract_json_data(input_file, output_file):
    """
    Extract the JSON payload from each line of the input file and write it to the output file (JSONL format).
    :param input_file: input file name
    :param output_file: output file name
    """
    with open(input_file, 'r', encoding='utf-8') as fin, \
            open(output_file, 'w', encoding='utf-8') as fout:
        for line in fin:
            line = line.strip()
            if not line:
                continue
            # Split each line on the separator " : " (at most once)
            parts = line.split(' : ', 1)
            if len(parts) < 2:
                logger.warning(f'Skipping invalid line: {line}')
                continue
            json_str = parts[1].strip()
            try:
                # Try to parse the JSON directly
                json_data = json.loads(json_str)
            except json.JSONDecodeError:
                try:
                    # Replace common full-width quotation marks and retry
                    fixed_str = json_str.replace('“', '"').replace('”', '"')
                    json_data = json.loads(fixed_str)
                except json.JSONDecodeError:
                    logger.warning(f'Parsing failed, skipping: {json_str}')
                    continue
            # Write the valid JSON object to the output file (JSONL format)
            fout.write(json.dumps(json_data, ensure_ascii=False) + '\n')


class Process:
    def __init__(self, args):
        # Create a local directory for files downloaded from OBS
        self.input_dir = r'input_dir'
        os.makedirs(self.input_dir, exist_ok=True)
        # Create a local directory for the JSONL files generated by this operator
        self.output_dir = r'output_dir'
        os.makedirs(self.output_dir, exist_ok=True)
        self.args = args
        logger.info(f'self.args is {self.args}, type is {type(self.args)}')

    def __call__(self, input):
        input_obs_path = self.args.obs_input_path
        output_obs_path = self.args.obs_output_path
        file_name_list = list_directory(input_obs_path, recursive=False)
        file_path_dict = {file_name: os.path.join(input_obs_path, file_name) for file_name in file_name_list}
        # Process the data file by file: download from OBS, extract, then upload the result
        for file_name, obs_file_path in file_path_dict.items():
            input_file = os.path.join(self.input_dir, file_name)
            output_file = os.path.join(self.output_dir, file_name)
            copy(obs_file_path, input_file)
            extract_json_data(input_file, output_file)
            copy(output_file, os.path.join(output_obs_path, file_name))
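To illustrate the "prefix : JSON" line format that extract_json_data expects, the hypothetical snippet below writes three test lines and runs the function on them. The file names and line contents are invented, and importing process requires the moxing dependency to be importable, as it is in the operator runtime.

from process import extract_json_data

# Hypothetical input: each line is "<prefix> : <JSON payload>".
sample_lines = [
    'record-001 : {"text": "hello", "label": 1}',   # parsed directly
    'record-002 : {“text”: “full-width quotes”}',    # parsed after quote replacement
    'record-003 without the separator, skipped',     # logged and skipped
]
with open('demo_input.txt', 'w', encoding='utf-8') as f:
    f.write('\n'.join(sample_lines))

extract_json_data('demo_input.txt', 'demo_output.jsonl')
# demo_output.jsonl now contains two JSONL records; the third input line was skipped.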
Parent topic: Custom Dataset Processing Operators