
Typical Operator Development Examples

Example 1: Audio Duration Filter Operator

Directory structure of the operator package:
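Based on the file contents shown below, the package presumably has the following layout (the directory name here is taken from the operator id and is only an assumption):

audio_duration_filter/
├── manifest.yml        # operator metadata, runtime settings, and arguments
├── process.py          # operator logic (Process class)
└── requirements.txt    # Python dependencies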

manifest.yml file content:

id: audio_duration_filter
name: Audio duration filtering
description: Retains samples whose audio duration falls within the specified range.
author: "Huawei Cloud/EI"

tags:
  language:
    - zh
  format:
    - wav
    - mp3
  category: data filtering
  modal:
    - AUDIO
runtime:
  cpu-arch:
    - ARM
  resources:
    - cpu: 4
      memory: 8192
  environment: PYTHON
  entrypoint: process.py
  auto-data-loading: true
arguments:
  - key: duration_range
    name: retention duration (s)
    type: int
    between: true
    min: 0
    tips: Retains audio files whose duration falls within the specified range.
    required: true
    visible: true
    default: 5;3600

process.py file content:

import os
import numpy as np
import pandas as pd
from pydub import AudioSegment
import argparse
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def get_audio_duration(file_path):
    try:
        audio = AudioSegment.from_file(file_path)
        duration = len(audio) / 1000  # Convert milliseconds to seconds.
    except Exception:
        duration = None
    return duration


class Process:
    def __init__(self, args: argparse.Namespace):
        logger.info(f'self.args is {args}, type is {type(args)}')
        min_duration, max_duration = args.duration_range.split(';')
        self.min_duration = int(min_duration) if min_duration else 0
        self.max_duration = int(max_duration) if max_duration else np.inf

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        duration_list = []
        for row in df.itertuples():
            duration_list.append(get_audio_duration(row.file_path))
        df['duration'] = duration_list

        # Filter files by duration. The value 0 means the file is retained, and the value 1 means it is
        # filtered out. The actual filtering is performed by the upper-layer operator framework.
        df['filter'] = np.where((df['duration'] >= self.min_duration)
                                & (df['duration'] <= self.max_duration), 0, 1)
        return df
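Because auto-data-loading is true, the operator framework builds the argparse.Namespace from the manifest arguments and loads the input DataFrame itself. For local debugging, both can be constructed by hand. The following is only a minimal sketch, assuming process.py is importable and the listed audio files exist locally (pydub relies on ffmpeg to decode mp3):

# Local debugging sketch (not part of the operator package). The framework normally
# supplies both the Namespace and the DataFrame; the file paths below are placeholders.
import argparse
import pandas as pd

from process import Process

args = argparse.Namespace(duration_range='5;3600')  # same "min;max" format as the manifest default
df = pd.DataFrame({'file_path': ['sample1.wav', 'sample2.mp3']})

result = Process(args)(df)
print(result[['file_path', 'duration', 'filter']])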

requirements.txt file content:

pydub

Example 2: Extracting JSON Objects from Text

Directory structure of the operator package:
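Based on the file contents shown below, the package presumably contains only these two files (the directory name here is taken from the operator id and is only an assumption):

extract_json_data/
├── manifest.yml     # operator metadata and runtime settings
└── process.py       # operator logic (extract_json_data and the Process class)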

manifest.yml file content:

id: extract_json_data
name: Extract JSON Objects from Text
description: Extracts JSON objects from text files.
author: "Huawei Cloud/EI"
version: 1.0.0
tags:
  language:
    - zh
  format:
    - jsonl
    - txt
    - json
  category: Data conversion
  modal:
    - TEXT
    - OTHER
runtime:
  cpu-arch:
    - ARM
  resources:
    - cpu: 4
      memory: 8192
  environment: PYTHON
  entrypoint: process.py
  auto-data-loading: false

process.py file content:

import os
import pandas as pd
import argparse
import json
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
from moxing.file import list_directory, copy
def extract_json_data(input_file, output_file):
    """
    Extracts JSON data from the input file and writes the data to the output file (JSONL format).

    :param input_file: input file name
    :param output_file: output file name
    """
    with open(input_file, 'r', encoding='utf-8') as fin, \
            open(output_file, 'w', encoding='utf-8') as fout:
        for line in fin:
            line = line.strip()
            if not line:
                continue

            # Split each line on " : " (at most once); the JSON payload follows the separator.
            parts = line.split(' : ', 1)
            if len(parts) < 2:
                logger.warning(f"Skipping invalid line: {line}")
                continue

            json_str = parts[1].strip()
            try:
                # Try to parse the JSON string directly.
                json_data = json.loads(json_str)
            except json.JSONDecodeError:
                try:
                    # Replace common Chinese (full-width) quotation marks and try again.
                    fixed_str = json_str.replace('“', '"').replace('”', '"')
                    json_data = json.loads(fixed_str)
                except json.JSONDecodeError:
                    logger.warning(f"Parsing failed. Skipping: {json_str}")
                    continue

            # Write each valid JSON object to the output file (JSONL format).
            fout.write(json.dumps(json_data, ensure_ascii=False) + '\n')
class Process:
    def __init__(self, args):
        # Create a local directory for storing files downloaded from OBS.
        self.input_dir = r'input_dir'
        os.makedirs(self.input_dir, exist_ok=True)
        # Create a local directory to store the JSONL files generated by the operator.
        self.output_dir = r'output_dir'
        os.makedirs(self.output_dir, exist_ok=True)
        self.args = args
        logger.info(f'self.args is {self.args}, type is {type(self.args)}')

    def __call__(self, input):
        input_obs_path = self.args.obs_input_path
        output_obs_path = self.args.obs_output_path
        file_name_list = list_directory(input_obs_path, recursive=False)
        file_path_dict = {file_name: os.path.join(input_obs_path, file_name) for file_name in file_name_list}
        # Data processing: download each file from OBS, extract the JSON objects, and upload the result.
        for file_name, obs_file_path in file_path_dict.items():
            input_file = os.path.join(self.input_dir, file_name)
            output_file = os.path.join(self.output_dir, file_name)
            copy(obs_file_path, input_file)
            extract_json_data(input_file, output_file)
            copy(output_file, os.path.join(output_obs_path, file_name))
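Each input line is expected to contain a prefix, the separator " : ", and a JSON payload. The following standalone sketch exercises extract_json_data locally, without OBS or the operator framework; the file names are placeholders, and importing process.py assumes the MoXing package is available, as it is in the operator runtime:

# Standalone check of extract_json_data; file names are placeholders.
# Importing process.py assumes MoXing is installed (as in the operator runtime).
from process import extract_json_data

with open('sample_input.txt', 'w', encoding='utf-8') as f:
    f.write('record-001 : {"text": "hello", "label": 1}\n')
    f.write('record-002 : {"text": "世界", "label": 0}\n')
    f.write('no separator on this line\n')  # skipped as an invalid line

extract_json_data('sample_input.txt', 'sample_output.jsonl')

with open('sample_output.jsonl', encoding='utf-8') as f:
    print(f.read())  # two JSONL lines; the invalid line is skipped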