更新时间:2024-08-16 GMT+08:00
分享

配置多分支节点数据

功能介绍

仅用于存在多分支执行的场景,在编写构建工作流节点时,节点的数据输入来源暂不确定,可能是多个依赖节点中任意一个节点的输出。只有当依赖节点全部执行完成后,才会根据实际执行情况自动获取有效输出作为输入。

使用案例

from modelarts import workflow as wf

condition_equal = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.EQ, left=wf.Placeholder(name="is_true", placeholder_type=wf.PlaceholderType.BOOL), right=True)
condition_step = wf.steps.ConditionStep(
    name="condition_step",
    conditions=[condition_equal],
    if_then_steps=["training_job_1"],
    else_then_steps=["training_job_2"],
)

# 构建一个OutputStorage对象,对训练输出目录做统一管理
storage = wf.data.OutputStorage(name="storage_name", title="title_info",
                                description="description_info")  # name字段必填,title, description可选填

# 定义输入的OBS对象
obs_data = wf.data.OBSPlaceholder(name="obs_placeholder_name", object_type="directory")

# 通过JobStep来定义一个训练节点,并将训练结果输出到OBS
job_step_1 = wf.steps.JobStep(
    name="training_job_1",  # 训练节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复
    title="图像分类训练",  # 标题信息,不填默认使用name
    algorithm=wf.AIGalleryAlgorithm(
        subscription_id="subscription_id",  # 算法订阅ID
        item_version_id="item_version_id",  # 算法订阅版本ID,也可直接填写版本号
        parameters=[]

    ),  # 训练使用的算法对象,示例中使用AIGallery订阅的算法;部分算法超参的值如果无需修改,则在parameters字段中可以不填写,系统自动填充相关超参值

    inputs=wf.steps.JobInput(name="data_url", data=obs_data),
    # JobStep的输入在运行时配置;data字段也可使用data=wf.data.OBSPath(obs_path="fake_obs_path")表示
    outputs=wf.steps.JobOutput(name="train_url",
                               obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path"))),
    # JobStep的输出
    spec=wf.steps.JobSpec(
        resource=wf.steps.JobResource(
            flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="训练资源规格")

        )
    ),  # 训练资源规格信息
    depend_steps=[condition_step]
)

# 通过JobStep来定义一个训练节点,并将训练结果输出到OBS
job_step_2 = wf.steps.JobStep(
    name="training_job_2",  # 训练节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复
    title="图像分类训练",  # 标题信息,不填默认使用name
    algorithm=wf.AIGalleryAlgorithm(
        subscription_id="subscription_id",  # 算法订阅ID
        item_version_id="item_version_id",  # 算法订阅版本ID,也可直接填写版本号
        parameters=[]

    ),  # 训练使用的算法对象,示例中使用AIGallery订阅的算法;部分算法超参的值如果无需修改,则在parameters字段中可以不填写,系统自动填充相关超参值

    inputs=wf.steps.JobInput(name="data_url", data=obs_data),
    # JobStep的输入在运行时配置;data字段也可使用data=wf.data.OBSPath(obs_path="fake_obs_path")表示
    outputs=wf.steps.JobOutput(name="train_url",
                               obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path"))),
    # JobStep的输出
    spec=wf.steps.JobSpec(
        resource=wf.steps.JobResource(
            flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="训练资源规格")

        )
    ),  # 训练资源规格信息
    depend_steps=[condition_step]
)

# 定义模型名称参数
model_name = wf.Placeholder(name="placeholder_name", placeholder_type=wf.PlaceholderType.STR)

model_step = wf.steps.ModelStep(
    name="model_registration",  # 模型注册节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复
    title="模型注册",  # 标题信息
    inputs=wf.steps.ModelInput(name='model_input', data=wf.data.DataConsumptionSelector(data_list=[job_step_1.outputs["train_url"].as_input(), job_step_2.outputs["train_url"].as_input()])),  # 选择job_step_1或者job_step_2的输出作为输入
    outputs=wf.steps.ModelOutput(name='model_output', model_config=wf.steps.ModelConfig(model_name=model_name, model_type="TensorFlow")), # ModelStep的输出
    depend_steps=[job_step_1, job_step_2] # 依赖的作业类型节点对象
)# job_step是wf.steps.JobStep的 实例对象,train_url是wf.steps.JobOutput的name字段值


workflow = wf.Workflow(name="data-select-demo",
                       desc="this is a test workflow",
                       steps=[condition_step, job_step_1, job_step_2, model_step],
                       storages=storage
                       )

案例中的Workflow存在两个并行分支,并且同时只有一条分支会执行,由condition_step的相关配置决定。model_step的输入来源为job_step_1或者job_step_2的输出,当job_step_1节点所在分支执行,job_step_2节点所在分支跳过时,model_step节点执行时自动获取job_step_1的输出作为输入,反之自动获取job_step_2的输出作为输入。

相关文档