Updated on 2025-11-20 GMT+08:00

Typical Operator Development Examples

Example 1: Audio Duration Filter Operator

Directory structure of the operator package:

manifest.yml file content:

# Operator manifest for the audio duration filter example.
id: audio_duration_filter
name: audio duration filtering
description: Retains samples whose audio duration is within the specified access duration.
author: "Huawei Cloud/EI"
tags:
  language:
    - zh
  format:
    - wav
    - mp3
  category: data filtering
  modal:
    - AUDIO
runtime:
  cpu-arch:
    - ARM
  resources:
    - cpu: 4
      memory: 8192
  environment: PYTHON
  # The Python file of this example that defines the Process class.
  entrypoint: process.py
  # NOTE(review): presumably makes the framework load the samples into the
  # DataFrame that process.py receives in Process.__call__ - confirm.
  auto-data-loading: true
arguments:
  # process.py reads this value as args.duration_range.
  - key: duration_range
    name: retention duration (s)
    type: int
    between: true
    min: 0
    tips: Retains audio files whose duration is within the specified access duration.
    required: true
    visible: true
    # Two bounds in seconds separated by ';' (process.py splits on ';').
    default: 5;3600

process.py file content:

import os
import argparse
import logging

import numpy as np
import pandas as pd
from pydub import AudioSegment

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def get_audio_duration(file_path):
    """Return the duration of an audio file in seconds.

    :param file_path: path of the audio file to open with pydub
    :return: duration in seconds, or None when the file cannot be decoded
    """
    try:
        audio = AudioSegment.from_file(file_path)
    except Exception as exc:
        # Best effort: an unreadable file must not abort the whole batch, but
        # the failure is logged instead of being swallowed silently.
        logger.warning('Cannot read audio file %s: %s', file_path, exc)
        return None
    # len() of the segment is divided by 1000 to convert the value to seconds.
    return len(audio) / 1000


class Process:
    """Operator entry point that flags samples by audio duration.

    The ``duration_range`` argument has the form ``"min;max"`` (seconds).
    An empty or missing bound means "unbounded" on that side.
    """

    def __init__(self, args: argparse.Namespace):
        """Parse ``args.duration_range`` into ``min_duration``/``max_duration``.

        :param args: namespace carrying the ``duration_range`` string
        :raises ValueError: if a non-empty bound is not an integer
        """
        logger.info(f'self.args is {args},type is {type(args)}')
        # partition() tolerates a value without ';' (only a lower bound),
        # where unpacking the result of split(';') would raise.
        min_duration, _, max_duration = args.duration_range.partition(';')
        min_duration = min_duration.strip()
        max_duration = max_duration.strip()
        self.min_duration = int(min_duration) if min_duration else 0
        self.max_duration = int(max_duration) if max_duration else np.inf

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add ``duration`` and ``filter`` columns to ``df`` and return it.

        :param df: samples to process; must contain a ``file_path`` column
        :return: the same DataFrame, modified in place
        """
        durations = [get_audio_duration(path) for path in df['file_path']]
        # Force a float column so that unreadable files become NaN. Without
        # this, a batch in which every file fails yields an object column of
        # None and the range comparison below raises TypeError.
        df['duration'] = pd.Series(durations, index=df.index, dtype='float64')
        # Filter files by duration. The value 0 indicates that files are
        # reserved, and the value 1 indicates that files are filtered out.
        # The filtering action is executed by the upper-layer operator
        # framework. NaN durations never fall inside the range, so unreadable
        # files are marked 1.
        in_range = df['duration'].between(self.min_duration, self.max_duration)
        df['filter'] = np.where(in_range, 0, 1)
        return df

requirements.txt file content:

pydub

Example 2: Extracting JSON Objects from Text

Directory structure of the operator package:

manifest.yml file content:

# Operator manifest for the "extract JSON objects from text" example.
id: extract_json_data
name: Extract JSON Objects from Text
description: Extracts a JSON object from a text.
author: "Huawei Cloud/EI"
version: 1.0.0
tags:
  language:
    - zh
  format:
    - jsonl
    - txt
    - json
  category: Data conversion
  modal:
    - TEXT
    - OTHER
runtime:
  cpu-arch:
    - ARM
  resources:
    - cpu: 4
      memory: 8192
  environment: PYTHON
  # The Python file of this example that defines the Process class.
  entrypoint: process.py
  # process.py of this example copies its files from and to OBS itself.
  # NOTE(review): presumably that is why automatic loading is off - confirm.
  auto-data-loading: false

process.py file content:

import os
import argparse
import json
import logging

import pandas as pd

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Imported after the logging setup, as in the original listing.
from moxing.file import list_directory, copy

# Typographic double quotation marks (U+201C / U+201D, common in Chinese
# text) mapped to the ASCII quote that JSON requires. Written as escapes so
# the characters cannot be silently normalised to '"' by an editor.
_CURLY_QUOTES = str.maketrans({'\u201c': '"', '\u201d': '"'})


def extract_json_data(input_file, output_file):
    """Extract JSON values from a text file and write them as JSONL.

    Each non-empty input line is split once on ``' : '``; the part after the
    separator is parsed as JSON. Lines without the separator, and lines whose
    payload is still invalid after normalising typographic quotes, are
    skipped with a warning.

    :param input_file: input file name (read as UTF-8)
    :param output_file: output file name (written as UTF-8, one JSON value per line)
    """
    with open(input_file, 'r', encoding='utf-8') as fin, \
            open(output_file, 'w', encoding='utf-8') as fout:
        for line in fin:
            line = line.strip()
            if not line:
                continue
            # Split on the first " : " (space, colon, space) only, so colons
            # inside the JSON payload are left untouched.
            _, separator, json_str = line.partition(' : ')
            if not separator:
                logger.warning('Skip invalid lines: %s', line)
                continue
            json_str = json_str.strip()
            try:
                # Try to parse the payload as-is first.
                json_data = json.loads(json_str)
            except json.JSONDecodeError:
                try:
                    # Replace typographic quotation marks and try again.
                    json_data = json.loads(json_str.translate(_CURLY_QUOTES))
                except json.JSONDecodeError:
                    logger.warning('Parsing failed. Skip: %s', json_str)
                    continue
            # Write a valid JSON value to the output file (JSONL format).
            fout.write(json.dumps(json_data, ensure_ascii=False) + '\n')


class Process:
    """Operator entry point: converts every file under an OBS path to JSONL."""

    def __init__(self, args):
        """Create the local working directories and keep ``args``.

        :param args: object providing ``obs_input_path`` and ``obs_output_path``
        """
        # Local directory for storing files downloaded from OBS.
        self.input_dir = 'input_dir'
        os.makedirs(self.input_dir, exist_ok=True)
        # Local directory for the JSONL files generated by the operator.
        self.output_dir = 'output_dir'
        os.makedirs(self.output_dir, exist_ok=True)
        self.args = args
        logger.info(f'self.args is {self.args},type is {type(self.args)}')

    def __call__(self, input):
        """Download, convert and upload each file listed under the input path.

        :param input: not used by this operator; the name (which shadows the
            builtin) is kept so existing callers keep working
        """
        input_obs_path = self.args.obs_input_path
        output_obs_path = self.args.obs_output_path
        for file_name in list_directory(input_obs_path, recursive=False):
            local_input = os.path.join(self.input_dir, file_name)
            local_output = os.path.join(self.output_dir, file_name)
            # OBS -> local, convert, local -> OBS (same file name on output).
            copy(os.path.join(input_obs_path, file_name), local_input)
            extract_json_data(local_input, local_output)
            copy(local_output, os.path.join(output_obs_path, file_name))