更新时间:2025-07-02 GMT+08:00

转换算子说明

转换算子由两部分构成:①YML配置文件;②包含算子功能实现的Python脚本TAR包。

①YML文件示例:

### python算子示例

id: FormatConvert2
name: 自定义算子格式转化
description: 自定义算子格式转化
author: Huawei Cloud Technology Co., Ltd.
tags:
  language:
    - zh
    - en
  format:
    - CSV
    - TIF
  category: 数据转换
  modal:
    - IMAGE
runtime:
  cpu-arch:
    - X86
  resources:
    - cpu: 1
      memory: 4096
  environment: python
  entrypoint: process.py
  auto-data-loading: false # 是否需要自动加载数据
arguments:
  - key: operateType
    name: 操作类型
    type: ENUM  #单选
    items:
      - name: 物体检测
        value: object_detection
      - name: 图像分类
        value: image_classification
    required: true
    visible: true
    default: object_detection

②TAR包:

假设算子包名称为operator-package.tar,算子包解压后目录结构:

+---operator-package

| +--- program_package # python算子目录

| | +---dependency

| | | +---requirements.txt # Python依赖包列表,可选

| | +--- install.sh # 安装脚本,可选

| | +--- process.py # 算子代码,必填

实现格式转换功能的process.py文件示例:

import PIL.Image
import PIL.ImageOps
import numpy as np
import ast
import os
import pandas as pd
import xml.etree.ElementTree as ET
from xml.dom import minidom
import PIL
import moxing as mox
import ma_utils as utils

# Module-wide logger obtained from the ma_utils helper package.
logger = utils.FileLogger.get_logger()

import argparse


# Command-line interface of the operator: every option is a mandatory string.
arg_parser = argparse.ArgumentParser()
for _option in ("--operateType", "--input_obs_path", "--output_obs_path"):
    arg_parser.add_argument(_option, type=str, required=True, help="")


def prettify_xml(elem):
    """Render an ElementTree element as indented, human-readable XML.

    The element is dumped to UTF-8 bytes and re-parsed with ``minidom`` so
    that ``toprettyxml`` can apply a two-space indent.

    Args:
        elem: The ``xml.etree.ElementTree.Element`` to render.

    Returns:
        The pretty-printed XML document as a ``str``.
    """
    serialized = ET.tostring(elem, encoding='utf-8')
    return minidom.parseString(serialized).toprettyxml(indent="  ")


def create_pascal_voc_xml(folder, filename, path, width, height, depth, objects, deu, db, dp, ls, lv, lse, lc, srct,
                          srrt, src, srr, rct, rrt, rc, rr, dest__engine_uuid=None):
    """Build an ``annotation`` XML tree that carries the record metadata fields.

    Each value is stored as the text of one direct child of the root
    ``annotation`` element, in the order listed in ``fields`` below.

    Args:
        folder: Text of the first ``DEST_PATH`` element.
        filename: Text of ``FILE_NAME``.
        deu: Text of ``DEST_ENGINE_UUID``.
        db: Text of ``DEST_BUCKET``.
        dp: Text of the second ``DEST_PATH`` element.
        ls: Text of ``LABEL_SHAPE``.
        lv: Text of ``LABEL_VALUE``.
        lc: Text of ``LABEL_COORDINATE``.
        srct, srrt, src, srr: Source-side record create time, revise time,
            creator and revisor.
        rct, rrt, rc, rr: Record create time, revise time, creator and
            revisor.
        path, width, height, depth, objects, lse, dest__engine_uuid:
            Accepted for signature compatibility; not used by this function.

    Returns:
        An ``xml.etree.ElementTree.ElementTree`` rooted at ``annotation``.
    """
    # NOTE(review): 'DEST_PATH' is emitted twice (folder, then dp) and `lse`
    # is never written while `ls` fills LABEL_SHAPE -- confirm the intended
    # schema before changing; the output below is kept exactly as it was.
    fields = (
        ('DEST_ENGINE_UUID', deu),         # destination engine UUID
        ('DEST_BUCKET', db),               # destination bucket (logical partition)
        ('DEST_PATH', folder),             # destination path
        ('FILE_NAME', filename),           # file name
        ('DEST_PATH', dp),                 # second DEST_PATH entry, filled from dp
        ('LABEL_VALUE', lv),               # label value
        ('LABEL_SHAPE', ls),               # label shape (point / line / rectangle / polygon / contour)
        ('LABEL_COORDINATE', lc),          # label coordinates
        ('SOURCE_REC_CREATE_TIME', srct),  # source record creation time
        ('SOURCE_REC_REVISE_TIME', srrt),  # source record revision time
        ('SOURCE_REC_CREATOR', src),       # source record creator
        ('SOURCE_REC_REVISOR', srr),       # source record revisor
        ('REC_CREATE_TIME', rct),          # record creation time
        ('REC_REVISE_TIME', rrt),          # record revision time
        ('REC_CREATOR', rc),               # record creator
        ('REC_REVISOR', rr),               # record revisor
    )

    root = ET.Element('annotation')
    for tag, text in fields:
        ET.SubElement(root, tag).text = text
    return ET.ElementTree(root)
def create_pascal_voc_xml1(folder, filename, path, width, height, depth, objects):
    """Build a Pascal VOC style ``annotation`` XML tree for one image.

    Args:
        folder: Text of the ``folder`` element.
        filename: Text of the ``filename`` element.
        path: Text of the ``path`` element.
        width: Image width, written as ``str(width)`` under ``size``.
        height: Image height, written as ``str(height)`` under ``size``.
        depth: Channel count, written as ``str(depth)`` under ``size``.
        objects: Iterable of mappings with the keys ``name``, ``xmin``,
            ``ymin``, ``xmax`` and ``ymax``; each becomes one ``object``
            element with a ``bndbox`` child.

    Returns:
        An ``xml.etree.ElementTree.ElementTree`` rooted at ``annotation``.
    """
    def add_child(parent, tag, text=None):
        # Create <tag> under parent, set its text and hand the node back.
        node = ET.SubElement(parent, tag)
        node.text = text
        return node

    root = ET.Element('annotation')
    add_child(root, 'folder', folder)
    add_child(root, 'filename', filename)
    add_child(root, 'path', path)

    add_child(add_child(root, 'source'), 'database', 'Unknown')

    size_node = add_child(root, 'size')
    for tag, value in (('width', width), ('height', height), ('depth', depth)):
        add_child(size_node, tag, str(value))

    add_child(root, 'segmented', '0')

    for entry in objects:
        object_node = add_child(root, 'object')
        add_child(object_node, 'name', entry['name'])
        # Fixed placeholder values: pose / truncated / difficult are not
        # derived from the input.
        add_child(object_node, 'pose', 'Unspecified')
        add_child(object_node, 'truncated', '0')
        add_child(object_node, 'difficult', '0')

        box_node = add_child(object_node, 'bndbox')
        for corner in ('xmin', 'ymin', 'xmax', 'ymax'):
            add_child(box_node, corner, str(entry[corner]))

    return ET.ElementTree(root)

def get_value(row, name):
    """Return ``row[name]`` as a string, mapping missing values to ``''``.

    Args:
        row: A mapping-like record (for example a pandas ``Series``).
        name: Key / column name to read.

    Returns:
        ``''`` when ``pd.isna`` reports the value as missing, otherwise
        ``str(row[name])``.
    """
    if pd.isna(row[name]):
        return ''
    return str(row[name])


def copy_local_to_obs(local_file_path, obs_file_path):
    """Copy ``local_file_path`` to ``obs_file_path`` by delegating to ``mox.file.copy``.

    Args:
        local_file_path: Source path, passed as the first argument of
            ``mox.file.copy``.
        obs_file_path: Destination path, passed as the second argument of
            ``mox.file.copy`` (presumably an OBS object path -- confirm
            against the moxing documentation).
    """
    mox.file.copy(local_file_path, obs_file_path)


def csv_to_pascal_voc(args, csv_file, local_tif_file_path_dict, local_output_file_dir):
    """Convert CSV label records into one annotation file per image and upload them.

    The CSV is grouped by its ``FILE_NAME`` column. For every group whose
    image is present in ``local_tif_file_path_dict`` an annotation file is
    written to ``local_output_file_dir`` and then copied, together with the
    image itself, to ``args.output_obs_path``:

    * ``object_detection``: a Pascal VOC XML file with one ``object`` per CSV
      row (``LABEL_VALUE`` plus the four values of ``LABEL_COORDINATE``).
    * ``image_classification``: a text file containing the ``LABEL_VALUE`` of
      the last row of the group.

    The annotation name is always ``<image stem> + '.xml' / '.txt'``. Deriving
    it from the real file stem keeps it distinct from the image name for every
    image format, so the image upload can no longer overwrite the annotation.

    Args:
        args: Namespace providing ``output_obs_path`` and, optionally,
            ``operateType`` (defaults to ``'object_detection'``).
        csv_file: Path of the local CSV file with the label records.
        local_tif_file_path_dict: Maps image file name to its local path.
        local_output_file_dir: Local directory for the generated annotations.
    """
    operate_type = getattr(args, 'operateType', 'object_detection')
    logger.info(operate_type)
    df = pd.read_csv(csv_file)

    for filename, group in df.groupby('FILE_NAME'):
        local_image_path = local_tif_file_path_dict.get(filename)
        if not local_image_path:
            # The CSV references an image that was not downloaded: skip it.
            continue

        stem = os.path.splitext(filename)[0]
        # Per-image fields are taken from the last record of the group, which
        # is the row the previous implementation ended up reading as well.
        last_row = group.iloc[-1]

        if operate_type == 'object_detection':
            objects = []
            for _, row in group.iterrows():
                bndbox = ast.literal_eval(row['LABEL_COORDINATE'])
                objects.append({
                    # Stringify the label so numeric / missing labels do not
                    # break XML serialization.
                    'name': get_value(row, 'LABEL_VALUE'),
                    'xmin': bndbox[0],
                    'ymin': bndbox[1],
                    'xmax': bndbox[2],
                    'ymax': bndbox[3],
                })

            folder = os.path.join(last_row['DEST_BUCKET'], last_row['DEST_PATH'])
            path = os.path.join(folder, filename)
            with PIL.Image.open(local_image_path) as img:
                width, height = img.size
                depth = len(img.getbands())

            tree = create_pascal_voc_xml1(folder, filename, path, width, height, depth, objects)
            annotation_name = stem + '.xml'
            annotation_text = prettify_xml(tree.getroot())
        elif operate_type == 'image_classification':
            annotation_name = stem + '.txt'
            annotation_text = create_pascal_voc_txt(get_value(last_row, 'LABEL_VALUE'))
        else:
            # Unknown operate types produce no output, as before.
            continue

        local_annotation_path = os.path.join(local_output_file_dir, annotation_name)
        with open(local_annotation_path, 'w', encoding='utf-8') as f:
            f.write(annotation_text)

        copy_local_to_obs(local_annotation_path, os.path.join(args.output_obs_path, annotation_name))
        copy_local_to_obs(local_image_path, os.path.join(args.output_obs_path, filename))




def replacetype(filename,type):
    """Return ``filename`` with its extension swapped for an annotation suffix.

    Only the real extension (as split by ``os.path.splitext``) is replaced, so
    an extension-like substring in the middle of the name is left alone, and
    any image extension -- including ``.tif`` / ``.tiff`` and mixed-case
    spellings -- is handled.

    Args:
        filename: Image file name.
        type: ``'1'`` for an XML annotation (``.xml``), ``'2'`` for a text
            annotation (``.txt``).

    Returns:
        The renamed file name, or ``filename`` unchanged for any other
        ``type`` value.
    """
    if type == '1':
        new_suffix = '.xml'
    elif type == '2':
        new_suffix = '.txt'
    else:
        return filename
    return os.path.splitext(filename)[0] + new_suffix


def create_pascal_voc_txt(lv):
    """Return the content of a classification label file.

    Args:
        lv: The label value.

    Returns:
        ``lv`` unchanged; the label file currently holds nothing but the
        label value.
    """
    return lv


class Process:
    """Operator entry point: download inputs, convert the labels, upload results.

    On construction the local staging directories are created and the input /
    output paths are read from ``args``. Calling the instance copies every
    file found under the input path to the local input directory and passes
    the CSV plus the remaining files to ``csv_to_pascal_voc``.
    """

    def __init__(self, args):
        """Prepare the local staging directories and resolve the I/O paths.

        Args:
            args: Namespace with the operator parameters. The input / output
                paths are accepted under either spelling:
                ``obs_input_path`` / ``obs_output_path`` or
                ``input_obs_path`` / ``output_obs_path``.

        Raises:
            AttributeError: If neither spelling of a path is present.
        """
        # Local directory that receives the files downloaded from OBS.
        self.local_input_file_dir = r'/tmp/obs_input/'
        os.makedirs(self.local_input_file_dir, exist_ok=True)
        # Local directory for generated files before they are uploaded to OBS.
        self.local_output_file_dir = r'/tmp/obs_output/'
        os.makedirs(self.local_output_file_dir, exist_ok=True)

        self.args = args
        self.input_obs_path = self._resolve_path(args, 'obs_input_path', 'input_obs_path')
        self.output_obs_path = self._resolve_path(args, 'obs_output_path', 'output_obs_path')
        # Mirror the resolved values onto args: downstream code reads
        # args.input_obs_path / args.output_obs_path.
        self.args.input_obs_path = self.input_obs_path
        self.args.output_obs_path = self.output_obs_path
        logger.info(f'-------算子执行--------')

    @staticmethod
    def _resolve_path(args, *names):
        """Return the first attribute of ``args`` among ``names`` that is set."""
        for name in names:
            value = getattr(args, name, None)
            if value is not None:
                return value
        raise AttributeError(f"args provides none of: {', '.join(names)}")

    def __call__(self, input):
        """Run the conversion for every file under the input path.

        Args:
            input: Not used by this operator.

        Raises:
            FileNotFoundError: If the input path contains no ``.csv`` file.
        """
        # List the file names directly under the input path.
        file_name_list = mox.file.list_directory(self.args.input_obs_path, recursive=False)

        local_tif_file_path_dict, local_csv_file_path = {}, ''
        for file_name in file_name_list:
            obs_file_path = os.path.join(self.args.input_obs_path, file_name)
            local_file_path = os.path.join(self.local_input_file_dir, file_name)
            # The CSV carries the label records; every other file is treated
            # as an image. If several CSV files exist, the last one wins.
            if file_name.lower().endswith('.csv'):
                local_csv_file_path = local_file_path
            else:
                local_tif_file_path_dict[file_name] = local_file_path
            # Copy the file from OBS to the local input directory.
            mox.file.copy(obs_file_path, local_file_path)

        if not local_csv_file_path:
            # Fail with a clear message instead of handing '' to pandas.
            raise FileNotFoundError(
                f'no .csv label file found under {self.args.input_obs_path}')

        csv_to_pascal_voc(self.args, local_csv_file_path, local_tif_file_path_dict, self.local_output_file_dir)