编排Workflow
Workflow的编排主要在于每个节点的定义,您可以参考“创建Workflow节点”章节,按照自己的场景需求选择相应的代码示例模板进行修改。编排过程主要分为以下几个步骤。
- 梳理场景,了解预置Step的功能,确定最终的DAG结构。
- 确保单节点功能(如训练、推理等)已在ModelArts相应服务中调试通过。
- 根据节点功能选择相应的代码模板,进行内容的补充。
- 根据DAG结构编排节点,完成Workflow的编写。
导入Workflow Data包
在编写Workflow过程中,相关对象都通过Workflow包进行导入,梳理如下:
from modelarts import workflow as wf
Data包相关内容导入:
wf.data.DatasetTypeEnum wf.data.Dataset wf.data.DatasetVersionConfig wf.data.DatasetPlaceholder wf.data.ServiceInputPlaceholder wf.data.ServiceData wf.data.ServiceUpdatePlaceholder wf.data.DataTypeEnum wf.data.ModelData wf.data.GalleryModel wf.data.OBSPath wf.data.OBSOutputConfig wf.data.OBSPlaceholder wf.data.SWRImage wf.data.SWRImagePlaceholder wf.data.Storage wf.data.InputStorage wf.data.OutputStorage wf.data.LabelTask wf.data.LabelTaskPlaceholder wf.data.LabelTaskConfig wf.data.LabelTaskTypeEnum wf.data.MetricsConfig wf.data.TripartiteServiceConfig wf.data.DataConsumptionSelector
policy包相关内容导入:
wf.policy.Policy wf.policy.Scene
steps包相关内容导入:
wf.steps.MetricInfo wf.steps.Condition wf.steps.ConditionTypeEnum wf.steps.ConditionStep wf.steps.LabelingStep wf.steps.LabelingInput wf.steps.LabelingOutput wf.steps.LabelTaskProperties wf.steps.ImportDataInfo wf.steps.DataOriginTypeEnum wf.steps.DatasetImportStep wf.steps.DatasetImportInput wf.steps.DatasetImportOutput wf.steps.AnnotationFormatConfig wf.steps.AnnotationFormatParameters wf.steps.AnnotationFormatEnum wf.steps.Label wf.steps.ImportTypeEnum wf.steps.LabelFormat wf.steps.LabelTypeEnum wf.steps.ReleaseDatasetStep wf.steps.ReleaseDatasetInput wf.steps.ReleaseDatasetOutput wf.steps.CreateDatasetStep wf.steps.CreateDatasetInput wf.steps.CreateDatasetOutput wf.steps.DatasetProperties wf.steps.SchemaField wf.steps.ImportConfig wf.steps.JobStep wf.steps.JobMetadata wf.steps.JobSpec wf.steps.JobResource wf.steps.JobTypeEnum wf.steps.JobEngine wf.steps.JobInput wf.steps.JobOutput wf.steps.LogExportPath wf.steps.MrsJobStep wf.steps.MrsJobInput wf.steps.MrsJobOutput wf.steps.MrsJobAlgorithm wf.steps.ModelStep wf.steps.ModelInput wf.steps.ModelOutput wf.steps.ModelConfig wf.steps.Template wf.steps.TemplateInputs wf.steps.ServiceStep wf.steps.ServiceInput wf.steps.ServiceOutput wf.steps.ServiceConfig wf.steps.StepPolicy
Workflow包相关内容导入:
wf.workflow wf.Subgraph wf.Placeholder wf.PlaceholderType wf.AlgorithmParameters wf.BaseAlgorithm wf.Algorithm wf.AIGalleryAlgorithm wf.resource wf.SystemEnv wf.add_whitelist_users wf.delete_whitelist_users
编写工作流代码示例
以图像分类为例,阐述机器学习端到端场景的完整开发过程,主要包括数据标注、模型训练、服务部署等过程。您需要准备如下算法和数据集。
- 准备一个图像分类算法(或者可以直接从AI Gallery搜索订阅一个“图像分类-ResNet_v1_50”算法)。
- 准备一个图片类型的数据集,具体操作请参考“准备数据集”章节。也可从AI Gallery直接下载(例如:8类常见生活垃圾图片数据集)。
# End-to-end image-classification workflow example.
#
# The script wires five steps into a linear DAG through `depend_steps`:
#   labeling -> dataset release -> training job -> model registration
#   -> service deployment
# Values wrapped in `wf.Placeholder` are left open here and are expected to be
# supplied when the workflow is configured/run.
from modelarts import workflow as wf

# Unified storage object that manages the output directory
output_storage = wf.data.OutputStorage(name="output_storage", description="输出目录统一配置")

# Create the labeling task
data = wf.data.DatasetPlaceholder(name="input_data")

label_step = wf.steps.LabelingStep(
    name="labeling",
    title="数据标注",
    properties=wf.steps.LabelTaskProperties(
        task_type=wf.data.LabelTaskTypeEnum.IMAGE_CLASSIFICATION,
        task_name=wf.Placeholder(
            name="task_name",
            placeholder_type=wf.PlaceholderType.STR,
            description="请输入一个只包含大小写字母、数字、下划线、中划线或者中文字符的名称。填写已有标注任务名称,则直接使用该标注任务;填写新标注任务名称,则自动创建新的标注任务",
        ),
    ),
    inputs=wf.steps.LabelingInput(name="labeling_input", data=data),
    outputs=wf.steps.LabelingOutput(name="labeling_output"),
)

# Release the labeling task as a dataset version
release_step = wf.steps.ReleaseDatasetStep(
    name="release",
    title="数据集版本发布",
    # Consumes the output of the labeling step
    inputs=wf.steps.ReleaseDatasetInput(
        name="input_data",
        data=label_step.outputs["labeling_output"].as_input(),
    ),
    outputs=wf.steps.ReleaseDatasetOutput(
        name="labeling_output",
        dataset_version_config=wf.data.DatasetVersionConfig(train_evaluate_sample_ratio="0.8"),
    ),
    depend_steps=[label_step],
)

# Create the training job
job_step = wf.steps.JobStep(
    name="training_job",
    title="图像分类训练",
    algorithm=wf.AIGalleryAlgorithm(
        subscription_id="***",  # ID of the subscribed algorithm; fill in your own
        item_version_id="10.0.0",  # version ID of the subscribed algorithm
        parameters=[
            # Fixed hyper-parameters
            wf.AlgorithmParameters(name="task_type", value="image_classification_v2"),
            wf.AlgorithmParameters(name="model_name", value="resnet_v1_50"),
            wf.AlgorithmParameters(name="do_train", value="True"),
            wf.AlgorithmParameters(name="do_eval_along_train", value="True"),
            wf.AlgorithmParameters(name="variable_update", value="horovod"),
            # Hyper-parameters exposed as placeholders, each with a default
            wf.AlgorithmParameters(
                name="learning_rate_strategy",
                value=wf.Placeholder(
                    name="learning_rate_strategy",
                    placeholder_type=wf.PlaceholderType.STR,
                    default="0.002",
                    description="训练的学习率策略(10:0.001,20:0.0001代表0-10个epoch学习率0.001,10-20epoch学习率0.0001),如果不指定epoch, 会根据验证精度情况自动调整学习率,并当精度没有明显提升时,训练停止",
                ),
            ),
            wf.AlgorithmParameters(
                name="batch_size",
                value=wf.Placeholder(
                    name="batch_size",
                    placeholder_type=wf.PlaceholderType.INT,
                    default=64,
                    description="每步训练的图片数量(单卡)",
                ),
            ),
            wf.AlgorithmParameters(
                name="eval_batch_size",
                value=wf.Placeholder(
                    name="eval_batch_size",
                    placeholder_type=wf.PlaceholderType.INT,
                    default=64,
                    description="每步验证的图片数量(单卡)",
                ),
            ),
            wf.AlgorithmParameters(
                name="evaluate_every_n_epochs",
                value=wf.Placeholder(
                    name="evaluate_every_n_epochs",
                    placeholder_type=wf.PlaceholderType.FLOAT,
                    default=1.0,
                    description="每训练n个epoch做一次验证",
                ),
            ),
            wf.AlgorithmParameters(
                name="save_model_secs",
                value=wf.Placeholder(
                    name="save_model_secs",
                    placeholder_type=wf.PlaceholderType.INT,
                    default=60,
                    description="保存模型的频率(单位:s)",
                ),
            ),
            wf.AlgorithmParameters(
                name="save_summary_steps",
                value=wf.Placeholder(
                    name="save_summary_steps",
                    placeholder_type=wf.PlaceholderType.INT,
                    default=10,
                    description="保存summary的频率(单位:步)",
                ),
            ),
            wf.AlgorithmParameters(
                name="log_every_n_steps",
                value=wf.Placeholder(
                    name="log_every_n_steps",
                    placeholder_type=wf.PlaceholderType.INT,
                    default=10,
                    description="打印日志的频率(单位:步)",
                ),
            ),
            wf.AlgorithmParameters(
                name="do_data_cleaning",
                value=wf.Placeholder(
                    name="do_data_cleaning",
                    placeholder_type=wf.PlaceholderType.STR,
                    default="True",
                    description="是否进行数据清洗, 数据格式异常会导致训练失败,建议开启,保证训练稳定性。数据量过大时,数据清洗可能耗时较久,可自行线下清洗(支持BMP.JPEG,PNG格式, RGB三通道)。建议用JPEG格式数据",
                ),
            ),
            wf.AlgorithmParameters(
                name="use_fp16",
                value=wf.Placeholder(
                    name="use_fp16",
                    placeholder_type=wf.PlaceholderType.STR,
                    default="True",
                    description="是否使用混合精度, 混合精度可以加速训练,但是可能会造成一点精度损失,如果对精度无极严格的要求,建议开启",
                ),
            ),
            wf.AlgorithmParameters(
                name="xla_compile",
                value=wf.Placeholder(
                    name="xla_compile",
                    placeholder_type=wf.PlaceholderType.STR,
                    default="True",
                    description="是否开启xla编译,加速训练,默认启用",
                ),
            ),
            wf.AlgorithmParameters(
                name="data_format",
                value=wf.Placeholder(
                    name="data_format",
                    placeholder_type=wf.PlaceholderType.ENUM,
                    default="NCHW",
                    enum_list=["NCHW", "NHWC"],
                    description="输入数据类型,NHWC表示channel在最后,NCHW表channel在最前,默认值NCHW(速度有提升)",
                ),
            ),
            wf.AlgorithmParameters(
                name="best_model",
                value=wf.Placeholder(
                    name="best_model",
                    placeholder_type=wf.PlaceholderType.STR,
                    default="True",
                    description="是否在训练过程中保存并使用精度最高的模型,而不是最新的模型。默认值True,保存最优模型。在一定误差范围内,最优模型会保存最新的高精度模型",
                ),
            ),
            wf.AlgorithmParameters(
                name="jpeg_preprocess",
                value=wf.Placeholder(
                    name="jpeg_preprocess",
                    placeholder_type=wf.PlaceholderType.STR,
                    default="True",
                    description="是否使用jpeg预处理加速算子(仅支持jpeg格式数据),可加速数据读取,提升性能,默认启用。如果数据格式不是jpeg格式,开启数据清洗功能即可使用",
                ),
            ),
        ],
    ),
    # Trains on the dataset version produced by the release step
    inputs=[
        wf.steps.JobInput(
            name="data_url",
            data=release_step.outputs["labeling_output"].as_input(),
        )
    ],
    # Training output goes under the unified output storage
    outputs=[
        wf.steps.JobOutput(
            name="train_url",
            obs_config=wf.data.OBSOutputConfig(obs_path=output_storage.join("/train_output/")),
        )
    ],
    spec=wf.steps.JobSpec(
        resource=wf.steps.JobResource(
            flavor=wf.Placeholder(
                name="training_flavor",
                placeholder_type=wf.PlaceholderType.JSON,
                description="训练资源规格",
            )
        )
    ),
    depend_steps=[release_step],
)

# Shared by the model-registration and service-deployment steps below
model_name = wf.Placeholder(
    name="model_name",
    placeholder_type=wf.PlaceholderType.STR,
    description="请输入一个1至64位且只包含大小写字母、中文、数字、中划线或者下划线的名称。工作流第一次运行建议填写新的模型名称,后续运行会自动在该模型上新增版本",
)

# Register the model
model_step = wf.steps.ModelStep(
    name="model_step",
    title="模型注册",
    # Consumes the training job's output
    inputs=[
        wf.steps.ModelInput(
            name="model_input",
            data=job_step.outputs["train_url"].as_input(),
        )
    ],
    outputs=[
        wf.steps.ModelOutput(
            name="model_output",
            model_config=wf.steps.ModelConfig(model_name=model_name, model_type="TensorFlow"),
        )
    ],
    depend_steps=[job_step],
)

# Deploy the service
service_step = wf.steps.ServiceStep(
    name="service_step",
    title="服务部署",
    inputs=[
        wf.steps.ServiceInput(
            name="service_input",
            data=wf.data.ServiceInputPlaceholder(name="service_model", model_name=model_name),
        )
    ],
    outputs=[wf.steps.ServiceOutput(name="service_output")],
    depend_steps=[model_step],
)

# Build the workflow object
workflow = wf.Workflow(
    name="image-classification-ResNeSt",
    desc="this is a image classification workflow",
    steps=[label_step, release_step, job_step, model_step, service_step],
    storages=[output_storage],
)
在工作流编写完成后可自行进行发布等操作。