Conversion Operators
Building a conversion operator requires a YML configuration file and a TAR package containing the Python script that implements the operator's functions.
1. YML file example:
### Python operator example
id: FormatConvert2
name: Custom operator format conversion
description: Format conversion using custom operators
author: Huawei Cloud Technology Co., Ltd.
tags:
  language:
    - zh
    - en
  format:
    - CSV
    - TIF
  category: Data conversion
  modal:
    - IMAGE
runtime:
  cpu-arch:
    - X86
  resources:
    - cpu: 1
      memory: 4096
  environment: python
  entrypoint: process.py
  auto-data-loading: false  # Whether to automatically load data
arguments:
  - key: operateType
    name: Operation type
    type: ENUM  # Single choice
    items:
      - name: Object detection
        value: object_detection
      - name: Image classification
        value: image_classification
    required: true
    visible: true
    default: object_detection
2. TAR package:
Assume that the operator package name is operator-package.tar. The directory structure after the operator package is decompressed is as follows:
+---operator-package
| +--- program_package # Python operator directory
| | +---dependency
| | | +---requirements.txt # (Optional) Requirement description
| | +--- install.sh # (Optional) Installation script
| | +--- process.py # (Mandatory) Operator code
process.py file for format conversion
"""Format-conversion operator: turn CSV annotations plus images into per-image label files.

The operator copies every file from an OBS input directory to local disk,
reads the single ``.csv`` annotation file, and for each annotated image
writes either a Pascal VOC style ``.xml`` file (object detection) or a
``.txt`` file holding the label value (image classification). The label
file and the image are then copied to the OBS output directory.
"""
import argparse
import ast
import os
import xml.etree.ElementTree as ET
from xml.dom import minidom

import numpy as np
import pandas as pd
import PIL
import PIL.Image
import PIL.ImageOps

import moxing as mox
import ma_utils as utils

logger = utils.FileLogger.get_logger()

arg_parser = argparse.ArgumentParser()
arg_parser.add_argument("--operateType", type=str, required=True, help="")
arg_parser.add_argument("--input_obs_path", type=str, required=True, help="")
arg_parser.add_argument("--output_obs_path", type=str, required=True, help="")

# Maps the ``type`` code accepted by ``replacetype`` to the label-file suffix.
_LABEL_SUFFIX_BY_TYPE = {'1': '.xml', '2': '.txt'}


def prettify_xml(elem):
    """Return *elem* serialized as an indented XML string.

    The element is serialized to UTF-8 bytes, re-parsed with ``minidom`` and
    pretty-printed with a one-space indent.
    """
    rough_string = ET.tostring(elem, 'utf-8')
    reparsed = minidom.parseString(rough_string)
    return reparsed.toprettyxml(indent=" ")


def create_pascal_voc_xml(folder, filename, path, width, height, depth, objects,
                          deu, db, dp, ls, lv, lse, lc, srct, srrt, src, srr,
                          rct, rrt, rc, rr, dest__engine_uuid=None):
    """Build an ``annotation`` tree holding one flat metadata record.

    Each ``(tag, value)`` pair below becomes a direct child of the root, in
    this order. ``path``, ``width``, ``height``, ``depth``, ``objects``,
    ``lse`` and ``dest__engine_uuid`` are accepted but not used.

    NOTE(review): ``DEST_PATH`` is emitted twice (once from ``folder``, once
    from ``dp``) and ``LABEL_SHAPE`` is filled from ``ls`` while ``lse`` is
    never read. The parameter names suggest ``ls`` is the label set and
    ``lse`` the label shape - confirm the intended mapping before relying on
    this function. The output is kept as-is for backward compatibility.

    Returns:
        An ``xml.etree.ElementTree.ElementTree`` rooted at ``annotation``.
    """
    annotation = ET.Element('annotation')
    record_fields = (
        ('DEST_ENGINE_UUID', deu),
        ('DEST_BUCKET', db),
        ('DEST_PATH', folder),
        ('FILE_NAME', filename),
        ('DEST_PATH', dp),
        ('LABEL_VALUE', lv),
        ('LABEL_SHAPE', ls),
        ('LABEL_COORDINATE', lc),
        ('SOURCE_REC_CREATE_TIME', srct),
        ('SOURCE_REC_REVISE_TIME', srrt),
        ('SOURCE_REC_CREATOR', src),
        ('SOURCE_REC_REVISOR', srr),
        ('REC_CREATE_TIME', rct),
        ('REC_REVISE_TIME', rrt),
        ('REC_CREATOR', rc),
        ('REC_REVISOR', rr),
    )
    for tag, text in record_fields:
        ET.SubElement(annotation, tag).text = text
    return ET.ElementTree(annotation)


def _add_text_child(parent, tag, text):
    """Append a ``tag`` child with the given text to *parent* and return it."""
    child = ET.SubElement(parent, tag)
    child.text = text
    return child


def create_pascal_voc_xml1(folder, filename, path, width, height, depth, objects):
    """Build a Pascal VOC style ``annotation`` tree for one image.

    Args:
        folder: Text for the ``folder`` element.
        filename: Text for the ``filename`` element.
        path: Text for the ``path`` element.
        width, height, depth: Image size values; written via ``str()``.
        objects: Iterable of dicts with keys ``name``, ``xmin``, ``ymin``,
            ``xmax`` and ``ymax``; one ``object`` element is emitted per dict.

    Returns:
        An ``xml.etree.ElementTree.ElementTree`` rooted at ``annotation``.
    """
    annotation = ET.Element('annotation')
    _add_text_child(annotation, 'folder', folder)
    _add_text_child(annotation, 'filename', filename)
    _add_text_child(annotation, 'path', path)

    source = ET.SubElement(annotation, 'source')
    _add_text_child(source, 'database', 'Unknown')

    size = ET.SubElement(annotation, 'size')
    _add_text_child(size, 'width', str(width))
    _add_text_child(size, 'height', str(height))
    _add_text_child(size, 'depth', str(depth))

    _add_text_child(annotation, 'segmented', '0')

    for obj in objects:
        obj_element = ET.SubElement(annotation, 'object')
        _add_text_child(obj_element, 'name', obj['name'])
        _add_text_child(obj_element, 'pose', 'Unspecified')
        _add_text_child(obj_element, 'truncated', '0')
        _add_text_child(obj_element, 'difficult', '0')
        bndbox = ET.SubElement(obj_element, 'bndbox')
        for corner in ('xmin', 'ymin', 'xmax', 'ymax'):
            _add_text_child(bndbox, corner, str(obj[corner]))

    return ET.ElementTree(annotation)


def get_value(row, name):
    """Return ``row[name]`` as a string, or ``''`` when the value is NA."""
    return '' if pd.isna(row[name]) else str(row[name])


def copy_local_to_obs(local_file_path, obs_file_path):
    """Copy a local file to *obs_file_path* using ``mox.file.copy``."""
    mox.file.copy(local_file_path, obs_file_path)


def _export_detection(args, filename, group, local_image_path, local_output_file_dir):
    """Write and upload the Pascal VOC XML (and the image) for one image."""
    objects = []
    for _, row in group.iterrows():
        # LABEL_COORDINATE is parsed as a Python literal and indexed 0..3.
        bndbox = ast.literal_eval(row['LABEL_COORDINATE'])
        objects.append({
            'name': row['LABEL_VALUE'],
            'xmin': bndbox[0],
            'ymin': bndbox[1],
            'xmax': bndbox[2],
            'ymax': bndbox[3],
        })

    # The destination folder is taken from the last row of the group, which
    # is what the previous implementation did implicitly by reusing the loop
    # variable after the loop ended.
    last_row = group.iloc[-1]
    folder = os.path.join(last_row['DEST_BUCKET'], last_row['DEST_PATH'])
    path = os.path.join(folder, filename)

    with PIL.Image.open(local_image_path) as img:
        width, height = img.size
        depth = len(img.getbands())

    tree = create_pascal_voc_xml1(folder, filename, path, width, height, depth, objects)
    pretty_xml = prettify_xml(tree.getroot())

    local_output_xml_file_path = os.path.join(
        local_output_file_dir, os.path.splitext(filename)[0] + '.xml')
    with open(local_output_xml_file_path, 'w', encoding='utf-8') as f:
        f.write(pretty_xml)

    fname = replacetype(filename, '1')
    copy_local_to_obs(local_output_xml_file_path, os.path.join(args.output_obs_path, fname))
    copy_local_to_obs(local_image_path, os.path.join(args.output_obs_path, filename))


def _export_classification(args, filename, group, local_image_path, local_output_file_dir):
    """Write and upload the label TXT (and the image) for one image."""
    fname = replacetype(filename, '2')
    local_output_txt_file_path = os.path.join(local_output_file_dir, fname)
    # Every row rewrites the same label file, so the last row of the group
    # determines the final content.
    for _, row in group.iterrows():
        logger.info('-------------Cycle row--')
        txtstr = create_pascal_voc_txt(get_value(row, 'LABEL_VALUE'))
        with open(local_output_txt_file_path, 'w', encoding='utf-8') as f:
            f.write(txtstr)
        copy_local_to_obs(local_output_txt_file_path, os.path.join(args.output_obs_path, fname))
        copy_local_to_obs(local_image_path, os.path.join(args.output_obs_path, filename))


def csv_to_pascal_voc(args, csv_file, local_tif_file_path_dict, local_output_file_dir):
    """Convert the annotation CSV into one label file per image and upload it.

    Rows are grouped by the ``FILE_NAME`` column. Groups whose file name has
    no entry in *local_tif_file_path_dict* are skipped. Depending on
    ``args.operateType`` (default ``'object_detection'``) a Pascal VOC XML or
    a label TXT file is produced; any other value produces nothing.

    Args:
        args: Object exposing ``output_obs_path`` and, optionally,
            ``operateType``.
        csv_file: Path of the local annotation CSV. Columns read:
            ``FILE_NAME``, ``LABEL_VALUE`` and, for object detection,
            ``LABEL_COORDINATE``, ``DEST_BUCKET`` and ``DEST_PATH``.
        local_tif_file_path_dict: Maps image file name to its local path.
        local_output_file_dir: Local directory for the generated label files.
    """
    operate_type = getattr(args, 'operateType', 'object_detection')
    logger.info(operate_type)

    df = pd.read_csv(csv_file)
    for filename, group in df.groupby('FILE_NAME'):
        local_image_path = local_tif_file_path_dict.get(filename, None)
        if not local_image_path:
            continue
        if operate_type == 'object_detection':
            _export_detection(args, filename, group, local_image_path, local_output_file_dir)
        elif operate_type == 'image_classification':
            _export_classification(args, filename, group, local_image_path, local_output_file_dir)


def replacetype(filename, type):  # noqa: A002 - parameter name kept for existing callers
    """Return *filename* with its extension swapped for the label-file suffix.

    Args:
        filename: Image file name.
        type: ``'1'`` for ``.xml``, ``'2'`` for ``.txt``. Any other value
            returns *filename* unchanged.

    The extension is replaced with ``os.path.splitext`` rather than by
    substituting a fixed list of image suffixes. With the fixed list, a file
    such as ``x.tif`` kept its name, so the label file was uploaded under the
    image's own name and then overwritten by the image copy.
    """
    suffix = _LABEL_SUFFIX_BY_TYPE.get(type)
    if suffix is None:
        return filename
    return os.path.splitext(filename)[0] + suffix


def create_pascal_voc_txt(lv):
    """Return the text content of a classification label file.

    Only the label value itself is written.
    """
    txttree = lv
    return txttree


class Process:
    """Operator entry point: stage OBS files locally and run the conversion."""

    def __init__(self, args):
        """Create the local working directories and record the OBS paths.

        Args:
            args: Object exposing ``obs_input_path`` and ``obs_output_path``.
                Both values are also mirrored onto ``args`` as
                ``input_obs_path`` / ``output_obs_path``, the names the
                conversion code reads.
        """
        # Create a local directory to store files transferred from OBS.
        self.local_input_file_dir = r'/tmp/obs_input/'
        os.makedirs(self.local_input_file_dir, exist_ok=True)
        # Create a local directory to store the processed files before they
        # are transferred to OBS.
        self.local_output_file_dir = r'/tmp/obs_output/'
        os.makedirs(self.local_output_file_dir, exist_ok=True)
        # Set parameters.
        self.args = args
        self.input_obs_path = args.obs_input_path
        self.output_obs_path = args.obs_output_path
        self.args.input_obs_path = args.obs_input_path
        self.args.output_obs_path = args.obs_output_path
        logger.info('-------Operator execution--------')

    def __call__(self, input):  # noqa: A002 - parameter name kept for existing callers
        """Download the input files and convert the annotations.

        Files ending in ``.csv`` are treated as the annotation file (the last
        one listed wins); every other file is treated as an image.

        Raises:
            FileNotFoundError: If the input directory contains no ``.csv``
                file.
        """
        # Obtain the file names in the OBS path.
        file_name_list = mox.file.list_directory(self.args.input_obs_path, recursive=False)

        local_tif_file_path_dict, local_csv_file_path = {}, ''
        for file_name in file_name_list:
            obs_file_path = os.path.join(self.args.input_obs_path, file_name)
            local_file_path = os.path.join(self.local_input_file_dir, file_name)
            if file_name.endswith('.csv'):
                local_csv_file_path = local_file_path
            else:
                local_tif_file_path_dict[file_name] = local_file_path
            # Copy the file from OBS to the local host.
            mox.file.copy(obs_file_path, local_file_path)

        if not local_csv_file_path:
            # Fail with a clear message instead of letting pandas try to open
            # an empty path.
            raise FileNotFoundError(
                f'No .csv annotation file found under {self.args.input_obs_path}')

        csv_to_pascal_voc(self.args, local_csv_file_path, local_tif_file_path_dict,
                          self.local_output_file_dir)
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot