更新时间:2025-07-30 GMT+08:00
分享

算子开发典型样例

样例1:音频时长过滤算子

算子包目录结构:

manifest.yml文件内容:

id: audio_duration_filter
name: 音频时长过滤
description: 保留音频时长在指定访问内的样本。
author: "Huawei Cloud/EI"

tags:
  language:
    - zh
  format:
    - wav
    - mp3
  category: 数据过滤
  modal:
    - AUDIO
runtime:
  cpu-arch:
    - ARM
  resources:
    - cpu: 4
      memory: 8192
  environment: PYTHON
  entrypoint: process.py
  auto-data-loading: true
arguments:
  - key: duration_range
    name: 保留时长(秒)
    type: int
    between: true
    min: 0
    tips: 保留时长在指定访问的的音频文件。
    required: true
    visible: true
    default: 5;3600

process.py文件内容:

import os
import numpy as np
import pandas as pd
from pydub import AudioSegment
import argparse
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def get_audio_duration(file_path):
    try:
        audio = AudioSegment.from_file(file_path)
        duration = len(audio) / 1000  # 转换为秒
    except:
        duration = None
    return duration


class Process:
    def __init__(self, args: argparse.Namespace):
        logger.info(f'self.args is {args},type is {type(args)}')
        min_duration, max_duration = args.duration_range.split(';')
        self.min_duration = int(min_duration) if min_duration else 0
        self.max_duration = int(max_duration) if max_duration else np.inf
    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        duration_list = []
        filter_list = []
        for row in df.itertuples():
            duration = get_audio_duration(row.file_path)
            duration_list.append(duration)
            filter_list.append(0)
        df['duration'] = duration_list

        # 根据duration筛选文件,0表示保留,1表示滤除。滤除动作由上层算子框架执行。
        df['filter'] = np.where((df['duration'] >= self.min_duration)
                                & (df['duration'] <= self.max_duration), 0, 1)
        return df

requirements.txt文件内容:

pydub

样例2:提取文本中的JSON对象

算子包目录结构:

manifest.yml文件内容:

id: extract_json_data
name: 提取文本中的JSON对象
description: 提取文本中的JSON对象
author: "Huawei Cloud/EI"
version: 1.0.0
tags:
  language:
    - zh
  format:
    - jsonl
    - txt
    - json
  category: 数据转换
  modal:
    - TEXT
    - OTHER
runtime:
  cpu-arch:
    - ARM
  resources:
    - cpu: 4
      memory: 8192
  environment: PYTHON
  entrypoint: process.py
  auto-data-loading: false

process.py文件内容:

import os
import pandas as pd
import argparse
import json
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
from moxing.file import list_directory, copy
def extract_json_data(input_file, output_file):
    """
    从输入文件中提取正文JSON数据,并写入输出文件(JSONL格式)
    :param input_file: 输入文件名
    :param output_file: 输出文件名
    """
    with open(input_file, 'r', encoding='utf-8') as fin, \
            open(output_file, 'w', encoding='utf-8') as fout:
        for line in fin:
            line = line.strip()
            if not line:
                continue

            # 使用分隔符" : "分割每行数据(最多分割一次)
            parts = line.split(' : ', 1)
            if len(parts) < 2:
                print(f"跳过无效行: {line}")
                continue

            json_str = parts[1].strip()
            try:
                # 尝试直接解析JSON
                json_data = json.loads(json_str)
            except json.JSONDecodeError:
                try:
                    # 替换常见中文引号后重试解析
                    fixed_str = json_str.replace('“', '"').replace('”', '"')
                    json_data = json.loads(fixed_str)
                except:
                    print(f"解析失败,跳过: {json_str}")
                    continue

            # 将有效的JSON对象写入输出文件(JSONL格式)
            fout.write(json.dumps(json_data, ensure_ascii=False) + '\n')
class Process:
    def __init__(self, args):
        # 建本地目录,用于存放从obs上下载的文件
        self.input_dir = r'input_dir'
        os.makedirs(self.input_dir, exist_ok=True)
        # 建本地目录,用于存放算子生成的jsonl文件
        self.output_dir = r'output_dir'
        os.makedirs(self.output_dir, exist_ok=True)
        self.args = args
        logger.info(f'self.args is {self.args},type is {type(self.args)}')
    def __call__(self, input):
        input_obs_path = self.args.obs_input_path
        output_obs_path = self.args.obs_output_path
        file_name_list = list_directory(input_obs_path, recursive=False)
        file_path_dict = {file_name: os.path.join(input_obs_path, file_name) for file_name in file_name_list}
        # 数据处理
        for file_name, obs_file_path in file_path_dict.items():
            input_file = os.path.join(self.input_dir, file_name)
            output_file = os.path.join(self.output_dir, file_name)
            copy(obs_file_path, input_file)
            extract_json_data(input_file, output_file)
            copy(output_file, os.path.join(output_obs_path, file_name))

相关文档