Updated on 2025-11-20 GMT+08:00

Conversion Operators

Building a conversion operator requires a YML configuration file and a TAR package containing the Python script that implements the operator's functions.

1. YML file example:

### Python operator example
id: FormatConvert2
name: Custom operator format conversion
description: Format conversion using custom operators
author: Huawei Cloud Technology Co., Ltd.
tags:
  language:
    - zh
    - en
  format:
    - CSV
    - TIF
  category: Data conversion
  modal:
    - IMAGE
runtime:
  cpu-arch:
    - X86
  resources:
    - cpu: 1
      memory: 4096
  environment: python
  entrypoint: process.py
  auto-data-loading: false # Whether to automatically load data
arguments:
  - key: operateType
    name: Operation type
    type: ENUM  # Single choice
    items:
      - name: Object detection
        value: object_detection
      - name: Image classification
        value: image_classification
    required: true
    visible: true
    default: object_detection

2. TAR package:

Assume that the operator package name is operator-package.tar. The directory structure after the operator package is decompressed is as follows:

+---operator-package

| +--- program_package # Python operator directory

| | +---dependency

| | | +---requirements.txt # (Optional) Requirement description

| | +--- install.sh # (Optional) Installation script

| | +--- process.py # (Mandatory) Operator code

process.py file for format conversion

"""Format-conversion operator: CSV label records + images -> Pascal VOC XML / TXT labels.

The operator copies every file under the input OBS path to a local directory,
reads the ``.csv`` label file, and for each image named in the CSV writes one
label file (``.xml`` for object detection, ``.txt`` for image classification)
next to a copy of the image under the output OBS path.
"""
import argparse
import ast
import os
import xml.etree.ElementTree as ET
from xml.dom import minidom

import numpy as np
import pandas as pd
import PIL
import PIL.Image
import PIL.ImageOps

import moxing as mox
import ma_utils as utils

logger = utils.FileLogger.get_logger()

arg_parser = argparse.ArgumentParser()
arg_parser.add_argument("--operateType", type=str, required=True, help="")
arg_parser.add_argument("--input_obs_path", type=str, required=True, help="")
arg_parser.add_argument("--output_obs_path", type=str, required=True, help="")

# Label-file extension produced for each ``type`` code accepted by replacetype().
_LABEL_SUFFIX_BY_TYPE = {'1': '.xml', '2': '.txt'}

# Image suffixes rewritten by replacetype(), in the order they are applied.
_KNOWN_IMAGE_SUFFIXES = (
    '.jpg', '.bmp', '.jpeg', '.png',
    '.JPG', '.BMP', '.JPEG', '.PNG',
)


def prettify_xml(elem):
    """Return *elem* serialized as an indented (two-space) XML document string."""
    rough_string = ET.tostring(elem, 'utf-8')
    reparsed = minidom.parseString(rough_string)
    return reparsed.toprettyxml(indent="  ")


def create_pascal_voc_xml(folder, filename, path, width, height, depth, objects, deu, db, dp, ls, lv, lse, lc, srct,
                          srrt, src, srr, rct, rrt, rc, rr, dest__engine_uuid=None):
    """Build a flat ``<annotation>`` tree that mirrors the CSV record fields.

    Each child element carries one string value. ``path``, ``width``,
    ``height``, ``depth``, ``objects``, ``lse`` and ``dest__engine_uuid`` are
    accepted for signature compatibility but are not written to the tree.

    Returns:
        xml.etree.ElementTree.ElementTree rooted at the ``annotation`` element.
    """
    annotation = ET.Element('annotation')
    fields = (
        ('DEST_ENGINE_UUID', deu),         # UUID of the destination engine
        ('DEST_BUCKET', db),               # Destination bucket (logical partition)
        ('DEST_PATH', folder),             # Destination path
        ('FILE_NAME', filename),           # File name
        # NOTE(review): DEST_PATH is emitted a second time, now with ``dp``;
        # kept as-is to preserve the existing output -- confirm intent.
        ('DEST_PATH', dp),
        ('LABEL_VALUE', lv),               # Label value
        # NOTE(review): LABEL_SHAPE receives ``ls`` while ``lse`` is unused;
        # possibly LABEL_SET/LABEL_SHAPE were meant -- confirm before changing.
        ('LABEL_SHAPE', ls),
        ('LABEL_COORDINATE', lc),          # Label coordinates
        ('SOURCE_REC_CREATE_TIME', srct),  # Time when the source record was created
        ('SOURCE_REC_REVISE_TIME', srrt),  # Time when the source record was modified
        ('SOURCE_REC_CREATOR', src),       # Creator of the source record
        ('SOURCE_REC_REVISOR', srr),       # Modifier of the source record
        ('REC_CREATE_TIME', rct),          # Time when the record was created
        ('REC_REVISE_TIME', rrt),          # Time when the record was modified
        ('REC_CREATOR', rc),               # Record creator
        ('REC_REVISOR', rr),               # Record modifier
    )
    for tag, text in fields:
        ET.SubElement(annotation, tag).text = text
    return ET.ElementTree(annotation)


def create_pascal_voc_xml1(folder, filename, path, width, height, depth, objects):
    """Build a Pascal VOC style annotation tree for one image.

    Args:
        folder: Text of the ``folder`` element.
        filename: Text of the ``filename`` element.
        path: Text of the ``path`` element.
        width, height, depth: Image size values; written via ``str()``.
        objects: Iterable of dicts with keys ``name``, ``xmin``, ``ymin``,
            ``xmax`` and ``ymax``; one ``object`` element is emitted per dict.

    Returns:
        xml.etree.ElementTree.ElementTree rooted at the ``annotation`` element.
    """
    annotation = ET.Element('annotation')
    ET.SubElement(annotation, 'folder').text = folder
    ET.SubElement(annotation, 'filename').text = filename
    ET.SubElement(annotation, 'path').text = path

    source = ET.SubElement(annotation, 'source')
    ET.SubElement(source, 'database').text = 'Unknown'

    size = ET.SubElement(annotation, 'size')
    ET.SubElement(size, 'width').text = str(width)
    ET.SubElement(size, 'height').text = str(height)
    ET.SubElement(size, 'depth').text = str(depth)

    ET.SubElement(annotation, 'segmented').text = '0'

    for obj in objects:
        obj_element = ET.SubElement(annotation, 'object')
        ET.SubElement(obj_element, 'name').text = obj['name']
        ET.SubElement(obj_element, 'pose').text = 'Unspecified'
        ET.SubElement(obj_element, 'truncated').text = '0'
        ET.SubElement(obj_element, 'difficult').text = '0'
        bndbox = ET.SubElement(obj_element, 'bndbox')
        for corner in ('xmin', 'ymin', 'xmax', 'ymax'):
            ET.SubElement(bndbox, corner).text = str(obj[corner])

    return ET.ElementTree(annotation)


def get_value(row, name):
    """Return ``row[name]`` as a string, or ``''`` when the cell is NaN/missing."""
    cell = row[name]
    return '' if pd.isna(cell) else str(cell)


def copy_local_to_obs(local_file_path, obs_file_path):
    """Copy a local file to the given OBS path using ``mox.file.copy``."""
    mox.file.copy(local_file_path, obs_file_path)


def csv_to_pascal_voc(args, csv_file, local_tif_file_path_dict, local_output_file_dir):
    """Convert the CSV label records into per-image label files and upload them.

    Rows are grouped by the ``FILE_NAME`` column. Groups whose file name has no
    entry in *local_tif_file_path_dict* are skipped. For every remaining group
    a label file is written to *local_output_file_dir* and then both the label
    file and the image are copied to ``args.output_obs_path``.

    Args:
        args: Namespace providing ``operateType`` (defaults to
            ``'object_detection'`` when absent) and ``output_obs_path``.
        csv_file: Local path of the CSV file to read.
        local_tif_file_path_dict: Maps image file name -> local image path.
        local_output_file_dir: Local directory that receives the label files.
    """
    operate_type = getattr(args, 'operateType', 'object_detection')
    logger.info(operate_type)
    df = pd.read_csv(csv_file)

    for filename, group in df.groupby('FILE_NAME'):
        local_image_path = local_tif_file_path_dict.get(filename, None)
        if not local_image_path:
            continue

        if operate_type == 'object_detection':
            objects = []
            for _, row in group.iterrows():
                # Assumes LABEL_COORDINATE is a literal such as
                # "[xmin, ymin, xmax, ymax]" -- TODO confirm against the CSV spec.
                bndbox = ast.literal_eval(row['LABEL_COORDINATE'])
                objects.append({
                    'name': row['LABEL_VALUE'],
                    'xmin': bndbox[0],
                    'ymin': bndbox[1],
                    'xmax': bndbox[2],
                    'ymax': bndbox[3],
                })

            # ``row`` is the last record of the group; its destination fields
            # are used for the whole image.
            folder = os.path.join(row['DEST_BUCKET'], row['DEST_PATH'])
            path = os.path.join(folder, filename)
            with PIL.Image.open(local_image_path) as img:
                width, height = img.size
                depth = len(img.getbands())

            tree = create_pascal_voc_xml1(folder, filename, path, width, height, depth, objects)
            pretty_xml = prettify_xml(tree.getroot())
            local_output_xml_file_path = os.path.join(
                local_output_file_dir, os.path.splitext(filename)[0] + '.xml')
            with open(local_output_xml_file_path, 'w', encoding='utf-8') as f:
                f.write(pretty_xml)

            fname = replacetype(filename, '1')
            copy_local_to_obs(local_output_xml_file_path, os.path.join(args.output_obs_path, fname))
            copy_local_to_obs(local_image_path, os.path.join(args.output_obs_path, filename))

        elif operate_type == 'image_classification':
            for _, row in group.iterrows():
                logger.info('-------------Cycle row--')
            # Only the last record of the group supplies the class label.
            lv = get_value(row, 'LABEL_VALUE')
            txtstr = create_pascal_voc_txt(lv)

            fname = replacetype(filename, '2')
            local_output_txt_file_path = os.path.join(local_output_file_dir, fname)
            with open(local_output_txt_file_path, 'w', encoding='utf-8') as f:
                f.write(txtstr)

            copy_local_to_obs(local_output_txt_file_path, os.path.join(args.output_obs_path, fname))
            copy_local_to_obs(local_image_path, os.path.join(args.output_obs_path, filename))


def replacetype(filename, type):
    """Return the label-file name that corresponds to an image file name.

    Args:
        filename: Image file name.
        type: ``'1'`` for an ``.xml`` label, ``'2'`` for a ``.txt`` label.
            Any other value returns *filename* unchanged.

    The known suffixes (jpg/bmp/jpeg/png, lower or upper case) are rewritten
    exactly as before. When none of them occurs in the name (for example
    ``.tif`` images), the extension is swapped instead, so the label file can
    never share the image's own name and be overwritten by the image upload.
    """
    label_suffix = _LABEL_SUFFIX_BY_TYPE.get(type)
    if label_suffix is None:
        return filename

    renamed = filename
    for image_suffix in _KNOWN_IMAGE_SUFFIXES:
        renamed = renamed.replace(image_suffix, label_suffix)

    if renamed == filename:
        # No known suffix matched: fall back to replacing the extension.
        renamed = os.path.splitext(filename)[0] + label_suffix
    return renamed


def create_pascal_voc_txt(lv):
    """Return the text content of a classification label file (the label value itself)."""
    txttree = lv
    return txttree


class Process:
    """Operator entry point: stages OBS files locally and runs the conversion."""

    def __init__(self, args):
        """Create the local staging directories and record the OBS paths.

        Args:
            args: Namespace that must expose ``obs_input_path`` and
                ``obs_output_path``. NOTE(review): the module-level parser
                declares ``--input_obs_path`` / ``--output_obs_path`` instead;
                presumably the platform supplies the ``obs_*`` names -- confirm.
        """
        # Local directory that stores files transferred from OBS.
        self.local_input_file_dir = r'/tmp/obs_input/'
        os.makedirs(self.local_input_file_dir, exist_ok=True)
        # Local directory that stores processed files before upload to OBS.
        self.local_output_file_dir = r'/tmp/obs_output/'
        os.makedirs(self.local_output_file_dir, exist_ok=True)

        self.args = args
        self.input_obs_path = args.obs_input_path
        self.output_obs_path = args.obs_output_path
        # Mirror the paths under the names read by csv_to_pascal_voc/__call__.
        self.args.input_obs_path = args.obs_input_path
        self.args.output_obs_path = args.obs_output_path
        logger.info('-------Operator execution--------')

    def __call__(self, input):
        """Download the input directory, then convert its CSV labels.

        Files ending in ``.csv`` are treated as the label file (the last one
        listed wins); every other file is treated as an image.

        Args:
            input: Unused by this operator.

        Raises:
            FileNotFoundError: If the input OBS path contains no ``.csv`` file.
        """
        file_name_list = mox.file.list_directory(self.args.input_obs_path, recursive=False)

        local_tif_file_path_dict, local_csv_file_path = {}, ''
        for file_name in file_name_list:
            obs_file_path = os.path.join(self.args.input_obs_path, file_name)
            local_file_path = os.path.join(self.local_input_file_dir, file_name)
            if file_name.endswith('.csv'):
                local_csv_file_path = local_file_path
            else:
                local_tif_file_path_dict[file_name] = local_file_path
            # Copy the file from OBS to the local host.
            mox.file.copy(obs_file_path, local_file_path)

        if not local_csv_file_path:
            # Fail with an explicit message instead of passing '' to pandas.
            raise FileNotFoundError(
                f'No .csv label file found under {self.args.input_obs_path}')

        csv_to_pascal_voc(self.args, local_csv_file_path, local_tif_file_path_dict, self.local_output_file_dir)