更新时间:2024-08-16 GMT+08:00
分享

在Workflow中更新已部署的服务

场景介绍

大部分场景下的工作流都是第一次运行部署新服务,后续进行模型迭代时,需要对已部署的服务进行更新。因此需要在同一条工作流中,同时支持服务的部署及更新能力。

编写工作流

基于编写工作流代码示例的场景案例进行改造,代码编写示例如下:

from modelarts import workflow as wf

# 定义统一存储对象管理输出目录
output_storage = wf.data.OutputStorage(name="output_storage", description="输出目录统一配置")

# 创建标注任务
data = wf.data.DatasetPlaceholder(name="input_data")

label_step = wf.steps.LabelingStep(
    name="labeling",
    title="数据标注",
    properties=wf.steps.LabelTaskProperties(
        task_type=wf.data.LabelTaskTypeEnum.IMAGE_CLASSIFICATION,
        task_name=wf.Placeholder(name="task_name", placeholder_type=wf.PlaceholderType.STR, description="请输入一个只包含大小写字母、数字、下划线、中划线或者中文字符的名称。填写已有标注任务名称,则直接使用该标注任务;填写新标注任务名称,则自动创建新的标注任务")
    ),
    inputs=wf.steps.LabelingInput(name="labeling_input", data=data),
    outputs=wf.steps.LabelingOutput(name="labeling_output"),
)

# 对标注任务进行发布
release_step = wf.steps.ReleaseDatasetStep(
    name="release",
    title="数据集版本发布",
    inputs=wf.steps.ReleaseDatasetInput(name="input_data", data=label_step.outputs["labeling_output"].as_input()),
    outputs=wf.steps.ReleaseDatasetOutput(name="labeling_output", dataset_version_config=wf.data.DatasetVersionConfig(train_evaluate_sample_ratio="0.8")),
    depend_steps=[label_step]
)


# 创建训练作业
job_step = wf.steps.JobStep(
    name="training_job",
    title="图像分类训练",
    algorithm=wf.AIGalleryAlgorithm(
        subscription_id="***", # 订阅算法的ID,自行补充
        item_version_id="10.0.0", # 订阅算法的版本ID
        parameters=[
            wf.AlgorithmParameters(name="task_type", value="image_classification_v2"),
            wf.AlgorithmParameters(name="model_name", value="resnet_v1_50"),
            wf.AlgorithmParameters(name="do_train", value="True"),
            wf.AlgorithmParameters(name="do_eval_along_train", value="True"),
            wf.AlgorithmParameters(name="variable_update", value="horovod"),
            wf.AlgorithmParameters(name="learning_rate_strategy", value=wf.Placeholder(name="learning_rate_strategy", placeholder_type=wf.PlaceholderType.STR, default="0.002", description="训练的学习率策略(10:0.001,20:0.0001代表0-10个epoch学习率0.001,10-20epoch学习率0.0001),如果不指定epoch, 会根据验证精度情况自动调整学习率,并当精度没有明显提升时,训练停止")),
            wf.AlgorithmParameters(name="batch_size", value=wf.Placeholder(name="batch_size", placeholder_type=wf.PlaceholderType.INT, default=64, description="每步训练的图片数量(单卡)")),
            wf.AlgorithmParameters(name="eval_batch_size", value=wf.Placeholder(name="eval_batch_size", placeholder_type=wf.PlaceholderType.INT, default=64, description="每步验证的图片数量(单卡)")),
            wf.AlgorithmParameters(name="evaluate_every_n_epochs", value=wf.Placeholder(name="evaluate_every_n_epochs", placeholder_type=wf.PlaceholderType.FLOAT, default=1.0, description="每训练n个epoch做一次验证")),
            wf.AlgorithmParameters(name="save_model_secs", value=wf.Placeholder(name="save_model_secs", placeholder_type=wf.PlaceholderType.INT, default=60, description="保存模型的频率(单位:s)")),
            wf.AlgorithmParameters(name="save_summary_steps", value=wf.Placeholder(name="save_summary_steps", placeholder_type=wf.PlaceholderType.INT, default=10, description="保存summary的频率(单位:步)")),
            wf.AlgorithmParameters(name="log_every_n_steps", value=wf.Placeholder(name="log_every_n_steps", placeholder_type=wf.PlaceholderType.INT, default=10, description="打印日志的频率(单位:步)")),
            wf.AlgorithmParameters(name="do_data_cleaning", value=wf.Placeholder(name="do_data_cleaning", placeholder_type=wf.PlaceholderType.STR, default="True", description="是否进行数据清洗, 数据格式异常会导致训练失败,建议开启,保证训练稳定性。数据量过大时,数据清洗可能耗时较久,可自行线下清洗(支持BMP.JPEG,PNG格式, RGB三通道)。建议用JPEG格式数据")),
            wf.AlgorithmParameters(name="use_fp16", value=wf.Placeholder(name="use_fp16", placeholder_type=wf.PlaceholderType.STR, default="True", description="是否使用混合精度, 混合精度可以加速训练,但是可能会造成一点精度损失,如果对精度无极严格的要求,建议开启")),
            wf.AlgorithmParameters(name="xla_compile", value=wf.Placeholder(name="xla_compile", placeholder_type=wf.PlaceholderType.STR, default="True", description="是否开启xla编译,加速训练,默认启用")),
            wf.AlgorithmParameters(name="data_format", value=wf.Placeholder(name="data_format", placeholder_type=wf.PlaceholderType.ENUM, default="NCHW", enum_list=["NCHW", "NHWC"], description="输入数据类型,NHWC表示channel在最后,NCHW表channel在最前,默认值NCHW(速度有提升)")),
            wf.AlgorithmParameters(name="best_model", value=wf.Placeholder(name="best_model", placeholder_type=wf.PlaceholderType.STR, default="True", description="是否在训练过程中保存并使用精度最高的模型,而不是最新的模型。默认值True,保存最优模型。在一定误差范围内,最优模型会保存最新的高精度模型")),
            wf.AlgorithmParameters(name="jpeg_preprocess", value=wf.Placeholder(name="jpeg_preprocess", placeholder_type=wf.PlaceholderType.STR, default="True", description="是否使用jpeg预处理加速算子(仅支持jpeg格式数据),可加速数据读取,提升性能,默认启用。如果数据格式不是jpeg格式,开启数据清洗功能即可使用"))
        ]
    ),
    inputs=[wf.steps.JobInput(name="data_url", data=release_step.outputs["labeling_output"].as_input())],
    outputs=[wf.steps.JobOutput(name="train_url",
                                obs_config=wf.data.OBSOutputConfig(obs_path=output_storage.join("/train_output/")))],
    spec=wf.steps.JobSpec(
        resource=wf.steps.JobResource(
            flavor=wf.Placeholder(name="training_flavor",
                                     placeholder_type=wf.PlaceholderType.JSON,
                                     description="训练资源规格"
                                     )
        )
    ),
    depend_steps=[release_step]
)

model_name = wf.Placeholder(name="model_name", placeholder_type=wf.PlaceholderType.STR, description="请输入一个1至64位且只包含大小写字母、中文、数字、中划线或者下划线的名称。工作流第一次运行建议填写新的模型名称,后续运行会自动在该模型上新增版本")

# 模型注册
model_step = wf.steps.ModelStep(
    name="model_step",
    title="模型注册",
    inputs=[wf.steps.ModelInput(name="model_input",
                                data=job_step.outputs["train_url"].as_input())],
    outputs=[wf.steps.ModelOutput(name="model_output",
                                  model_config=wf.steps.ModelConfig(model_name=model_name, model_type="TensorFlow"))],
    depend_steps=[job_step]
)

# 定义服务对象
service = wf.data.ServiceUpdatePlaceholder(name="service_update_placeholder", delay=True)

# 服务部署
service_step = wf.steps.ServiceStep(
    name="service_step",
    title="服务部署",
    inputs=[
        wf.steps.ServiceInput(name="service_input", data=wf.data.ServiceInputPlaceholder(name="service_model", model_name=model_name)),
        wf.steps.ServiceInput(name="input_service", data=service)
    ],
    outputs=[wf.steps.ServiceOutput(name="service_output")],
    depend_steps=[model_step]
)

# 构建工作流对象
workflow = wf.Workflow(name="image-classification-ResNeSt",
                       desc="this is a image classification workflow",
                       steps=[label_step, release_step, job_step, model_step, service_step],
                       storages=[output_storage]
                       )

其中ServiceStep节点包含两个输入,一个是模型列表对象,另一个是在线服务对象,此时在运行态通过开关的方式来控制部署/更新服务,如下图所示:

在线服务开关默认关闭,节点走部署服务的流程;如果需要更新服务,则手动打开开关,选择相应的在线服务即可。

进行服务更新时,需要保证被更新的服务所使用的模型与配置的模型名称相同。

相关文档