Updated on 2024-04-30 GMT+08:00

Branch Control

Function

You can use parameters or metrics from training output to decide whether to run a phase. This way, you can control the process.

Application Scenarios

This function is used for complex scenarios that involve multiple branches. When each execution starts, the workflow decides which branches to run and which ones to skip based on the relevant configuration information. This way, only some branches are executed. This function has a similar use case as ConditionStep, but it is more powerful. This function applies to the dataset creation phase, labeling phase, dataset import phase, dataset release phase, job phase, model registration phase, and service deployment phase.

Examples

  • Controlling the execution of a single phase
    • Implemented using parameter configurations
      from modelarts import workflow as wf
      
      condition_equal = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.EQ, left=wf.Placeholder(name="is_skip", placeholder_type=wf.PlaceholderType.BOOL), right=True)
      
      # Create an OutputStorage object to centrally manage training output directories.
      storage = wf.data.OutputStorage(name="storage_name", title="title_info",
                                      description="description_info")  # Only name is mandatory.
      
      # Define the input OBS object.
      obs_data = wf.data.OBSPlaceholder(name="obs_placeholder_name", object_type="directory")
      
      # Use JobStep to define a training phase, and use OBS to store the output.
      job_step = wf.steps.JobStep(
          name="training_job", # Name of a training phase. The name contains a maximum of 64 characters, including only letters, digits, underscores (_), and hyphens (-). It must start with a letter and must be unique in a workflow.
          title="Image classification training",  # Title, which defaults to the value of name.
          algorithm=wf.AIGalleryAlgorithm(
              subscription_id="subscription_id",  # Subscription ID of the subscribed algorithm
              item_version_id="item_version_id",  # Algorithm version ID. You can also enter the version number instead.
              parameters=[]
      
          ), # Algorithm used for training. An algorithm subscribed to in AI Gallery is used in this example. If the value of an algorithm hyperparameter does not need to be changed, you do not need to configure the hyperparameter in parameters. Hyperparameter values will be automatically filled.
      
          inputs=wf.steps.JobInput(name="data_url", data=obs_data),
          # JobStep input is configured when the workflow is running. You can also use data=wf.data.OBSPath(obs_path="fake_obs_path") for the data field.
          outputs=wf.steps.JobOutput(name="train_url",
                                     obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path"))),
          # JobStep output
          spec=wf.steps.JobSpec(
              resource=wf.steps.JobResource(
                  flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="Training flavor")
      
              )
          ), # Training flavors
          policy=wf.steps.StepPolicy(
              skip_conditions=[condition_equal] # Determines whether to skip job_step based on the calculation result of skip_conditions.
          )
      )
      
      workflow = wf.Workflow(
          name="new-condition-demo",
          desc="this is a demo workflow",
          steps=[job_step],
          storages=storage
      )

      In this example, job_step has a skip policy that is controlled by a bool parameter. If the placeholder parameter named is_skip is set to True, then job_step is skipped when condition_equal evaluates to True. Otherwise, job_step is run. For more details about the condition object, see Condition Phase.

    • Implemented by obtaining the metric information output by JobStep
      from modelarts import workflow as wf
      
      # Create an OutputStorage object to centrally manage training output directories.
      storage = wf.data.Storage(name="storage_name", title="title_info", with_execution_id=True, create_dir=True, description="description_info")  # The name field is mandatory, and the title and description fields are optional.
      
      # Define the input OBS object.
      obs_data = wf.data.OBSPlaceholder(name="obs_placeholder_name", object_type="directory")
      
      # Use JobStep to define a training phase, and use OBS to store the output.
      job_step = wf.steps.JobStep(
          name="training_job", # Name of a training phase. The name contains a maximum of 64 characters, including only letters, digits, underscores (_), and hyphens (-). It must start with a letter and must be unique in a workflow.
          title="Image classification training",  # Title, which defaults to the value of name.
          algorithm=wf.AIGalleryAlgorithm(
              subscription_id="subscription_id",  # Subscription ID of the subscribed algorithm
              item_version_id="item_version_id",  # Algorithm version ID. You can also enter the version number instead.
              parameters=[]
      
          ), # Algorithm used for training. An algorithm subscribed to in AI Gallery is used in this example. If the value of an algorithm hyperparameter does not need to be changed, you do not need to configure the hyperparameter in parameters. Hyperparameter values will be automatically filled.
          inputs=wf.steps.JobInput(name="data_url", data=obs_data),
          outputs=[
              wf.steps.JobOutput(name="train_url",obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path"))),
              wf.steps.JobOutput(name="metrics", metrics_config=wf.data.MetricsConfig(metric_files=storage.join("directory_path/metrics.json", create_dir=False))) # Metric output path. Metric information is automatically output by the job script based on the specified data format. (In the example, the metric information needs to be output to the metrics.json file in the training output directory.)
          ],
          spec=wf.steps.JobSpec(
              resource=wf.steps.JobResource(
                  flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="Training flavor")
              )
           )  # Training flavors
      )
      
      # Define model name parameters.
      model_name = wf.Placeholder(name="placeholder_name", placeholder_type=wf.PlaceholderType.STR)
      
      # Define a condition object.
      condition_lt = wf.steps.Condition(
          condition_type=wf.steps.ConditionTypeEnum.LT,
          left=wf.steps.MetricInfo(job_step.outputs["metrics"].as_input(), "accuracy"),
          right=0.5
      )
      
      model_step = wf.steps.ModelStep(
          name="model_registration", # Name of the model registration phase. The name contains a maximum of 64 characters, including only letters, digits, underscores (_), and hyphens (-). It must start with a letter and must be unique in a workflow.
          title="Model registration",  # Title
          inputs=wf.steps.ModelInput(name='model_input', data=job_step.outputs["train_url"].as_input()),  # job_step output is used as the input.
          outputs=wf.steps.ModelOutput(name='model_output', model_config=wf.steps.ModelConfig(model_name=model_name, model_type="TensorFlow")),  # ModelStep outputs
          depend_steps=job_step # Preceding job phase
          policy=wf.steps.StepPolicy(skip_conditions=condition_lt) # Determines whether to skip model_step based on the calculation result of skip_conditions.
      )
      
      workflow = wf.Workflow(
          name="new-condition-demo",
          desc="this is a demo workflow",
          steps=[job_step, model_step],
          storages=storage
      )

      In this example, model_step has a skip policy. The model registration depends on whether the accuracy output by job_step meets the preset value. When the accuracy output by job_step is less than the threshold 0.5, the calculation result of condition_lt is True. In this case, model_step skips. Otherwise, model_step runs.

      For details about the format requirements of the metric file generated by job_step, see Branch Control. In the condition phase, only the metric data whose type is float can be used as the input.

      The following is an example of the metrics.json file:
      [
          {
              "key": "loss",
              "title": "loss",
              "type": "float",
              "data": {
                  "value": 1.2
              }
          },
          {
              "key": "accuracy",
              "title": "accuracy",
              "type": "float",
              "data": {
                  "value": 0.8
              }
          }   
      ]
  • Controlling partial execution of multiple branches
    from modelarts import workflow as wf
    
    # Create an OutputStorage object to centrally manage training output directories.
    storage = wf.data.Storage(name="storage_name", title="title_info", with_execution_id=True, create_dir=True, description="description_info")  # The name field is mandatory, and the title and description fields are optional.
    
    # Define the input OBS object.
    obs_data = wf.data.OBSPlaceholder(name="obs_placeholder_name", object_type="directory")
    
    condition_equal_a = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.EQ, left=wf.Placeholder(name="job_step_a_is_skip", placeholder_type=wf.PlaceholderType.BOOL), right=True)
    
    # Use JobStep to define a training phase, and use OBS to store the output.
    job_step_a = wf.steps.JobStep(
        name="training_job_a",  # Name of a training phase. The name contains a maximum of 64 characters, including only letters, digits, underscores (_), and hyphens (-). It must start with a letter and must be unique in a workflow.
        title="Image classification training",  # Title, which defaults to the value of name.
        algorithm=wf.AIGalleryAlgorithm(
            subscription_id="subscription_id",  # Subscription ID of the subscribed algorithm
            item_version_id="item_version_id",  # Algorithm version ID. You can also enter the version number instead.
            parameters=[]
    
        ), # Algorithm used for training. An algorithm subscribed to in AI Gallery is used in this example. If the value of an algorithm hyperparameter does not need to be changed, you do not need to configure the hyperparameter in parameters. Hyperparameter values will be automatically filled.
        inputs=wf.steps.JobInput(name="data_url", data=obs_data),
        outputs=[wf.steps.JobOutput(name="train_url", obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path_a")))],
        spec=wf.steps.JobSpec(
            resource=wf.steps.JobResource(
                flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="Training flavor")
    
            )
         ),  # Training flavors
        policy=wf.steps.StepPolicy(skip_conditions=condition_equal_a)
    )
    
    condition_equal_b = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.EQ, left=wf.Placeholder(name="job_step_b_is_skip", placeholder_type=wf.PlaceholderType.BOOL), right=True)
    
    # Use JobStep to define a training phase, and use OBS to store the output.
    job_step_b = wf.steps.JobStep(
        name="training_job_b",  # Name of a training phase. The name contains a maximum of 64 characters, including only letters, digits, underscores (_), and hyphens (-). It must start with a letter and must be unique in a workflow.
        title="Image classification training",  # Title, which defaults to the value of name.
        algorithm=wf.AIGalleryAlgorithm(
            subscription_id="subscription_id",  # Subscription ID of the subscribed algorithm
            item_version_id="item_version_id",  # Algorithm version ID. You can also enter the version number instead.
            parameters=[]
    
        ), # Algorithm used for training. An algorithm subscribed to in AI Gallery is used in this example. If the value of an algorithm hyperparameter does not need to be changed, you do not need to configure the hyperparameter in parameters. Hyperparameter values will be automatically filled.
        inputs=wf.steps.JobInput(name="data_url", data=obs_data),
        outputs=[wf.steps.JobOutput(name="train_url", obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path_b")))],
        spec=wf.steps.JobSpec(
            resource=wf.steps.JobResource(
                flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="Training flavor")
    
            )
         ),  # Training flavors
        policy=wf.steps.StepPolicy(skip_conditions=condition_equal_b)
    )
    
    # Define model name parameters.
    model_name = wf.Placeholder(name="placeholder_name", placeholder_type=wf.PlaceholderType.STR)
    
    model_step = wf.steps.ModelStep(
        name="model_registration", # Name of the model registration phase. The name contains a maximum of 64 characters, including only letters, digits, underscores (_), and hyphens (-). It must start with a letter and must be unique in a workflow.
        title="Model registration",  # Title
        inputs=wf.steps.ModelInput(name='model_input', data=wf.data.DataConsumptionSelector(data_list=[job_step_a.outputs["train_url"].as_input(), job_step_b.outputs["train_url"].as_input()])),  # Select the output of job_step_a or job_step_b as the input.
        outputs=wf.steps.ModelOutput(name='model_output', model_config=wf.steps.ModelConfig(model_name=model_name, model_type="TensorFlow")),  # ModelStep outputs
        depend_steps=[job_step_a, job_step_b],  # Preceding job phase
    )
    
    workflow = wf.Workflow(
        name="new-condition-demo",
        desc="this is a demo workflow",
        steps=[job_step_a, job_step_b, model_step],
        storages=storage
    )

    In this example, both job_step_a and job_step_b have a skip policy that is controlled by parameters. When the parameter values are different, the execution of model_step can be divided into the following cases (model_step has no skip policy configured, so it follows the default rule).

    job_step_a_is_skip

    job_step_b_is_skip

    Whether to Execute model_step

    True

    True

    No

    False

    Yes

    False

    True

    Yes

    False

    Yes

    Default rule: A phase is automatically skipped if all the phases it depends on are skipped. Otherwise, the phase is run. This logic can apply to any phase.

    Based on the previous example, if you want to override the default rule and make model_step run when job_step_a and job_step_b are skipped, you only need to configure a skip policy in model_step. The skip policy takes precedence over the default rule.