Conversion Operators
Building a conversion operator requires a YML configuration file and a TAR package containing the Python scripts that implement the operator's functions.
1. YML file example:
### Python operator example
id: FormatConvert2
name: Custom operator format conversion
description: Format conversion using custom operators
author: Huawei Cloud Technology Co., Ltd.
tags:
language:
- zh
- en
format:
- CSV
- TIF
category: Data conversion
modal:
- IMAGE
runtime:
cpu-arch:
- X86
resources:
- cpu: 1
memory: 4096
environment: python
entrypoint: process.py
auto-data-loading: false # Whether to automatically load data
arguments:
- key: operateType
name: Operation type
type: ENUM #Single choice
items:
- name: Object detection
value: object_detection
- name: Image classification
value: image_classification
required: true
visible: true
default: object_detection
"""
return "parquet"
2. TAR package:
Assume that the operator package name is operator-package.tar. The directory structure after the operator package is decompressed is as follows:
+---operator-package
| +--- program_package # Python operator directory
| | +---dependency
| | | +---requirements.txt # (Optional) Requirement description
| | +--- install.sh # (Optional) Installation script
| | +--- process.py # (Mandatory) Operator code
Example process.py file for format conversion:
import PIL.Image
import PIL.ImageOps
import numpy as np
import ast
import os
import pandas as pd
import xml.etree.ElementTree as ET
from xml.dom import minidom
import PIL
import moxing as mox
import ma_utils as utils
# Module-level logger obtained from the ma_utils helper; the functions in this
# file write progress messages through it with logger.info(...).
logger = utils.FileLogger.get_logger()
import argparse
# Command-line interface of the operator: three required string options.
# NOTE(review): parse_args() is never called on this parser in the visible
# code, and Process.__init__ reads args.obs_input_path / args.obs_output_path
# rather than the option names declared here -- confirm how the platform
# actually supplies the arguments.
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument("--operateType", type=str, required=True, help="")
arg_parser.add_argument("--input_obs_path", type=str, required=True, help="")
arg_parser.add_argument("--output_obs_path", type=str, required=True, help="")
def prettify_xml(elem):
    """Return *elem* rendered as pretty-printed XML text.

    The element is serialized to UTF-8 bytes with ElementTree, re-parsed
    with ``minidom`` and written back out using a one-space indent per
    nesting level.
    """
    return minidom.parseString(ET.tostring(elem, 'utf-8')).toprettyxml(indent=" ")
def create_pascal_voc_xml(folder, filename, path, width, height, depth, objects, deu, db, dp, ls, lv, lse, lc, srct,
                          srrt, src, srr, rct, rrt, rc, rr, dest__engine_uuid=None):
    """Build a flat ``<annotation>`` tree describing one labelling record.

    Each child element is created in the fixed order listed below and its
    text is set to the corresponding argument as-is (no string conversion).

    Arguments written to the tree: ``deu``, ``db``, ``folder``, ``filename``,
    ``dp``, ``lv``, ``ls``, ``lc``, ``srct``, ``srrt``, ``src``, ``srr``,
    ``rct``, ``rrt``, ``rc``, ``rr``.

    Arguments accepted but not used: ``path``, ``width``, ``height``,
    ``depth``, ``objects``, ``lse``, ``dest__engine_uuid``.

    Returns:
        An ``xml.etree.ElementTree.ElementTree`` rooted at ``<annotation>``.

    NOTE(review): ``DEST_PATH`` is emitted twice (first from ``folder``, then
    from ``dp``), and ``LABEL_SHAPE`` is filled from ``ls`` while ``lse`` is
    ignored. This looks unintentional but is preserved here -- confirm the
    intended schema before changing it.
    """
    annotation = ET.Element('annotation')

    # (tag, text) pairs in output order.
    fields = (
        ('DEST_ENGINE_UUID', deu),
        ('DEST_BUCKET', db),
        ('DEST_PATH', folder),
        ('FILE_NAME', filename),
        ('DEST_PATH', dp),
        ('LABEL_VALUE', lv),
        ('LABEL_SHAPE', ls),
        ('LABEL_COORDINATE', lc),
        ('SOURCE_REC_CREATE_TIME', srct),
        ('SOURCE_REC_REVISE_TIME', srrt),
        ('SOURCE_REC_CREATOR', src),
        ('SOURCE_REC_REVISOR', srr),
        ('REC_CREATE_TIME', rct),
        ('REC_REVISE_TIME', rrt),
        ('REC_CREATOR', rc),
        ('REC_REVISOR', rr),
    )
    for tag, text in fields:
        ET.SubElement(annotation, tag).text = text

    return ET.ElementTree(annotation)
def create_pascal_voc_xml1(folder, filename, path, width, height, depth, objects):
    """Build a Pascal VOC style ``<annotation>`` tree for one image.

    Args:
        folder: Text for the ``<folder>`` element.
        filename: Text for the ``<filename>`` element.
        path: Text for the ``<path>`` element.
        width, height, depth: Image dimensions; converted with ``str()`` and
            written under ``<size>``.
        objects: Iterable of mappings with the keys ``name``, ``xmin``,
            ``ymin``, ``xmax`` and ``ymax``; one ``<object>`` element is
            emitted per mapping.

    Returns:
        An ``xml.etree.ElementTree.ElementTree`` rooted at ``<annotation>``.
    """

    def add(parent, tag, text=None):
        # Create a child element and optionally set its text in one step.
        child = ET.SubElement(parent, tag)
        if text is not None:
            child.text = text
        return child

    annotation = ET.Element('annotation')
    add(annotation, 'folder', folder)
    add(annotation, 'filename', filename)
    add(annotation, 'path', path)

    source = add(annotation, 'source')
    add(source, 'database', 'Unknown')

    size = add(annotation, 'size')
    add(size, 'width', str(width))
    add(size, 'height', str(height))
    add(size, 'depth', str(depth))

    add(annotation, 'segmented', '0')

    for obj in objects:
        obj_element = add(annotation, 'object')
        # Labels may be non-string (e.g. an integer class id read from a CSV);
        # ElementTree cannot serialize non-string text, so stringify it the
        # same way the coordinates are.
        add(obj_element, 'name', str(obj['name']))
        add(obj_element, 'pose', 'Unspecified')
        add(obj_element, 'truncated', '0')
        add(obj_element, 'difficult', '0')
        bndbox = add(obj_element, 'bndbox')
        for corner in ('xmin', 'ymin', 'xmax', 'ymax'):
            add(bndbox, corner, str(obj[corner]))

    return ET.ElementTree(annotation)
def get_value(row, name):
    """Return ``row[name]`` as a string, or ``''`` when pandas reports it as missing."""
    cell = row[name]
    if pd.isna(cell):
        return ''
    return str(cell)
def copy_local_to_obs(local_file_path, obs_file_path):
    """Copy ``local_file_path`` to ``obs_file_path`` by delegating to ``mox.file.copy``."""
    mox.file.copy(local_file_path, obs_file_path)
def csv_to_pascal_voc(args, csv_file, local_tif_file_path_dict, local_output_file_dir):
    """Turn the rows of an annotation CSV into per-image label files and upload them.

    The CSV is grouped by its ``FILE_NAME`` column. Groups whose file name has
    no truthy entry in ``local_tif_file_path_dict`` are skipped. For the others
    the behavior depends on ``args.operateType``:

    * ``'object_detection'``: one Pascal VOC XML per image, built from the
      ``LABEL_COORDINATE`` / ``LABEL_VALUE`` columns of every row in the group.
    * ``'image_classification'``: a text file containing the ``LABEL_VALUE``
      of a row.

    In both cases the label file and the image are copied under
    ``args.output_obs_path`` via ``copy_local_to_obs``.

    Args:
        args: Object providing ``output_obs_path`` and, optionally,
            ``operateType`` (defaults to ``'object_detection'`` when absent).
        csv_file: Path passed to ``pd.read_csv``.
        local_tif_file_path_dict: Mapping of file name to local image path.
        local_output_file_dir: Local directory where label files are written.

    NOTE(review): the listing this was taken from had lost its indentation;
    the nesting below is the most plausible reconstruction -- confirm it
    against the upstream sample, in particular for the classification branch.
    """
    operate_type = getattr(args, 'operateType', 'object_detection')
    logger.info(operate_type)
    df = pd.read_csv(csv_file)
    grouped = df.groupby('FILE_NAME')
    for filename, group in grouped:
        # Only handle rows whose image is known locally.
        if local_tif_file_path_dict.get(filename, None):
            if operate_type == 'object_detection':
                objects = []
                for _, row in group.iterrows():
                    # The coordinate cell is parsed as a Python literal; its first
                    # four items are used as xmin, ymin, xmax, ymax.
                    bndbox = ast.literal_eval(row['LABEL_COORDINATE'])
                    label = row['LABEL_VALUE']
                    obj = {
                        'name': label,
                        'xmin': bndbox[0],
                        'ymin': bndbox[1],
                        'xmax': bndbox[2],
                        'ymax': bndbox[3]
                    }
                    objects.append(obj)
                # Placeholders; overwritten from the opened image below.
                width, height, depth = 0, 0, 0
                # `row` is the last row of the group, left over from the loop above.
                folder = os.path.join(row['DEST_BUCKET'], row['DEST_PATH'])
                path = os.path.join(folder, filename)
                with PIL.Image.open(local_tif_file_path_dict.get(filename)) as img:
                    width, height = img.size
                    # Number of bands is used as the VOC "depth".
                    depth = len(img.getbands())
                tree = create_pascal_voc_xml1(folder, filename, path, width, height, depth, objects)
                pretty_xml = prettify_xml(tree.getroot())
                # Local XML name: image name with its extension swapped for .xml.
                local_output_xml_file_path = os.path.join(local_output_file_dir, os.path.splitext(filename)[0] + '.xml')
                with open(local_output_xml_file_path, 'w', encoding='utf-8') as f:
                    f.write(pretty_xml)
                # Remote XML name is derived separately, through replacetype.
                fname = replacetype(filename,'1')
                copy_local_to_obs(local_output_xml_file_path,os.path.join(args.output_obs_path, fname))
                # The image itself is uploaded next to its annotation.
                copy_local_to_obs(local_tif_file_path_dict.get(filename), os.path.join(args.output_obs_path, filename))
            elif operate_type == 'image_classification':
                for _, row in group.iterrows():
                    logger.info(f'-------------Cycle row--')
                    lv = get_value(row, 'LABEL_VALUE')
                    txtstr = create_pascal_voc_txt(lv)
                    fname = replacetype(filename,'2')
                    # The same path is opened in 'w' mode for every row of the
                    # group, so the file ends up holding the last row's label.
                    local_output_txt_file_path = os.path.join(local_output_file_dir, fname)
                    with open(local_output_txt_file_path, 'w', encoding='utf-8') as f:
                        f.write(txtstr)
                    copy_local_to_obs(local_output_txt_file_path,
                                      os.path.join(args.output_obs_path, fname))
                    copy_local_to_obs(local_tif_file_path_dict.get(filename), os.path.join(args.output_obs_path, filename))
def replacetype(filename,type):
    """Return *filename* with its extension swapped for a label-file extension.

    Args:
        filename: Image file name (may include directories).
        type: ``'1'`` selects ``.xml``, ``'2'`` selects ``.txt``. Any other
            value returns *filename* unchanged. (The parameter name shadows
            the builtin but is kept for caller compatibility.)

    Returns:
        The file name whose final extension, whatever it is, has been
        replaced; a name without an extension gets the new one appended.

    The extension is located with ``os.path.splitext`` rather than by
    substring replacement, so formats outside a fixed list (for example
    ``.tif``) and mixed-case extensions are handled, and a match in the
    middle of the name is never rewritten. This guarantees the label name
    differs from the image name and cannot be overwritten by it.
    """
    if type == '1':
        new_extension = '.xml'
    elif type == '2':
        new_extension = '.txt'
    else:
        return filename
    return os.path.splitext(filename)[0] + new_extension
def create_pascal_voc_txt(lv):
    """Return the text content of a classification label file.

    The content is simply the label value *lv*, returned unchanged.
    """
    # An earlier, disabled variant concatenated the full record
    # ("DEST_ENGINE_UUID:...", "DEST_BUCKET:...", ...) one field per line;
    # only the label value is emitted now.
    return lv
class Process:
    """Operator entry point: stage OBS input locally, convert it, upload the results.

    Construction prepares two local staging directories and records the OBS
    paths; calling the instance downloads the input directory and runs
    ``csv_to_pascal_voc`` on it.
    """

    def __init__(self, args):
        """Prepare staging directories and normalize the OBS path attributes.

        Args:
            args: Object exposing ``obs_input_path`` and ``obs_output_path``.
                It is also given ``input_obs_path`` / ``output_obs_path``
                attributes with the same values, which are the names read
                later in this class and by ``csv_to_pascal_voc``.
        """
        # Local directory for files downloaded from OBS.
        self.local_input_file_dir = r'/tmp/obs_input/'
        os.makedirs(self.local_input_file_dir, exist_ok=True)
        # Local directory for generated files before they are uploaded to OBS.
        self.local_output_file_dir = r'/tmp/obs_output/'
        os.makedirs(self.local_output_file_dir, exist_ok=True)
        # Keep the arguments and mirror the OBS paths under both spellings.
        self.args = args
        self.input_obs_path = args.obs_input_path
        self.output_obs_path = args.obs_output_path
        self.args.input_obs_path = args.obs_input_path
        self.args.output_obs_path = args.obs_output_path
        logger.info(f'-------Operator execution--------')

    def __call__(self, input):
        """Download the input directory and convert its annotations.

        Every entry listed (non-recursively) under the input OBS path is
        copied into the local input directory. An entry ending in ``.csv`` is
        taken as the annotation file (if several exist, the last one listed
        wins); every other entry is registered as an image.

        Args:
            input: Unused; accepted for interface compatibility.

        Raises:
            FileNotFoundError: If the input directory contains no ``.csv``
                file, instead of handing an empty path to the CSV reader.
        """
        # Map each listed name to its full OBS path.
        file_name_list = mox.file.list_directory(self.args.input_obs_path, recursive=False)
        file_path_dict = {file_name: os.path.join(self.args.input_obs_path, file_name) for file_name in file_name_list}

        local_tif_file_path_dict, local_csv_file_path = {}, ''
        for file_name, obs_file_path in file_path_dict.items():
            local_path = os.path.join(self.local_input_file_dir, file_name)
            if file_name.endswith('.csv'):
                local_csv_file_path = local_path
            else:
                local_tif_file_path_dict[file_name] = local_path
            # Copy the file from OBS to the local host.
            mox.file.copy(obs_file_path, local_path)

        if not local_csv_file_path:
            raise FileNotFoundError(
                f'No .csv annotation file found in {self.args.input_obs_path}'
            )

        csv_to_pascal_voc(self.args, local_csv_file_path, local_tif_file_path_dict, self.local_output_file_dir)
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot