更新时间:2025-11-04 GMT+08:00
算子开发典型样例
样例1:音频时长过滤算子
算子包目录结构:

manifest.yml文件内容:
id: audio_duration_filter
name: 音频时长过滤
description: 保留音频时长在指定范围内的样本。
author: "Huawei Cloud/EI"
tags:
language:
- zh
format:
- wav
- mp3
category: 数据过滤
modal:
- AUDIO
runtime:
cpu-arch:
- ARM
resources:
- cpu: 4
memory: 8192
environment: PYTHON
entrypoint: process.py
auto-data-loading: true
arguments:
- key: duration_range
name: 保留时长(秒)
type: int
between: true
min: 0
tips: 保留时长在指定范围内的音频文件。
required: true
visible: true
default: 5;3600
process.py文件内容:
import os
import numpy as np
import pandas as pd
from pydub import AudioSegment
import argparse
import logging
# Configure the root logger at import time: INFO level, with timestamp,
# logger name and level prefixed to every message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def get_audio_duration(file_path):
    """Return the duration of an audio file in seconds.

    Args:
        file_path: Path of the audio file, decoded with
            ``AudioSegment.from_file``.

    Returns:
        The duration in seconds, or ``None`` when the file cannot be opened
        or decoded.
    """
    try:
        audio = AudioSegment.from_file(file_path)
        # The length of the loaded segment is divided by 1000 to get seconds.
        return len(audio) / 1000
    except Exception:
        # Best effort: an unreadable file must not abort the whole batch.
        # ``Exception`` (rather than a bare ``except``) lets KeyboardInterrupt
        # and SystemExit propagate, and the failure is logged instead of being
        # dropped silently.
        logger.warning("Failed to read audio duration: %s", file_path, exc_info=True)
        return None
class Process:
    """Operator that flags audio samples whose duration is out of range.

    The instance is called with a DataFrame that has a ``file_path`` column.
    It adds a ``duration`` column (seconds) and a ``filter`` column
    (0 = keep, 1 = drop).
    """

    def __init__(self, args: argparse.Namespace):
        """Parse the duration range from the operator arguments.

        Args:
            args: Operator arguments. ``args.duration_range`` is a
                ``"min;max"`` string; an empty side falls back to 0 (min)
                or infinity (max).

        Raises:
            ValueError: If ``duration_range`` does not contain exactly one
                ``;`` or a bound is not an integer.
        """
        logger.info(f'self.args is {args},type is {type(args)}')
        min_duration, max_duration = (
            part.strip() for part in args.duration_range.split(';')
        )
        self.min_duration = int(min_duration) if min_duration else 0
        self.max_duration = int(max_duration) if max_duration else np.inf

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        """Annotate ``df`` in place with ``duration`` and ``filter`` columns.

        Args:
            df: Input rows; each row's ``file_path`` is passed to
                ``get_audio_duration``.

        Returns:
            The same DataFrame with the two columns added.
        """
        durations = [get_audio_duration(row.file_path) for row in df.itertuples()]
        # Force a float column so that undecodable files (None) become NaN.
        # Without this, a batch in which every file fails yields an object
        # column of None and the range comparison raises TypeError.
        df['duration'] = pd.Series(durations, index=df.index, dtype='float64')
        # Filter by duration: 0 means keep, 1 means drop. The actual removal
        # is performed by the upper-level operator framework. NaN durations
        # are never inside the range, so unreadable files are marked 1.
        in_range = df['duration'].between(self.min_duration, self.max_duration)
        df['filter'] = np.where(in_range, 0, 1)
        return df
requirements.txt文件内容:
pydub
样例2:提取文本中的JSON对象
算子包目录结构:

manifest.yml文件内容:
id: extract_json_data
name: 提取文本中的JSON对象
description: 提取文本中的JSON对象
author: "Huawei Cloud/EI"
version: 1.0.0
tags:
language:
- zh
format:
- jsonl
- txt
- json
category: 数据转换
modal:
- TEXT
- OTHER
runtime:
cpu-arch:
- ARM
resources:
- cpu: 4
memory: 8192
environment: PYTHON
entrypoint: process.py
auto-data-loading: false
process.py文件内容:
import os
import pandas as pd
import argparse
import json
import logging
# Configure the root logger at import time: INFO level, with timestamp,
# logger name and level prefixed to every message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
from moxing.file import list_directory, copy
def extract_json_data(input_file, output_file):
    """Extract the JSON payload of each input line into a JSONL file.

    Each non-empty line is expected to look like ``<prefix> : <json>``.
    The part after the first ``" : "`` is parsed as JSON and written to the
    output file, one value per line. Lines without the separator, or whose
    payload cannot be parsed, are reported with ``print`` and skipped.

    :param input_file: path of the input text file (read as UTF-8)
    :param output_file: path of the output JSONL file (written as UTF-8)
    """
    with open(input_file, 'r', encoding='utf-8') as fin, \
            open(output_file, 'w', encoding='utf-8') as fout:
        for line in fin:
            line = line.strip()
            if not line:
                continue

            # Split on the first " : " only; the payload may contain more.
            _, separator, payload = line.partition(' : ')
            if not separator:
                print(f"跳过无效行: {line}")
                continue

            json_str = payload.strip()
            try:
                # First try the payload as-is.
                json_data = json.loads(json_str)
            except json.JSONDecodeError:
                # Retry after replacing full-width Chinese double quotes
                # with ASCII quotes.
                fixed_str = json_str.replace('“', '"').replace('”', '"')
                try:
                    json_data = json.loads(fixed_str)
                except json.JSONDecodeError:
                    # Only parse failures are skipped; other errors
                    # (e.g. KeyboardInterrupt) are no longer swallowed.
                    print(f"解析失败,跳过: {json_str}")
                    continue

            # Write every successfully parsed value as one JSONL record.
            fout.write(json.dumps(json_data, ensure_ascii=False) + '\n')
class Process:
    """Operator that converts each file under an OBS directory to JSONL.

    Files are copied to a local input directory, run through
    ``extract_json_data``, and the result is copied to the OBS output path
    under the same file name.
    """

    def __init__(self, args):
        """Store the arguments and create the local working directories."""
        # Local staging directories: one for files downloaded from OBS,
        # one for the JSONL files this operator generates.
        self.input_dir = r'input_dir'
        self.output_dir = r'output_dir'
        for local_dir in (self.input_dir, self.output_dir):
            os.makedirs(local_dir, exist_ok=True)
        self.args = args
        logger.info(f'self.args is {self.args},type is {type(self.args)}')

    def __call__(self, input):
        """Process every entry listed (non-recursively) under the input path."""
        src_root = self.args.obs_input_path
        dst_root = self.args.obs_output_path
        listed_names = list_directory(src_root, recursive=False)
        # Map name -> OBS path up front; repeated names collapse to one entry.
        remote_paths = {}
        for name in listed_names:
            remote_paths[name] = os.path.join(src_root, name)

        # Download, convert, upload - one file at a time.
        for name in remote_paths:
            local_in = os.path.join(self.input_dir, name)
            local_out = os.path.join(self.output_dir, name)
            copy(remote_paths[name], local_in)
            extract_json_data(local_in, local_out)
            copy(local_out, os.path.join(dst_root, name))
父主题: 自定义数据集加工算子