更新时间:2024-10-24 GMT+08:00

配置Workflow的输入输出目录

功能介绍

统一存储主要用于工作流的目录管理,帮助用户统一管理一个工作流中的所有存储路径,主要分为以下两个功能:

  • 输入目录管理:开发者在编辑开发工作流时可以对所有数据的存储路径做统一管理,规定用户按照自己的目录规划来存放数据,而存储的根目录可以根据用户自己的需求自行配置。该方式只做目录的编排,不会自动创建新的目录。
  • 输出目录管理:开发者在编辑开发工作流时可以对所有的输出路径做统一管理,用户无需手动创建输出目录,只需要在工作流运行前配置存储根路径,并且可以根据开发者的目录编排规则在指定目录下查看输出的数据信息。此外同一个工作流的多次运行支持输出到不同的目录下,对不同的执行做了很好的数据隔离。

常用方式

  • InputStorage(路径拼接)

    该对象主要用于帮助用户统一管理输入的目录,使用示例如下:

    import modelarts.workflow as wf
    storage = wf.data.InputStorage(name="storage_name", title="title_info", description="description_info") # name字段必填,title, description可选填
    input_data = wf.data.OBSPath(obs_path = storage.join("directory_path")) # 注意,如果是目录则最后需要加"/",例如:storage.join("/input/data/") 
    
    工作流运行时,如果storage对象配置的根路径为"/root/",则最后得到的路径为"/root/directory_path"
  • OutputStorage(目录创建)

    该对象主要用于帮助用户统一管理输出的目录,保证工作流每次执行输出到新目录,使用示例如下:

    import modelarts.workflow as wf
    storage = wf.data.OutputStorage(name="storage_name", title="title_info", description="description_info") # name字段必填,title, description可选填
    output_path = wf.data.OBSOutputConfig(obs_path = storage.join("directory_path")) # 注意,只能创建目录,不能创建文件 。
    
    工作流运行时,如果storage对象配置的根路径为"/root/",则系统自动创建相对目录,最后得到的路径为"/root/执行ID/directory_path"

进阶用法

Storage

该对象是InputStorage和OutputStorage的基类,包含了两者的所有能力,可以供用户灵活使用。

属性

描述

是否必填

数据类型

name

名称。

str

title

不填默认使用name的值。

str

description

描述信息。

str

create_dir

表示是否自动创建目录,默认为“False”。

bool

with_execution_id

表示创建目录时是否拼接execution_id,默认为“False”。该字段只有在create_dir为True时才支持设置为True。

bool

使用示例如下:

  • 实现InputStorage相同的能力
    import modelarts.workflow as wf 
    # 构建一个Storage对象, with_execution_id=False, create_dir=False
    storage = wf.data.Storage(name="storage_name", title="title_info", description="description_info", with_execution_id=False, create_dir=False) 
    input_data = wf.data.OBSPath(obs_path = storage.join("directory_path")) # 注意,如果是目录则最后需要加"/",例如:storage.join("/input/data/") 
    
    工作流运行时,如果storage对象配置的根路径为"/root/",则最后得到的路径为"/root/directory_path"
  • 实现OutputStorage相同的能力
    import modelarts.workflow as wf 
    # 构建一个Storage对象, with_execution_id=True, create_dir=True
    storage = wf.data.Storage(name="storage_name", title="title_info", description="description_info", with_execution_id=True, create_dir=True) 
    output_path = wf.data.OBSOutputConfig(obs_path = storage.join("directory_path")) # 注意,只能创建目录,不能创建文件 
    
    工作流运行时,如果storage对象配置的根路径为"/root/",则系统自动创建相对目录,最后得到的路径为"/root/执行ID/directory_path"
  • 通过join方法的参数实现同一个Storage的不同用法
    import modelarts.workflow as wf 
    # 构建一个Storage对象, 并且假设Storage配置的根目录为"/root/"
    storage = wf.data.Storage(name="storage_name", title="title_info", description="description_info", with_execution_id=False, create_dir=False) 
    input_data1 = wf.data.OBSPath(obs_path = storage) # 得到的路径为:/root/
    input_data2 = wf.data.OBSPath(obs_path = storage.join("directory_path")) # 得到的路径为:/root/directory_path,需要用户自行保证该路径存在
    output_path1 = wf.data.OBSOutputConfig(obs_path = storage.join(directory="directory_path", with_execution_id=False, create_dir=True)) # 系统自动创建目录,得到的路径为:/root/directory_path
    output_path2 = wf.data.OBSOutputConfig(obs_path = storage.join(directory="directory_path", with_execution_id=True, create_dir=True)) # 系统自动创建目录,得到的路径为:/root/执行ID/directory_path

Storage可实现链式调用

使用示例如下:
import modelarts.workflow as wf 
# 构建一个基类Storage对象, 并且假设Storage配置的根目录为"/root/"
storage = wf.data.Storage(name="storage_name", title="title_info", description="description_info", with_execution_id=False, create_dir=Fals)
input_storage = storage.join("directory_path_1") # 得到的路径为:/root/directory_path_1
input_storage_next = input_storage.join("directory_path_2") # 得到的路径为: /root/directory_path_1/directory_path_2

使用案例

统一存储主要用于JobStep中,下面代码示例全部以单训练节点为例。

from modelarts import workflow as wf

# 构建一个InputStorage对象, 并且假设配置的根目录为"/root/input-data/"
input_storage = wf.data.InputStorage(name="input_storage_name", title="title_info", description="description_info") # name字段必填,title, description可选填

# 构建一个OutputStorage对象, 并且假设配置的根目录为"/root/output/"
output_storage = wf.data.OutputStorage(name="output_storage_name", title="title_info", description="description_info") # name字段必填,title, description可选填

# 通过JobStep来定义一个训练节点,输入数据来源为OBS,并将训练结果输出到OBS中
job_step = wf.steps.JobStep(
    name="training_job", # 训练节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复
    title="图像分类训练", # 标题信息,不填默认使用name
    algorithm=wf.AIGalleryAlgorithm(subscription_id="subscription_ID", item_version_id="item_version_ID"), # 训练使用的算法对象,示例中使用AIGallery订阅的算法
    inputs=[
        wf.steps.JobInput(name="data_url_1", data=wf.data.OBSPath(obs_path = input_storage.join("/dataset1/new.manifest"))),  # 获得的路径为:/root/input-data/dataset1/new.manifest
        wf.steps.JobInput(name="data_url_2", data=wf.data.OBSPath(obs_path = input_storage.join("/dataset2/new.manifest")))   # 获得的路径为:/root/input-data/dataset2/new.manifest
    ],
    outputs=wf.steps.JobOutput(name="train_url", obs_config=wf.data.OBSOutputConfig(obs_path=output_storage.join("/model/"))), # 训练输出的路径为:/root/output/执行ID/model/
    spec=wf.steps.JobSpec(
        resource=wf.steps.JobResource(
                 flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="训练资源规格")
        ),
        log_export_path=wf.steps.job_step.LogExportPath(obs_url=output_storage.join("/logs/"))  # 日志输出的路径为:/root/output/执行ID/logs/
    )# 训练资源规格信息
)

# 定义一个只包含job_step的工作流
workflow = wf.Workflow(
    name="test-workflow",
    desc="this is a test workflow",
    steps=[job_step],
    storages=[input_storage, output_storage] # 注意在整个工作流中使用到的Storage对象需要在这里添加
)

开发态配置

调用工作流对象的run方法,在开始运行时展示输入框,等待用户输入,如下所示:

图1 等待用户输入

要求用户输入已存在的路径,否则会报错,路径格式要求为:/桶名称/文件夹路径/。

运行态配置

调用工作流对象的release方法将工作流发布到运行态,在ModelArts管理控制台,单击Workflow找到相应的工作流进行根路径配置,如下所示:

图2 根目录配置