配置节点参数控制分支执行
功能介绍
支持单节点通过参数配置或者获取训练输出的metric指标信息来决定执行是否跳过,同时可以基于此能力完成对执行流程的控制。
应用场景
主要用于存在多分支选择执行的复杂场景,在每次启动执行后需要根据相关配置信息决定哪些分支需要执行,哪些分支需要跳过,达到分支部分执行的目的,与ConditionStep的使用场景类似,但功能更加强大。当前该能力适用于数据集创建节点、数据集标注节点、数据集导入节点、数据集版本发布节点、作业类型节点、模型注册节点以及服务部署节点。
控制单节点的执行
- 通过参数配置实现
from modelarts import workflow as wf condition_equal = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.EQ, left=wf.Placeholder(name="is_skip", placeholder_type=wf.PlaceholderType.BOOL), right=True) # 构建一个OutputStorage对象,对训练输出目录做统一管理 storage = wf.data.OutputStorage(name="storage_name", title="title_info", description="description_info") # name字段必填,title, description可选填 # 定义输入的OBS对象 obs_data = wf.data.OBSPlaceholder(name="obs_placeholder_name", object_type="directory") # 通过JobStep来定义一个训练节点,并将训练结果输出到OBS job_step = wf.steps.JobStep( name="training_job", # 训练节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复 title="图像分类训练", # 标题信息,不填默认使用name algorithm=wf.AIGalleryAlgorithm( subscription_id="subscription_id", # 算法订阅ID item_version_id="item_version_id", # 算法订阅版本ID,也可直接填写版本号 parameters=[] ), # 训练使用的算法对象,示例中使用AIGallery订阅的算法;部分算法超参的值如果无需修改,则在parameters字段中可以不填写,系统自动填充相关超参值 inputs=wf.steps.JobInput(name="data_url", data=obs_data), # JobStep的输入在运行时配置;data字段也可使用data=wf.data.OBSPath(obs_path="fake_obs_path")表示 outputs=wf.steps.JobOutput(name="train_url", obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path"))), # JobStep的输出 spec=wf.steps.JobSpec( resource=wf.steps.JobResource( flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="训练资源规格") ) ), # 训练资源规格信息 policy=wf.steps.StepPolicy( skip_conditions=[condition_equal] # 通过skip_conditions中的计算结果决定job_step是否跳过 ) ) workflow = wf.Workflow( name="new-condition-demo", desc="this is a demo workflow", steps=[job_step], storages=storage )
案例中job_step配置了相关的跳过策略,并且通过一个bool类型的参数进行控制。当name为is_skip的Placeholder参数配置为True时,condition_equal的计算结果为True,此时job_step会被置为跳过,反之job_step正常执行,其中Condition对象详情可参考构建条件节点控制分支执行。
- 通过获取JobStep输出的相关metric指标信息实现
from modelarts import workflow as wf # 构建一个OutputStorage对象,对训练输出目录做统一管理 storage = wf.data.Storage(name="storage_name", title="title_info", with_execution_id=True, create_dir=True, description="description_info") # name字段必填,title, description可选填 # 定义输入的OBS对象 obs_data = wf.data.OBSPlaceholder(name="obs_placeholder_name", object_type="directory") # 通过JobStep来定义一个训练节点,并将训练结果输出到OBS job_step = wf.steps.JobStep( name="training_job", # 训练节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复 title="图像分类训练", # 标题信息,不填默认使用name algorithm=wf.AIGalleryAlgorithm( subscription_id="subscription_id", # 算法订阅ID item_version_id="item_version_id", # 算法订阅版本ID,也可直接填写版本号 parameters=[] ), # 训练使用的算法对象,示例中使用AIGallery订阅的算法;部分算法超参的值如果无需修改,则在parameters字段中可以不填写,系统自动填充相关超参值 inputs=wf.steps.JobInput(name="data_url", data=obs_data), outputs=[ wf.steps.JobOutput(name="train_url",obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path"))), wf.steps.JobOutput(name="metrics", metrics_config=wf.data.MetricsConfig(metric_files=storage.join("directory_path/metrics.json", create_dir=False))) # 指定metric的输出路径,相关指标信息由作业脚本代码根据指定的数据格式自行输出(示例中需要将metric信息输出到训练输出目录下的metrics.json文件中) ], spec=wf.steps.JobSpec( resource=wf.steps.JobResource( flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="训练资源规格") ) ) # 训练资源规格信息 ) # 定义模型名称参数 model_name = wf.Placeholder(name="placeholder_name", placeholder_type=wf.PlaceholderType.STR) # 定义条件对象 condition_lt = wf.steps.Condition( condition_type=wf.steps.ConditionTypeEnum.LT, left=wf.steps.MetricInfo(job_step.outputs["metrics"].as_input(), "accuracy"), right=0.5 ) model_step = wf.steps.ModelStep( name="model_registration", # 模型注册节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复 title="模型注册", # 标题信息 inputs=wf.steps.ModelInput(name='model_input', data=job_step.outputs["train_url"].as_input()), # job_step的输出作为输入 outputs=wf.steps.ModelOutput(name='model_output', model_config=wf.steps.ModelConfig(model_name=model_name, model_type="TensorFlow")), # ModelStep的输出 depend_steps=[job_step], # 依赖的作业类型节点对象 policy=wf.steps.StepPolicy(skip_conditions=condition_lt) # 通过skip_conditions中的计算结果决定model_step是否跳过 ) workflow = wf.Workflow( name="new-condition-demo", desc="this is a demo workflow", steps=[job_step, model_step], storages=storage )
案例中model_step配置了相关的跳过策略,并且通过获取job_step输出的accuracy指标信息与预置的值进行比较,决定是否需要进行模型注册。当job_step输出的accuracy指标数据小于阈值0.5时,condition_lt的计算结果为True,此时model_step会被置为跳过,反之model_step正常执行。
job_step输出的metric文件格式要求可参考创建Workflow训练作业节点部分,并且在Condition中只支持使用type为float类型的指标数据作为输入。
此案例中metrics.json的内容示例如下:[ { "key": "loss", "title": "loss", "type": "float", "data": { "value": 1.2 } }, { "key": "accuracy", "title": "accuracy", "type": "float", "data": { "value": 0.8 } } ]
控制多分支的部分执行
from modelarts import workflow as wf # 构建一个OutputStorage对象,对训练输出目录做统一管理 storage = wf.data.Storage(name="storage_name", title="title_info", with_execution_id=True, create_dir=True, description="description_info") # name字段必填,title, description可选填 # 定义输入的OBS对象 obs_data = wf.data.OBSPlaceholder(name="obs_placeholder_name", object_type="directory") condition_equal_a = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.EQ, left=wf.Placeholder(name="job_step_a_is_skip", placeholder_type=wf.PlaceholderType.BOOL), right=True) # 通过JobStep来定义一个训练节点,并将训练结果输出到OBS job_step_a = wf.steps.JobStep( name="training_job_a", # 训练节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复 title="图像分类训练", # 标题信息,不填默认使用name algorithm=wf.AIGalleryAlgorithm( subscription_id="subscription_id", # 算法订阅ID item_version_id="item_version_id", # 算法订阅版本ID,也可直接填写版本号 parameters=[] ), # 训练使用的算法对象,示例中使用AIGallery订阅的算法;部分算法超参的值如果无需修改,则在parameters字段中可以不填写,系统自动填充相关超参值 inputs=wf.steps.JobInput(name="data_url", data=obs_data), outputs=[wf.steps.JobOutput(name="train_url", obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path_a")))], spec=wf.steps.JobSpec( resource=wf.steps.JobResource( flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="训练资源规格") ) ), # 训练资源规格信息 policy=wf.steps.StepPolicy(skip_conditions=condition_equal_a) ) condition_equal_b = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.EQ, left=wf.Placeholder(name="job_step_b_is_skip", placeholder_type=wf.PlaceholderType.BOOL), right=True) # 通过JobStep来定义一个训练节点,并将训练结果输出到OBS job_step_b = wf.steps.JobStep( name="training_job_b", # 训练节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复 title="图像分类训练", # 标题信息,不填默认使用name algorithm=wf.AIGalleryAlgorithm( subscription_id="subscription_id", # 算法订阅ID item_version_id="item_version_id", # 算法订阅版本ID,也可直接填写版本号 parameters=[] ), # 训练使用的算法对象,示例中使用AIGallery订阅的算法;部分算法超参的值如果无需修改,则在parameters字段中可以不填写,系统自动填充相关超参值 inputs=wf.steps.JobInput(name="data_url", data=obs_data), outputs=[wf.steps.JobOutput(name="train_url", obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path_b")))], spec=wf.steps.JobSpec( resource=wf.steps.JobResource( flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="训练资源规格") ) ), # 训练资源规格信息 policy=wf.steps.StepPolicy(skip_conditions=condition_equal_b) ) # 定义模型名称参数 model_name = wf.Placeholder(name="placeholder_name", placeholder_type=wf.PlaceholderType.STR) model_step = wf.steps.ModelStep( name="model_registration", # 模型注册节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复 title="模型注册", # 标题信息 inputs=wf.steps.ModelInput(name='model_input', data=wf.data.DataConsumptionSelector(data_list=[job_step_a.outputs["train_url"].as_input(), job_step_b.outputs["train_url"].as_input()])), # 选择job_step_a或者job_step_b的输出作为输入 outputs=wf.steps.ModelOutput(name='model_output', model_config=wf.steps.ModelConfig(model_name=model_name, model_type="TensorFlow")), # ModelStep的输出 depend_steps=[job_step_a, job_step_b], # 依赖的作业类型节点对象 ) workflow = wf.Workflow( name="new-condition-demo", desc="this is a demo workflow", steps=[job_step_a, job_step_b, model_step], storages=storage )
案例中job_step_a和job_step_b均配置了跳过策略,并且都使用参数进行控制。当参数值配置不同时,model_step的执行可以分为以下几种情况(model_step没有配置跳过策略,因此会遵循默认规则):
job_step_a_is_skip参数值 |
job_step_b_is_skip参数值 |
model_step是否执行 |
---|---|---|
True |
True |
跳过 |
False |
执行 |
|
False |
True |
执行 |
False |
执行 |
默认规则:当某个节点依赖的所有节点状态均为跳过时,该节点自动跳过,否则正常执行,此判断逻辑可扩展至任意节点。
在上述案例的基础上,如果需要打破默认规则,在job_step_a以及job_step_b跳过时,model_step也允许执行,则只需要在model_step中也配置跳过策略即可(跳过策略的优先级高于默认规则)。