更新时间:2024-08-16 GMT+08:00
分享

编排Workflow

Workflow的编排主要在于每个节点的定义,您可以参考创建Workflow节点章节,按照自己的场景需求选择相应的代码示例模板进行修改。编排过程主要分为以下几个步骤。

  1. 梳理场景,了解预置Step的功能,确定最终的DAG结构。
  2. 单节点功能,如训练、推理等在ModelArts相应服务中调试通过。
  3. 根据节点功能选择相应的代码模板,进行内容的补充。
  4. 根据DAG结构编排节点,完成Workflow的编写。

导入Workflow Data包

在编写Workflow过程中,相关对象都通过Workflow包进行导入,梳理如下:

from modelarts import workflow as wf

Data包相关内容导入:

wf.data.DatasetTypeEnum
wf.data.Dataset
wf.data.DatasetVersionConfig
wf.data.DatasetPlaceholder
wf.data.ServiceInputPlaceholder
wf.data.ServiceData
wf.data.ServiceUpdatePlaceholder
wf.data.DataTypeEnum
wf.data.ModelData
wf.data.GalleryModel
wf.data.OBSPath
wf.data.OBSOutputConfig
wf.data.OBSPlaceholder
wf.data.SWRImage
wf.data.SWRImagePlaceholder
wf.data.Storage
wf.data.InputStorage
wf.data.OutputStorage
wf.data.LabelTask
wf.data.LabelTaskPlaceholder
wf.data.LabelTaskConfig
wf.data.LabelTaskTypeEnum
wf.data.MetricsConfig
wf.data.TripartiteServiceConfig
wf.data.DataConsumptionSelector

policy包相关内容导入:

wf.policy.Policy
wf.policy.Scene

steps包相关内容导入:

wf.steps.MetricInfo
wf.steps.Condition
wf.steps.ConditionTypeEnum
wf.steps.ConditionStep
wf.steps.LabelingStep
wf.steps.LabelingInput
wf.steps.LabelingOutput
wf.steps.LabelTaskProperties
wf.steps.ImportDataInfo
wf.steps.DataOriginTypeEnum
wf.steps.DatasetImportStep
wf.steps.DatasetImportInput
wf.steps.DatasetImportOutput
wf.steps.AnnotationFormatConfig
wf.steps.AnnotationFormatParameters
wf.steps.AnnotationFormatEnum
wf.steps.Label
wf.steps.ImportTypeEnum
wf.steps.LabelFormat
wf.steps.LabelTypeEnum
wf.steps.ReleaseDatasetStep
wf.steps.ReleaseDatasetInput
wf.steps.ReleaseDatasetOutput
wf.steps.CreateDatasetStep
wf.steps.CreateDatasetInput
wf.steps.CreateDatasetOutput
wf.steps.DatasetProperties
wf.steps.SchemaField
wf.steps.ImportConfig
wf.steps.JobStep
wf.steps.JobMetadata
wf.steps.JobSpec
wf.steps.JobResource
wf.steps.JobTypeEnum
wf.steps.JobEngine
wf.steps.JobInput
wf.steps.JobOutput
wf.steps.LogExportPath
wf.steps.MrsJobStep
wf.steps.MrsJobInput
wf.steps.MrsJobOutput
wf.steps.MrsJobAlgorithm
wf.steps.ModelStep
wf.steps.ModelInput
wf.steps.ModelOutput
wf.steps.ModelConfig
wf.steps.Template
wf.steps.TemplateInputs
wf.steps.ServiceStep
wf.steps.ServiceInput
wf.steps.ServiceOutput
wf.steps.ServiceConfig
wf.steps.StepPolicy

Workflow包相关内容导入:

wf.workflow
wf.Subgraph
wf.Placeholder
wf.PlaceholderType
wf.AlgorithmParameters
wf.BaseAlgorithm
wf.Algorithm
wf.AIGalleryAlgorithm
wf.resource
wf.SystemEnv
wf.add_whitelist_users
wf.delete_whitelist_users

编写工作流代码示例

以图像分类为例,阐述机器学习端到端场景的完整开发过程,主要包括数据标注、模型训练、服务部署等过程。您需要准备如下算法和数据集。

  • 准备一个图像分类算法(或者可以直接从AI Gallery搜索订阅一个“图像分类-ResNet_v1_50”算法)。
  • 准备一个图片类型的数据集,请参考准备数据集。可从AI Gallery直接下载(例如:8类常见生活垃圾图片数据集)。
from modelarts import workflow as wf

# 定义统一存储对象管理输出目录
output_storage = wf.data.OutputStorage(name="output_storage", description="输出目录统一配置")

# 创建标注任务
data = wf.data.DatasetPlaceholder(name="input_data")

label_step = wf.steps.LabelingStep(
    name="labeling",
    title="数据标注",
    properties=wf.steps.LabelTaskProperties(
        task_type=wf.data.LabelTaskTypeEnum.IMAGE_CLASSIFICATION,
        task_name=wf.Placeholder(name="task_name", placeholder_type=wf.PlaceholderType.STR, description="请输入一个只包含大小写字母、数字、下划线、中划线或者中文字符的名称。填写已有标注任务名称,则直接使用该标注任务;填写新标注任务名称,则自动创建新的标注任务")
    ),
    inputs=wf.steps.LabelingInput(name="labeling_input", data=data),
    outputs=wf.steps.LabelingOutput(name="labeling_output"),
)

# 对标注任务进行发布
release_step = wf.steps.ReleaseDatasetStep(
    name="release",
    title="数据集版本发布",
    inputs=wf.steps.ReleaseDatasetInput(name="input_data", data=label_step.outputs["labeling_output"].as_input()),
    outputs=wf.steps.ReleaseDatasetOutput(name="labeling_output", dataset_version_config=wf.data.DatasetVersionConfig(train_evaluate_sample_ratio="0.8")),
    depend_steps=[label_step]
)


# 创建训练作业
job_step = wf.steps.JobStep(
    name="training_job",
    title="图像分类训练",
    algorithm=wf.AIGalleryAlgorithm(
        subscription_id="***", # 订阅算法的ID,自行补充
        item_version_id="10.0.0", # 订阅算法的版本ID
        parameters=[
            wf.AlgorithmParameters(name="task_type", value="image_classification_v2"),
            wf.AlgorithmParameters(name="model_name", value="resnet_v1_50"),
            wf.AlgorithmParameters(name="do_train", value="True"),
            wf.AlgorithmParameters(name="do_eval_along_train", value="True"),
            wf.AlgorithmParameters(name="variable_update", value="horovod"),
            wf.AlgorithmParameters(name="learning_rate_strategy", value=wf.Placeholder(name="learning_rate_strategy", placeholder_type=wf.PlaceholderType.STR, default="0.002", description="训练的学习率策略(10:0.001,20:0.0001代表0-10个epoch学习率0.001,10-20epoch学习率0.0001),如果不指定epoch, 会根据验证精度情况自动调整学习率,并当精度没有明显提升时,训练停止")),
            wf.AlgorithmParameters(name="batch_size", value=wf.Placeholder(name="batch_size", placeholder_type=wf.PlaceholderType.INT, default=64, description="每步训练的图片数量(单卡)")),
            wf.AlgorithmParameters(name="eval_batch_size", value=wf.Placeholder(name="eval_batch_size", placeholder_type=wf.PlaceholderType.INT, default=64, description="每步验证的图片数量(单卡)")),
            wf.AlgorithmParameters(name="evaluate_every_n_epochs", value=wf.Placeholder(name="evaluate_every_n_epochs", placeholder_type=wf.PlaceholderType.FLOAT, default=1.0, description="每训练n个epoch做一次验证")),
            wf.AlgorithmParameters(name="save_model_secs", value=wf.Placeholder(name="save_model_secs", placeholder_type=wf.PlaceholderType.INT, default=60, description="保存模型的频率(单位:s)")),
            wf.AlgorithmParameters(name="save_summary_steps", value=wf.Placeholder(name="save_summary_steps", placeholder_type=wf.PlaceholderType.INT, default=10, description="保存summary的频率(单位:步)")),
            wf.AlgorithmParameters(name="log_every_n_steps", value=wf.Placeholder(name="log_every_n_steps", placeholder_type=wf.PlaceholderType.INT, default=10, description="打印日志的频率(单位:步)")),
            wf.AlgorithmParameters(name="do_data_cleaning", value=wf.Placeholder(name="do_data_cleaning", placeholder_type=wf.PlaceholderType.STR, default="True", description="是否进行数据清洗, 数据格式异常会导致训练失败,建议开启,保证训练稳定性。数据量过大时,数据清洗可能耗时较久,可自行线下清洗(支持BMP.JPEG,PNG格式, RGB三通道)。建议用JPEG格式数据")),
            wf.AlgorithmParameters(name="use_fp16", value=wf.Placeholder(name="use_fp16", placeholder_type=wf.PlaceholderType.STR, default="True", description="是否使用混合精度, 混合精度可以加速训练,但是可能会造成一点精度损失,如果对精度无极严格的要求,建议开启")),
            wf.AlgorithmParameters(name="xla_compile", value=wf.Placeholder(name="xla_compile", placeholder_type=wf.PlaceholderType.STR, default="True", description="是否开启xla编译,加速训练,默认启用")),
            wf.AlgorithmParameters(name="data_format", value=wf.Placeholder(name="data_format", placeholder_type=wf.PlaceholderType.ENUM, default="NCHW", enum_list=["NCHW", "NHWC"], description="输入数据类型,NHWC表示channel在最后,NCHW表channel在最前,默认值NCHW(速度有提升)")),
            wf.AlgorithmParameters(name="best_model", value=wf.Placeholder(name="best_model", placeholder_type=wf.PlaceholderType.STR, default="True", description="是否在训练过程中保存并使用精度最高的模型,而不是最新的模型。默认值True,保存最优模型。在一定误差范围内,最优模型会保存最新的高精度模型")),
            wf.AlgorithmParameters(name="jpeg_preprocess", value=wf.Placeholder(name="jpeg_preprocess", placeholder_type=wf.PlaceholderType.STR, default="True", description="是否使用jpeg预处理加速算子(仅支持jpeg格式数据),可加速数据读取,提升性能,默认启用。如果数据格式不是jpeg格式,开启数据清洗功能即可使用"))
        ]
    ),
    inputs=[wf.steps.JobInput(name="data_url", data=release_step.outputs["labeling_output"].as_input())],
    outputs=[wf.steps.JobOutput(name="train_url",
                                obs_config=wf.data.OBSOutputConfig(obs_path=output_storage.join("/train_output/")))],
    spec=wf.steps.JobSpec(
        resource=wf.steps.JobResource(
            flavor=wf.Placeholder(name="training_flavor",
                                     placeholder_type=wf.PlaceholderType.JSON,
                                     description="训练资源规格"
                                     )
        )
    ),
    depend_steps=[release_step]
)

model_name = wf.Placeholder(name="model_name", placeholder_type=wf.PlaceholderType.STR, description="请输入一个1至64位且只包含大小写字母、中文、数字、中划线或者下划线的名称。工作流第一次运行建议填写新的模型名称,后续运行会自动在该模型上新增版本")

# 模型注册
model_step = wf.steps.ModelStep(
    name="model_step",
    title="模型注册",
    inputs=[wf.steps.ModelInput(name="model_input",
                                data=job_step.outputs["train_url"].as_input())],
    outputs=[wf.steps.ModelOutput(name="model_output",
                                  model_config=wf.steps.ModelConfig(model_name=model_name, model_type="TensorFlow"))],
    depend_steps=[job_step]
)


# 服务部署
service_step = wf.steps.ServiceStep(
    name="service_step",
    title="服务部署",
    inputs=[wf.steps.ServiceInput(name="service_input",
                                  data=wf.data.ServiceInputPlaceholder(name="service_model", model_name=model_name))],
    outputs=[wf.steps.ServiceOutput(name="service_output")],
    depend_steps=[model_step]
)

# 构建工作流对象
workflow = wf.Workflow(name="image-classification-ResNeSt",
                       desc="this is a image classification workflow",
                       steps=[label_step, release_step, job_step, model_step, service_step],
                       storages=[output_storage]
                       )

在工作流编写完成后可自行进行发布等操作。

相关文档