Updated on 2024-04-30 GMT+08:00

Examples

Simple Examples

  • Implemented using parameter configurations
    import modelarts.workflow as wf
    
    left_value = wf.Placeholder(name="left_value", placeholder_type=wf.PlaceholderType.BOOL, default=True)
    
    # Condition object
    condition = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.EQ, left=left_value, right=True) # Condition object, including the type, left value, and right value.
    
    # Condition phase
    condition_step = wf.steps.ConditionStep(
        name="condition_step_test", # Name of the condition phase. The name contains a maximum of 64 characters, including only letters, digits, underscores (_), and hyphens (-). It must start with a letter and must be unique in a workflow.
        conditions=condition, # Condition objects. The relationship between the conditions is &&.
        if_then_steps="job_step_1", # If conditions evaluate to true, job_step_1 is ready for execution, and job_step_2 is skipped.
        else_then_steps="job_step_2" # If conditions evaluate to false, job_step_2 is ready for execution, and job_step_1 is skipped.
    )
    
    # This phase is used only as an example. You need to supplement other fields as required.
    job_step_1 = wf.steps.JobStep(
        name="job_step_1",
        depend_steps=condition_step
    )
    
    # This phase is used only as an example. You need to supplement other fields as required.
    model_step_1 = wf.steps.ModelStep(
        name="model_step_1",
        depend_steps=job_step_1
    )
    
    # This phase is used only as an example. You need to supplement other fields as required.
    job_step_2 = wf.steps.JobStep(
        name="job_step_2",
        depend_steps=condition_step
    )
    
    # This phase is used only as an example. You need to supplement other fields as required.
    model_step_2 = wf.steps.ModelStep(
        name="model_step_2",
        depend_steps=job_step_2
    )
    
    workflow = wf.Workflow(
        name="condition-demo",
        desc="this is a demo workflow",
        steps=[condition_step, job_step_1, job_step_2, model_step_1, model_step_2]
    )
    

    Scenario description: job_step_1 and job_step_2 indicate two training phases that depend on condition_step. condition_step parameters determine the subsequent phase execution.

    Execution analysis:
    • If the default value of left_value is True, the calculation result of the condition logical expression is True. Then, job_step_1 is executed, job_step_2 is skipped, and all phases contained in the branches that use job_step_2 as the unique root node are skipped. That is, model_step_2 is skipped. Therefore, condition_step, job_step_1, and model_step_1 are executed.
    • If left_value is set to False, the calculation result of the condition logical expression is False. Then, job_step_2 is executed, job_step_1 is skipped, and all phases contained in the branches that use job_step_1 as the unique root node are skipped. That is, model_step_1 is skipped, and condition_step, job_step_2, and model_step_2 are executed.
  • Implemented by obtaining the metric information output by JobStep
    from modelarts import workflow as wf
    
    # Create an OutputStorage object to centrally manage training output directories.
    storage = wf.data.Storage(name="storage_name", title="title_info", with_execution_id=True, create_dir=True, description="description_info")  # The name field is mandatory, and the title and description fields are optional.
    
    # Define the input OBS object.
    obs_data = wf.data.OBSPlaceholder(name="obs_placeholder_name", object_type="directory")
    
    # Use JobStep to define a training phase, and use OBS to store the output.
    job_step = wf.steps.JobStep(
        name="training_job", # Name of a training phase. The name contains a maximum of 64 characters, including only letters, digits, underscores (_), and hyphens (-). It must start with a letter and must be unique in a workflow.
        title="Image classification training",  # Title, which defaults to the value of name.
        algorithm=wf.AIGalleryAlgorithm(
            subscription_id="subscription_id",  # Subscription ID of the subscribed algorithm
            item_version_id="item_version_id",  # Algorithm version ID. You can also enter the version number instead.
            parameters=[]
    
        ), # Algorithm used for training. An algorithm subscribed to in AI Gallery is used in this example. If the value of an algorithm hyperparameter does not need to be changed, you do not need to configure the hyperparameter in parameters. Hyperparameter values will be automatically filled.
        inputs=wf.steps.JobInput(name="data_url", data=obs_data),
        outputs=[
            wf.steps.JobOutput(name="train_url",obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path"))),
            wf.steps.JobOutput(name="metrics", metrics_config=wf.data.MetricsConfig(metric_files=storage.join("directory_path/metrics.json", create_dir=False))) # Metric output path. Metric information is automatically output by the job script based on the specified data format. (In the example, the metric information needs to be output to the metrics.json file in the training output directory.)
        ],
        spec=wf.steps.JobSpec(
            resource=wf.steps.JobResource(
                flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="Training flavor")
            )
         )  # Training flavors
    )
    
    # Define a condition object.
    condition_lt = wf.steps.Condition(
        condition_type=wf.steps.ConditionTypeEnum.LT,
        left=wf.steps.MetricInfo(job_step.outputs["metrics"].as_input(), "accuracy"),
        right=0.5
    )
    
    condition_step = wf.steps.ConditionStep(
        name="condition_step_test", # Name of the condition phase. The name contains a maximum of 64 characters, including only letters, digits, underscores (_), and hyphens (-). It must start with a letter and must be unique in a workflow.
        conditions=condition_lt, # Condition objects. The relationship between the conditions is &&.
        if_then_steps="training_job_retrain", # If conditions evaluate to true, training_job_retrain is ready for execution, and model_registration is skipped.
        else_then_steps="model_registration", # If conditions evaluate to false, model_registration is ready for execution, and training_job_retrain is skipped.
        depend_steps=job_step
    )
    
    # Use JobStep to define a training phase, and use OBS to store the output.
    job_step_retrain = wf.steps.JobStep(
        name="training_job_retrain",  # Name of a training phase. The name contains a maximum of 64 characters, including only letters, digits, underscores (_), and hyphens (-). It must start with a letter and must be unique in a workflow.
        title="Image classification retraining",  # Title, which defaults to the value of name.
        algorithm=wf.AIGalleryAlgorithm(
            subscription_id="subscription_id",  # Subscription ID of the subscribed algorithm
            item_version_id="item_version_id",  # Algorithm version ID. You can also enter the version number instead.
            parameters=[]
    
        ), # Algorithm used for training. An algorithm subscribed to in AI Gallery is used in this example. If the value of an algorithm hyperparameter does not need to be changed, you do not need to configure the hyperparameter in parameters. Hyperparameter values will be automatically filled.
        inputs=wf.steps.JobInput(name="data_url", data=obs_data),
        outputs=[
            wf.steps.JobOutput(name="train_url",obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path_retrain"))),
            wf.steps.JobOutput(name="metrics", metrics_config=wf.data.MetricsConfig(metric_files=storage.join("directory_path_retrain/metrics.json", create_dir=False))) # Metric output path. Metric information is automatically output by the job script based on the specified data format. (In the example, the metric information needs to be output to the metrics.json file in the training output directory.)
        ],
        spec=wf.steps.JobSpec(
            resource=wf.steps.JobResource(
                flavor=wf.Placeholder(name="train_flavor_retrain", placeholder_type=wf.PlaceholderType.JSON, description="Training flavor")
            )
         ),  # Training flavors
        depend_steps=condition_step
    )
    
    # Define model name parameters.
    model_name = wf.Placeholder(name="placeholder_name", placeholder_type=wf.PlaceholderType.STR)
    
    model_step = wf.steps.ModelStep(
        name="model_registration", # Name of the model registration phase. The name contains a maximum of 64 characters, including only letters, digits, underscores (_), and hyphens (-). It must start with a letter and must be unique in a workflow.
        title="Model registration",  # Title
        inputs=wf.steps.ModelInput(name='model_input', data=job_step.outputs["train_url"].as_input()),  # job_step output is used as the input.
        outputs=wf.steps.ModelOutput(name='model_output', model_config=wf.steps.ModelConfig(model_name=model_name, model_type="TensorFlow")),  # ModelStep outputs
        depend_steps=condition_step,
    )
    
    workflow = wf.Workflow(
        name="condition-demo",
        desc="this is a demo workflow",
        steps=[job_step, condition_step, job_step_retrain, model_step],
        storages=storage
    )

    In this example, ConditionStep obtains the accuracy output by job_step and compares it with the preset value to determine whether to retrain or register the model. When the accuracy output by job_step is less than the threshold 0.5, the calculation result of condition_lt is True. In this case, job_step_retrain runs and model_step skips. Otherwise, job_step_retrain skips and model_step runs.

    For details about the format requirements of the metric file generated by job_step, see Job Phase. In the condition phase, only the metric data whose type is float can be used as the input.

    The following is an example of the metrics.json file:

    [
        {
            "key": "loss",
            "title": "loss",
            "type": "float",
            "data": {
                "value": 1.2
            }
        },
        {
            "key": "accuracy",
            "title": "accuracy",
            "type": "float",
            "data": {
                "value": 0.8
            }
        }   
    ]

Advanced Example

import modelarts.workflow as wf

left_value = wf.Placeholder(name="left_value", placeholder_type=wf.PlaceholderType.BOOL, default=True)
condition1 = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.EQ, left=left_value, right=True)

internal_condition_1 = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.GT, left=10, right=9)
internal_condition_2 = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.LT, left=10, right=9)

# The result of condition2 is internal_condition_1 || internal_condition_2.
condition2 = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.OR, left=internal_condition_1, right=internal_condition_2)

condition_step = wf.steps.ConditionStep(
    name="condition_step_test", # Name of the condition phase. The name contains a maximum of 64 characters, including only letters, digits, underscores (_), and hyphens (-). It must start with a letter and must be unique in a workflow.
    conditions=[condition1, condition2], # Condition objects. The relationship between the conditions is &&.
    if_then_steps=["job_step_1"], # If conditions evaluate to true, job_step_1 is ready for execution, and job_step_2 is skipped.
    else_then_steps=["job_step_2"] # If conditions evaluate to false, job_step_2 is ready for execution, and job_step_1 is skipped.
)

# This phase is used only as an example. You need to supplement other fields as required.
job_step_1 = wf.steps.JobStep(
    name="job_step_1",
    depend_steps=condition_step
)

# This phase is used only as an example. You need to supplement other fields as required.
job_step_2 = wf.steps.JobStep(
    name="job_step_2",
    depend_steps=condition_step
)

workflow = wf.Workflow(
    name="condition-demo",
    desc="this is a demo workflow",
    steps=[condition_step, job_step_1, job_step_2],
)

ConditionStep supports nested condition phases. You can flexibly design tit based on different scenarios.

The condition phase can only support two branches, which is very limiting. You can use the new branch function to replace the ConditionStep capability without creating new phases. For details, see Branch Control.