更新时间:2024-10-24 GMT+08:00

构建条件节点控制分支执行

功能介绍

主要用于执行流程的条件分支选择,可以简单的进行数值比较来控制执行流程,也可以根据节点输出的metric相关信息决定后续的执行流程。主要应用场景如下:

可以用于需要根据不同的输入值来决定后续执行流程的场景。例如:需要根据训练节点输出的精度信息来决定是重新训练还是进行模型的注册操作时可以使用该节点来实现流程的控制。

属性总览

您可以使用ConditionStep来构建条件节点,ConditionStep结构如下:

表1 ConditionStep

属性

描述

是否必填

数据类型

name

条件节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复

str

conditions

条件列表,列表中的多个Condition执行“逻辑与”操作

Condition或者Condition的列表

if_then_steps

条件表达式计算结果为True时,执行的step列表

str或者str列表

else_then_steps

条件表达式计算结果为False时,执行的step列表

str或者str列表

title

title信息,主要用于前端节点的名称展示

str

description

条件节点的描述信息

str

depend_steps

依赖的节点列表

Step或者Step的列表

表2 Condition

属性

描述

是否必填

数据类型

condition_type

条件类型,支持"=="、">"、">="、"in"、"<"、"<="、"!="、"or"操作符

ConditionTypeEnum

left

条件表达式的左值

int、float、str、bool、Placeholder、Sequence、Condition、MetricInfo

right

条件表达式的右值

int、float、str、bool、Placeholder、Sequence、Condition、MetricInfo

表3 MetricInfo

属性

描述

是否必填

数据类型

input_data

metric文件的存储对象,当前仅支持JobStep节点的输出

JobStep的输出

json_key

需要获取的metric信息对应的key值

str

结构内容详解:

  • Condition对象(由三部分组成:条件类型,左值以及右值)
    • 条件类型使用ConditionTypeEnum来获取,支持"=="、">"、">="、"in"、"<"、"<="、"!="、"or"操作符,具体映射关系如下表所示。

      枚举类型

      操作符

      ConditionTypeEnum.EQ

      ==

      ConditionTypeEnum.GT

      >

      ConditionTypeEnum.GTE

      >=

      ConditionTypeEnum.IN

      in

      ConditionTypeEnum.LT

      <

      ConditionTypeEnum.LTE

      <=

      ConditionTypeEnum.NOT

      !=

      ConditionTypeEnum.OR

      or

    • 左右值支持的类型有:int、float、str、bool、Placeholder、Sequence、Condition、MetricInfo。
    • 一个ConditionStep支持多个Condition对象,使用list表示,多个Condition之间进行&&操作。
  • if_then_steps和else_then_steps
    • if_then_steps表示的是当Condition比较的结果为true时允许执行的节点列表,存储的是节点名称;此时else_then_steps中的step跳过不执行。
    • else_then_step表示的是当Condition比较的结果为false时允许执行的节点列表,存储的是节点名称;此时if_then_steps中的step跳过不执行。

使用案例

根据需求参考简单示例或进阶示例。

简单示例

  • 通过参数配置实现
    import modelarts.workflow as wf
    
    left_value = wf.Placeholder(name="left_value", placeholder_type=wf.PlaceholderType.BOOL, default=True)
    
    # 条件对象
    condition = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.EQ, left=left_value, right=True) # 条件对象,包含类型以及左右值
    
    # 条件节点
    condition_step = wf.steps.ConditionStep(
        name="condition_step_test", # 条件节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复
        conditions=condition, # 条件对象,允许多个条件,条件之间的关系为&&
        if_then_steps="job_step_1", # 当condition结果为true时,名称为job_step_1的节点允许执行,名称为job_step_2的节点跳过不执行
        else_then_steps="job_step_2" # 当condition结果为false时,名称为job_step_2的节点允许执行,名称为job_step_1的节点跳过不执行
    )
    
    # 该节点仅作为示例使用,其他字段需自行补充
    job_step_1 = wf.steps.JobStep(
        name="job_step_1",
        depend_steps=condition_step
    )
    
    # 该节点仅作为示例使用,其他字段需自行补充
    model_step_1 = wf.steps.ModelStep(
        name="model_step_1",
        depend_steps=job_step_1
    )
    
    # 该节点仅作为示例使用,其他字段需自行补充
    job_step_2 = wf.steps.JobStep(
        name="job_step_2",
        depend_steps=condition_step
    )
    
    # 该节点仅作为示例使用,其他字段需自行补充
    model_step_2 = wf.steps.ModelStep(
        name="model_step_2",
        depend_steps=job_step_2
    )
    
    workflow = wf.Workflow(
        name="condition-demo",
        desc="this is a demo workflow",
        steps=[condition_step, job_step_1, job_step_2, model_step_1, model_step_2]
    )
    

    场景说明:job_step_1和job_step_2表示两个训练节点,并且均直接依赖于condition_step。condition_step通过参数配置决定后继节点的执行行为。

    执行情况分析:
    • 参数left_value默认值为True,则condition逻辑表达式计算结果为True:job_step_1执行,job_step_2跳过,并且以job_step_2为唯一根节点的分支所包含的所有节点也将跳过,即model_step_2会跳过,因此最终执行的节点有condition_step、job_step_1、model_step_1。
    • 如果设置left_value的值为Fasle,则condition逻辑表达式计算结果为False:job_step_2执行,job_step_1跳过,并且以job_step_1为唯一根节点的分支所包含的所有节点也将跳过,即model_step_1会跳过,因此最终执行的节点有condition_step、job_step_2、model_step_2。
  • 通过获取JobStep输出的相关metric指标信息实现
    from modelarts import workflow as wf
    
    # 构建一个OutputStorage对象,对训练输出目录做统一管理
    storage = wf.data.Storage(name="storage_name", title="title_info", with_execution_id=True, create_dir=True, description="description_info")  # name字段必填,title, description可选填
    
    # 定义输入的OBS对象
    obs_data = wf.data.OBSPlaceholder(name="obs_placeholder_name", object_type="directory")
    
    # 通过JobStep来定义一个训练节点,并将训练结果输出到OBS
    job_step = wf.steps.JobStep(
        name="training_job",  # 训练节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复
        title="图像分类训练",  # 标题信息,不填默认使用name
        algorithm=wf.AIGalleryAlgorithm(
            subscription_id="subscription_id",  # 算法订阅ID
            item_version_id="item_version_id",  # 算法订阅版本ID,也可直接填写版本号
            parameters=[]
    
        ),  # 训练使用的算法对象,示例中使用AIGallery订阅的算法;部分算法超参的值如果无需修改,则在parameters字段中可以不填写,系统自动填充相关超参值
        inputs=wf.steps.JobInput(name="data_url", data=obs_data),
        outputs=[
            wf.steps.JobOutput(name="train_url",obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path"))),
            wf.steps.JobOutput(name="metrics", metrics_config=wf.data.MetricsConfig(metric_files=storage.join("directory_path/metrics.json", create_dir=False))) # 指定metric的输出路径,相关指标信息由作业脚本代码根据指定的数据格式自行输出(示例中需要将metric信息输出到训练输出目录下的metrics.json文件中)
        ],
        spec=wf.steps.JobSpec(
            resource=wf.steps.JobResource(
                flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="训练资源规格")
            )
        )  # 训练资源规格信息
    )
    
    # 定义条件对象
    condition_lt = wf.steps.Condition(
        condition_type=wf.steps.ConditionTypeEnum.LT,
        left=wf.steps.MetricInfo(job_step.outputs["metrics"].as_input(), "accuracy"),
        right=0.5
    )
    
    condition_step = wf.steps.ConditionStep(
        name="condition_step_test", # 条件节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复
        conditions=condition_lt, # 条件对象,允许多个条件,条件之间的关系为&&
        if_then_steps="training_job_retrain", # 当condition结果为true时,名称为training_job_retrain的节点允许执行,名称为model_registration的节点跳过不执行
        else_then_steps="model_registration", # 当condition结果为false时,名称为model_registration的节点允许执行,名称为training_job_retrain的节点跳过不执行
        depend_steps=job_step
    )
    
    # 通过JobStep来定义一个训练节点,并将训练结果输出到OBS
    job_step_retrain = wf.steps.JobStep(
        name="training_job_retrain",  # 训练节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复
        title="图像分类重新训练训练",  # 标题信息,不填默认使用name
        algorithm=wf.AIGalleryAlgorithm(
            subscription_id="subscription_id",  # 算法订阅ID
            item_version_id="item_version_id",  # 算法订阅版本ID,也可直接填写版本号
            parameters=[]
    
        ),  # 训练使用的算法对象,示例中使用AIGallery订阅的算法;部分算法超参的值如果无需修改,则在parameters字段中可以不填写,系统自动填充相关超参值
        inputs=wf.steps.JobInput(name="data_url", data=obs_data),
        outputs=[
            wf.steps.JobOutput(name="train_url",obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path_retrain"))),
            wf.steps.JobOutput(name="metrics", metrics_config=wf.data.MetricsConfig(metric_files=storage.join("directory_path_retrain/metrics.json", create_dir=False))) # 指定metric的输出路径,相关指标信息由作业脚本代码根据指定的数据格式自行输出(示例中需要将metric信息输出到训练输出目录下的metrics.json文件中)
        ],
        spec=wf.steps.JobSpec(
            resource=wf.steps.JobResource(
                flavor=wf.Placeholder(name="train_flavor_retrain", placeholder_type=wf.PlaceholderType.JSON, description="训练资源规格")
            )
        ),  # 训练资源规格信息
        depend_steps=condition_step
    )
    
    # 定义模型名称参数
    model_name = wf.Placeholder(name="placeholder_name", placeholder_type=wf.PlaceholderType.STR)
    
    model_step = wf.steps.ModelStep(
        name="model_registration",  # 模型注册节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复
        title="模型注册",  # 标题信息
        inputs=wf.steps.ModelInput(name='model_input', data=job_step.outputs["train_url"].as_input()),  # job_step的输出作为输入
        outputs=wf.steps.ModelOutput(name='model_output', model_config=wf.steps.ModelConfig(model_name=model_name, model_type="TensorFlow")),  # ModelStep的输出
        depend_steps=condition_step,
    )
    
    workflow = wf.Workflow(
        name="condition-demo",
        desc="this is a demo workflow",
        steps=[job_step, condition_step, job_step_retrain, model_step],
        storages=storage
    )

    案例中ConditionStep节点通过获取job_step输出的accuracy指标信息与预置的值进行比较,决定重新训练还是模型注册。当job_step输出的accuracy指标数据小于阈值0.5时,condition_lt的计算结果为True,此时job_step_retrain运行,model_step跳过;反之job_step_retrain跳过,model_step执行。

    job_step输出的metric文件格式要求可参考创建Workflow训练作业节点部分,并且在Condition中只支持使用type为float类型的指标数据作为输入。

    此案例中metrics.json的内容示例如下:

    [
        {
            "key": "loss",
            "title": "loss",
            "type": "float",
            "data": {
                "value": 1.2
            }
        },
        {
            "key": "accuracy",
            "title": "accuracy",
            "type": "float",
            "data": {
                "value": 0.8
            }
        }   
    ]

进阶示例

import modelarts.workflow as wf

left_value = wf.Placeholder(name="left_value", placeholder_type=wf.PlaceholderType.BOOL, default=True)
condition1 = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.EQ, left=left_value, right=True)

internal_condition_1 = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.GT, left=10, right=9)
internal_condition_2 = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.LT, left=10, right=9)

# condition2的结果为internal_condition_1 || internal_condition_2
condition2 = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.OR, left=internal_condition_1, right=internal_condition_2)

condition_step = wf.steps.ConditionStep(
    name="condition_step_test", # 条件节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复
    conditions=[condition1, condition2], # 条件对象,允许多个条件,条件之间的关系为&&
    if_then_steps=["job_step_1"], # 当condition结果为true时,名称为job_step_1的节点允许执行,名称为job_step_2的节点跳过不执行
    else_then_steps=["job_step_2"] # 当condition结果为false时,名称为job_step_2的节点允许执行,名称为job_step_1的节点跳过不执行
)

# 该节点仅作为示例使用,其他字段需自行补充
job_step_1 = wf.steps.JobStep(
    name="job_step_1",
    depend_steps=condition_step
)

# 该节点仅作为示例使用,其他字段需自行补充
job_step_2 = wf.steps.JobStep(
    name="job_step_2",
    depend_steps=condition_step
)

workflow = wf.Workflow(
    name="condition-demo",
    desc="this is a demo workflow",
    steps=[condition_step, job_step_1, job_step_2],
)

ConditionStep支持多条件节点的嵌套使用,用户可以基于不同的场景灵活设计。

条件节点只支持双分支的选择执行,局限性较大,推荐您使用新的分支功能,可以在不添加新节点的情况下完全覆盖ConditionStep的能力,详情请参见配置节点参数控制分支执行章节。