Updated on 2025-07-02 GMT+08:00

Conversion Operators

Building a conversion operator requires a YML configuration file and a TAR package containing the Python scripts that implement the operator's functions.

1. YML file example:

### Python operator example

id: FormatConvert2
name: Custom operator format conversion
description: Format conversion using custom operators
author: Huawei Cloud Technology Co., Ltd.
tags:
  language:
    - zh
    - en
  format:
    - CSV
    - TIF
  category: Data conversion
  modal:
    - IMAGE
runtime:
  cpu-arch:
    - X86
  resources:
    - cpu: 1
      memory: 4096
  environment: python
  entrypoint: process.py
  auto-data-loading: false # Whether to automatically load data
arguments:
  - key: operateType
    name: Operation type
    type: ENUM  #Single choice
    items:
      - name: Object detection
        value: object_detection
      - name: Image classification
        value: image_classification
    required: true
    visible: true
    default: object_detection	
    """
    return "parquet"

2. TAR package:

Assume that the operator package name is operator-package.tar. The directory structure after the operator package is decompressed is as follows:

+---operator-package

| +--- program_package # Python operator directory

| | +---dependency

| | | +---requirements.txt # (Optional) Requirement description

| | +--- install.sh # (Optional) Installation script

| | +--- process.py # (Mandatory) Operator code

process.py file for format conversion

import PIL.Image
import PIL.ImageOps
import numpy as np
import ast
import os
import pandas as pd
import xml.etree.ElementTree as ET
from xml.dom import minidom
import PIL
import moxing as mox
import ma_utils as utils

# Module-level logger obtained from the FileLogger class of `ma_utils`.
logger = utils.FileLogger.get_logger()

import argparse


# Command-line options of the operator; all three are required strings.
# NOTE(review): no parse_args() call is visible in this part of the file —
# confirm where (or whether) this parser is consumed.
arg_parser = argparse.ArgumentParser()
# Conversion mode; csv_to_pascal_voc compares it against 'object_detection'
# and 'image_classification'.
arg_parser.add_argument("--operateType", type=str, required=True, help="")
# NOTE(review): Process.__init__ reads args.obs_input_path / args.obs_output_path,
# not the input_obs_path / output_obs_path names declared here — confirm the
# naming difference is intended.
arg_parser.add_argument("--input_obs_path", type=str, required=True, help="")
arg_parser.add_argument("--output_obs_path", type=str, required=True, help="")


def prettify_xml(elem):
    """Serialize an ElementTree element as indented XML text.

    The element is encoded to UTF-8 bytes and re-parsed with ``minidom`` so
    that ``toprettyxml`` can lay it out with a two-space indent.

    Args:
        elem: The ``xml.etree.ElementTree.Element`` to serialize.

    Returns:
        The pretty-printed XML document as a string.
    """
    encoded = ET.tostring(elem, 'utf-8')
    return minidom.parseString(encoded).toprettyxml(indent="  ")


def create_pascal_voc_xml(folder, filename, path, width, height, depth, objects, deu, db, dp, ls, lv, lse, lc, srct,
                          srrt, src, srr, rct, rrt, rc, rr, dest__engine_uuid=None):
    """Build an ``annotation`` XML tree from flat record metadata.

    One child element per entry of the table below is appended to the root,
    in table order, with the given argument as its text.

    Args:
        folder: Text of the first ``DEST_PATH`` element.
        filename: Text of ``FILE_NAME``.
        deu, db, dp: Text of ``DEST_ENGINE_UUID``, ``DEST_BUCKET`` and the
            second ``DEST_PATH`` element.
        lv, ls, lc: Text of ``LABEL_VALUE``, ``LABEL_SHAPE`` and
            ``LABEL_COORDINATE``.
        srct, srrt, src, srr: Text of the ``SOURCE_REC_*`` elements
            (create time, revise time, creator, revisor).
        rct, rrt, rc, rr: Text of the ``REC_*`` elements (create time,
            revise time, creator, revisor).
        path, width, height, depth, objects, lse, dest__engine_uuid:
            Accepted but never read.

    Returns:
        An ``xml.etree.ElementTree.ElementTree`` rooted at ``annotation``.

    NOTE(review): ``DEST_PATH`` is emitted twice (``folder`` then ``dp``) and
    ``LABEL_SHAPE`` is filled from ``ls`` while ``lse`` is unused. This looks
    unintentional; confirm with the consumers of this XML before changing it.
    """
    tagged_values = (
        ('DEST_ENGINE_UUID', deu),         # UUID of the destination engine
        ('DEST_BUCKET', db),               # destination bucket (logical partition)
        ('DEST_PATH', folder),             # destination path
        ('FILE_NAME', filename),           # file name
        ('DEST_PATH', dp),                 # second DEST_PATH entry (see note above)
        ('LABEL_VALUE', lv),
        ('LABEL_SHAPE', ls),               # taken from `ls`, not `lse`
        ('LABEL_COORDINATE', lc),
        ('SOURCE_REC_CREATE_TIME', srct),  # time the source record was created
        ('SOURCE_REC_REVISE_TIME', srrt),  # time the source record was modified
        ('SOURCE_REC_CREATOR', src),       # creator of the source record
        ('SOURCE_REC_REVISOR', srr),       # modifier of the source record
        ('REC_CREATE_TIME', rct),          # time the record was created
        ('REC_REVISE_TIME', rrt),          # time the record was modified
        ('REC_CREATOR', rc),               # record creator
        ('REC_REVISOR', rr),               # record modifier
    )
    root = ET.Element('annotation')
    for tag, text in tagged_values:
        ET.SubElement(root, tag).text = text
    return ET.ElementTree(root)
def create_pascal_voc_xml1(folder, filename, path, width, height, depth, objects):
    """Build a Pascal VOC style ``annotation`` tree for one image.

    The root receives, in order: ``folder``, ``filename``, ``path``,
    ``source/database`` (always ``'Unknown'``), ``size`` (``width``,
    ``height``, ``depth``), ``segmented`` (always ``'0'``) and one ``object``
    element per entry of ``objects``.

    Args:
        folder: Text of the ``folder`` element.
        filename: Text of the ``filename`` element.
        path: Text of the ``path`` element.
        width, height, depth: Image size values; stored as ``str(value)``.
        objects: Iterable of mappings with the keys ``name``, ``xmin``,
            ``ymin``, ``xmax`` and ``ymax``.

    Returns:
        An ``xml.etree.ElementTree.ElementTree`` rooted at ``annotation``.

    Raises:
        KeyError: If an entry of ``objects`` lacks one of the required keys.
    """
    annotation = ET.Element('annotation')

    ET.SubElement(annotation, 'folder').text = folder
    ET.SubElement(annotation, 'filename').text = filename
    ET.SubElement(annotation, 'path').text = path

    source = ET.SubElement(annotation, 'source')
    ET.SubElement(source, 'database').text = 'Unknown'

    size = ET.SubElement(annotation, 'size')
    for tag, value in (('width', width), ('height', height), ('depth', depth)):
        ET.SubElement(size, tag).text = str(value)

    ET.SubElement(annotation, 'segmented').text = '0'

    for obj in objects:
        obj_element = ET.SubElement(annotation, 'object')

        label = obj['name']
        # A label may arrive as a number (e.g. a numeric CSV column), and
        # ElementTree refuses to serialize non-string text, so coerce it here.
        # None is kept as-is so an absent label still yields an empty element.
        if label is not None and not isinstance(label, str):
            label = str(label)
        ET.SubElement(obj_element, 'name').text = label

        ET.SubElement(obj_element, 'pose').text = 'Unspecified'
        ET.SubElement(obj_element, 'truncated').text = '0'
        ET.SubElement(obj_element, 'difficult').text = '0'

        bndbox = ET.SubElement(obj_element, 'bndbox')
        for corner in ('xmin', 'ymin', 'xmax', 'ymax'):
            ET.SubElement(bndbox, corner).text = str(obj[corner])

    return ET.ElementTree(annotation)

def get_value(row, name):
    """Return ``row[name]`` as text, mapping missing values to ``''``.

    Args:
        row: Any object supporting ``row[name]`` lookup.
        name: Key or column label to read.

    Returns:
        An empty string when ``pd.isna`` reports the cell as missing,
        otherwise ``str()`` of the cell.
    """
    cell = row[name]
    if pd.isna(cell):
        return ''
    return str(cell)


def copy_local_to_obs(local_file_path, obs_file_path):
    """Copy ``local_file_path`` to ``obs_file_path`` via ``mox.file.copy``."""
    mox.file.copy(local_file_path, obs_file_path)


def csv_to_pascal_voc(args, csv_file, local_tif_file_path_dict, local_output_file_dir):
    """Convert CSV label rows into per-image annotation files and upload them.

    Rows of ``csv_file`` are grouped by their ``FILE_NAME`` column. A group
    whose file name has no (truthy) entry in ``local_tif_file_path_dict`` is
    skipped. For every other group the action depends on ``args.operateType``
    (``'object_detection'`` when the attribute is absent):

    * ``'object_detection'``: a Pascal VOC XML file is written with one object
      per row, using ``LABEL_VALUE`` as the name and ``LABEL_COORDINATE``
      (parsed with ``ast.literal_eval``) as ``xmin, ymin, xmax, ymax``. The
      image size and band count are read from the local image.
    * ``'image_classification'``: a text file holding the ``LABEL_VALUE`` of
      the group's last row is written.
    * any other value: the group is ignored.

    The annotation file and then the image are copied to
    ``args.output_obs_path``.

    Args:
        args: Object providing ``output_obs_path`` and optionally
            ``operateType``.
        csv_file: Path of the CSV file to read.
        local_tif_file_path_dict: Maps image file name to its local path.
        local_output_file_dir: Local directory for the generated files.
    """
    operate_type = getattr(args, 'operateType', 'object_detection')
    logger.info(operate_type)
    df = pd.read_csv(csv_file)

    for filename, group in df.groupby('FILE_NAME'):
        local_image_path = local_tif_file_path_dict.get(filename, None)
        if not local_image_path:
            # No downloaded image for this CSV entry: nothing to annotate.
            continue

        # The annotation name is built from the image's stem so that it can
        # never equal the image name. Relying on replacetype() left unknown
        # extensions (e.g. '.tif') untouched, so the annotation was uploaded
        # under the image's name and then overwritten by the image itself.
        stem = os.path.splitext(filename)[0]
        # The original code read these fields from the loop variable after
        # the row loop ended, i.e. from the group's last row.
        last_row = group.iloc[-1]

        if operate_type == 'object_detection':
            objects = []
            for _, row in group.iterrows():
                bndbox = ast.literal_eval(row['LABEL_COORDINATE'])
                objects.append({
                    'name': row['LABEL_VALUE'],
                    'xmin': bndbox[0],
                    'ymin': bndbox[1],
                    'xmax': bndbox[2],
                    'ymax': bndbox[3],
                })

            folder = os.path.join(last_row['DEST_BUCKET'], last_row['DEST_PATH'])
            path = os.path.join(folder, filename)
            with PIL.Image.open(local_image_path) as img:
                width, height = img.size
                depth = len(img.getbands())

            tree = create_pascal_voc_xml1(folder, filename, path, width, height, depth, objects)
            annotation_name = stem + '.xml'
            annotation_text = prettify_xml(tree.getroot())
        elif operate_type == 'image_classification':
            for _ in range(len(group)):
                logger.info(f'-------------Cycle row--')

            annotation_name = stem + '.txt'
            annotation_text = create_pascal_voc_txt(get_value(last_row, 'LABEL_VALUE'))
        else:
            continue

        local_annotation_path = os.path.join(local_output_file_dir, annotation_name)
        with open(local_annotation_path, 'w', encoding='utf-8') as f:
            f.write(annotation_text)

        copy_local_to_obs(local_annotation_path, os.path.join(args.output_obs_path, annotation_name))
        copy_local_to_obs(local_image_path, os.path.join(args.output_obs_path, filename))




def replacetype(filename, type):
    """Return ``filename`` with its extension swapped for an annotation one.

    Only the trailing extension (as split by ``os.path.splitext``) is
    replaced, whatever its value or letter case. This covers formats such as
    ``.tif`` that the earlier hard-coded list missed, and no longer rewrites
    a matching substring in the middle of the name.

    Args:
        filename: File name (or path) whose extension is replaced.
        type: ``'1'`` selects ``.xml``, ``'2'`` selects ``.txt``. The name
            shadows the builtin but is kept because it is part of the
            function's signature.

    Returns:
        The renamed file name, or ``filename`` unchanged for any other
        ``type`` value.
    """
    if type == '1':
        new_suffix = '.xml'
    elif type == '2':
        new_suffix = '.txt'
    else:
        return filename
    return os.path.splitext(filename)[0] + new_suffix


def create_pascal_voc_txt(lv):
    """Return the text content of a classification annotation file.

    Args:
        lv: The label value.

    Returns:
        ``lv`` itself, unchanged.
    """
    return lv


class Process:
    """Operator entry point: stage OBS inputs locally, then convert the labels.

    On construction the local staging directories are created and the OBS
    input/output paths are read from ``args``. Calling the instance copies
    every entry of the OBS input directory to the local input directory and
    runs ``csv_to_pascal_voc`` on the result.
    """

    def __init__(self, args):
        """Create the staging directories and record the OBS paths.

        Args:
            args: Operator parameters. The OBS paths are read from
                ``obs_input_path`` / ``obs_output_path``; when such an
                attribute is absent, ``input_obs_path`` / ``output_obs_path``
                (the names declared by this file's argument parser) is used
                instead. The resolved values are written back to ``args`` as
                ``input_obs_path`` and ``output_obs_path``.

        Raises:
            AttributeError: If neither spelling of a path is present.
        """
        # Create a local directory to store files transferred from OBS.
        self.local_input_file_dir = r'/tmp/obs_input/'
        os.makedirs(self.local_input_file_dir, exist_ok=True)
        # Create a local directory to store the processed files and transfer them to OBS.
        self.local_output_file_dir = r'/tmp/obs_output/'
        os.makedirs(self.local_output_file_dir, exist_ok=True)
        # Set parameters.
        self.args = args
        self.input_obs_path = self._path_from_args(args, 'obs_input_path', 'input_obs_path')
        self.output_obs_path = self._path_from_args(args, 'obs_output_path', 'output_obs_path')
        self.args.input_obs_path = self.input_obs_path
        self.args.output_obs_path = self.output_obs_path
        logger.info(f'-------Operator execution--------')

    @staticmethod
    def _path_from_args(args, primary, fallback):
        """Return ``args.<primary>`` if that attribute exists, else ``args.<fallback>``."""
        if hasattr(args, primary):
            return getattr(args, primary)
        return getattr(args, fallback)

    def __call__(self, input):
        """Download the input files and run the CSV-to-annotation conversion.

        Args:
            input: Unused. The name shadows the builtin but is kept because it
                is part of the call signature.

        Raises:
            FileNotFoundError: If the OBS input directory holds no ``.csv``
                file.
        """
        # Obtain the file names in the OBS path.
        file_name_list = mox.file.list_directory(self.args.input_obs_path, recursive=False)

        local_tif_file_path_dict, local_csv_file_path = {}, ''
        for file_name in file_name_list:
            # Copy the file from OBS to the local host.
            obs_file_path = os.path.join(self.args.input_obs_path, file_name)
            local_file_path = os.path.join(self.local_input_file_dir, file_name)
            mox.file.copy(obs_file_path, local_file_path)

            # A '.csv' entry is the label file (the last one listed wins);
            # every other entry is treated as an image.
            if file_name.endswith('.csv'):
                local_csv_file_path = local_file_path
            else:
                local_tif_file_path_dict[file_name] = local_file_path

        if not local_csv_file_path:
            # Without this check an empty path would be handed to pd.read_csv.
            raise FileNotFoundError(f'No .csv label file found in {self.args.input_obs_path}')

        csv_to_pascal_voc(self.args, local_csv_file_path, local_tif_file_path_dict, self.local_output_file_dir)