Help Center/
PanguLargeModels/
User Guide/
Using Data Engineering to Create a Dataset/
Processing Datasets/
Managing Processing Operators/
Custom Data Processing Operators/
Typical Operator Development Examples
Updated on 2025-11-04 GMT+08:00
Typical Operator Development Examples
Example 1: Audio Duration Filter Operator
Directory structure of the operator package:

manifest.yml file content:
id: audio_duration_filter
name: audio duration filtering
description: Retains samples whose audio duration is within the specified duration range.
author: "Huawei Cloud/EI"
tags:
  language:
    - zh
  format:
    - wav
    - mp3
  category: data filtering
  modal:
    - AUDIO
runtime:
  cpu-arch:
    - ARM
  resources:
    - cpu: 4
      memory: 8192
  environment: PYTHON
  entrypoint: process.py
  auto-data-loading: true
arguments:
  - key: duration_range
    name: retention duration (s)
    type: int
    between: true
    min: 0
    tips: Retains audio files whose duration is within the specified duration range.
    required: true
    visible: true
    default: 5;3600
process.py file content:
import os
import numpy as np
import pandas as pd
from pydub import AudioSegment
import argparse
import logging
# Configure logging at INFO level; each record shows timestamp, logger name,
# level and message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger used by the operator code below.
logger = logging.getLogger(__name__)
def get_audio_duration(file_path):
    """Return the duration of an audio file in seconds.

    The file is decoded with ``AudioSegment.from_file``; the length of the
    decoded segment is divided by 1000 to convert it to seconds.

    :param file_path: path of the audio file to inspect
    :return: duration in seconds, or ``None`` if the file could not be
        opened or decoded
    """
    try:
        audio = AudioSegment.from_file(file_path)
    except Exception as err:
        # Best effort: one unreadable file must not abort the whole batch.
        # ``Exception`` (rather than a bare ``except``) lets KeyboardInterrupt
        # and SystemExit propagate, and the failure is logged instead of
        # being silently discarded.
        logger.warning('Failed to read audio duration of %s: %s', file_path, err)
        return None
    return len(audio) / 1000  # Convert the value to seconds.
class Process:
    """Audio duration filter operator.

    Adds a ``duration`` column (seconds) and a ``filter`` flag column to the
    incoming DataFrame. The flag is 0 for samples whose duration lies inside
    the configured range and 1 for all others.
    """

    def __init__(self, args: argparse.Namespace):
        """Parse the duration range from the operator arguments.

        :param args: operator arguments; ``args.duration_range`` is a string
            of the form ``"<min>;<max>"``. An empty ``<min>`` means 0; an
            empty ``<max>`` (or a value without ``;``) means no upper bound.
        :raises ValueError: if a non-empty bound is not an integer
        """
        logger.info(f'self.args is {args},type is {type(args)}')
        # partition() tolerates a value without ';' (treated as "<min>" only),
        # where a two-way unpack of split(';') would raise.
        min_duration, _, max_duration = args.duration_range.partition(';')
        min_duration = min_duration.strip()
        max_duration = max_duration.strip()
        self.min_duration = int(min_duration) if min_duration else 0
        self.max_duration = int(max_duration) if max_duration else np.inf

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute durations and flag the rows that fall outside the range.

        :param df: DataFrame with a ``file_path`` column
        :return: the same DataFrame, with ``duration`` and ``filter`` columns
            added (modified in place)
        """
        durations = [get_audio_duration(path) for path in df['file_path']]
        # Force float64 so that undecodable files (None) become NaN. Without
        # this, a batch in which every file fails yields an object column of
        # None and the comparisons below raise TypeError.
        df['duration'] = pd.Series(durations, index=df.index, dtype='float64')
        # Filter files by duration. The value 0 indicates that files are reserved, and the value 1 indicates that files are filtered out. The filtering action is executed by the upper-layer operator framework.
        # NaN compares False on both sides, so undecodable files get 1.
        df['filter'] = np.where((df['duration'] >= self.min_duration)
                                & (df['duration'] <= self.max_duration), 0, 1)
        return df
requirements.txt file content:
pydub
Example 2: Extracting JSON Objects from Text
Directory structure of the operator package:

manifest.yml file content:
id: extract_json_data
name: Extract JSON Objects from Text
description: Extracts JSON objects from text.
author: "Huawei Cloud/EI"
version: 1.0.0
tags:
  language:
    - zh
  format:
    - jsonl
    - txt
    - json
  category: Data conversion
  modal:
    - TEXT
    - OTHER
runtime:
  cpu-arch:
    - ARM
  resources:
    - cpu: 4
      memory: 8192
  environment: PYTHON
  entrypoint: process.py
  auto-data-loading: false
process.py file content:
import os
import pandas as pd
import argparse
import json
import logging
# Configure logging at INFO level; each record shows timestamp, logger name,
# level and message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger used by the operator code below.
logger = logging.getLogger(__name__)
from moxing.file import list_directory, copy
def extract_json_data(input_file, output_file):
    """
    Extracts JSON data from the input file and writes the data to the output file (JSONL format).

    Each non-empty input line is split once on the separator ``' : '``; the
    text after the separator is parsed as JSON. Lines without the separator,
    and lines whose payload cannot be parsed, are reported with ``print`` and
    skipped. If the first parse fails, the payload is retried with the
    typographic double quotes U+201C / U+201D replaced by ASCII ``"``.

    :param input_file: input file name
    :param output_file: output file name
    """
    # Written as escapes so the typographic quotes cannot be normalised to
    # ASCII (which would make the replacement a no-op) when the code is
    # copied or re-encoded.
    quote_fix = str.maketrans({'\u201c': '"', '\u201d': '"'})

    with open(input_file, 'r', encoding='utf-8') as fin, \
            open(output_file, 'w', encoding='utf-8') as fout:
        for line in fin:
            line = line.strip()
            if not line:
                continue

            # Split on the first ' : ' (a colon surrounded by spaces) only,
            # so that the same sequence inside the JSON payload is preserved.
            _, separator, json_str = line.partition(' : ')
            if not separator:
                print(f"Skip invalid lines: {line}")
                continue

            json_str = json_str.strip()
            try:
                # Try to directly parse the JSON text.
                json_data = json.loads(json_str)
            except json.JSONDecodeError:
                try:
                    # Replace common Chinese quotation marks and try again.
                    json_data = json.loads(json_str.translate(quote_fix))
                except json.JSONDecodeError:
                    print(f"Parsing failed. Skip: {json_str}")
                    continue

            # Write a valid JSON object to the output file (JSONL format).
            fout.write(json.dumps(json_data, ensure_ascii=False) + '\n')
class Process:
    """Operator entry point: extracts JSON objects from the files under an OBS path."""

    def __init__(self, args):
        """Keep the operator arguments and create the local working directories.

        :param args: operator arguments; ``obs_input_path`` and
            ``obs_output_path`` are read from it when the operator is called
        """
        self.args = args
        # Local directory for files downloaded from OBS.
        self.input_dir = r'input_dir'
        # Local directory for the JSONL files generated by the operator.
        self.output_dir = r'output_dir'
        for local_dir in (self.input_dir, self.output_dir):
            os.makedirs(local_dir, exist_ok=True)
        logger.info(f'self.args is {self.args},type is {type(self.args)}')

    def __call__(self, input):
        """Download each input file, convert it, and upload the result.

        :param input: not used by this operator
        """
        source_root = self.args.obs_input_path
        target_root = self.args.obs_output_path
        listed_names = list_directory(source_root, recursive=False)

        # dict.fromkeys keeps first-seen order and processes a repeated name
        # only once.
        for name in dict.fromkeys(listed_names):
            local_input = os.path.join(self.input_dir, name)
            local_output = os.path.join(self.output_dir, name)
            # Download, convert locally, then upload under the same file name.
            copy(os.path.join(source_root, name), local_input)
            extract_json_data(local_input, local_output)
            copy(local_output, os.path.join(target_root, name))
Parent topic: Custom Data Processing Operators
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
The system is busy. Please try again later.
For any further questions, feel free to contact us through the chatbot.
Chatbot