Updated on 2024-04-30 GMT+08:00

Using Big Data Capabilities (DLI/MRS) in a Workflow

Function

This phase calls MapReduce Service (MRS) to run big data cluster computing jobs, such as batch data processing and model training.

Application Scenarios

You can use MRS Spark for big data computing in this phase.

Examples

On the MRS console, check which MRS clusters are available in your account. If none is available, create one with Spark selected.

You can use MrsJobStep to create a job phase. The following are examples of defining an MrsJobStep:

  • Specifying a boot script and cluster
    from modelarts import workflow as wf
    # Define an MRS job phase using MrsJobStep.
    
    algorithm = wf.steps.MrsJobAlgorithm(
        boot_file="obs://spark-sql/wordcount.py",    # OBS path to the boot script
        parameters=[wf.AlgorithmParameters(name="run_args", value="--master,yarn-cluster")]
    )        
    # OBS path to the input data
    inputs = wf.steps.MrsJobInput(
        name="mrs_input",
        data=wf.data.OBSPath(obs_path="/spark-sql/mrs_input/")
    )
    # OBS path to the output data
    outputs = wf.steps.MrsJobOutput(
        name="mrs_output",
        obs_config=wf.data.OBSOutputConfig(obs_path="/spark-sql/mrs_output")
    )
    step = wf.steps.MrsJobStep(
        name="mrs_test",             # Step name, which can be customized
        mrs_algorithm=algorithm,
        inputs=inputs,
        outputs=outputs,
        cluster_id="cluster_id_xxx"  # MRS cluster ID
    )
  • Configuring the cluster and boot script through placeholders
    from modelarts import workflow as wf
    # Define a phase using MrsJobStep.
    run_arg_description = "Program execution parameter, which is used as the program running environment parameter. The default value is (--master,yarn-cluster)."
    app_arg_description = "Program execution parameter, which is used as the input parameter of the boot script, for example, (--param_a=3,--param_b=4). This parameter is optional and left blank by default."
    mrs_outputs_description = "Data output path, which can be obtained from train_url in the parameter list."
    cluster_id_description = "ID of the MRS (MapReduce Service) cluster"
    
    algorithm = wf.steps.MrsJobAlgorithm(
        boot_file=wf.Placeholder(name="boot_file",
                                 description="Program boot script",
                                 placeholder_type=wf.PlaceholderType.STR,
                                 placeholder_format="obs"),
        parameters=[
            wf.AlgorithmParameters(name="run_args",
                                   value=wf.Placeholder(name="run_args",
                                                        description=run_arg_description,
                                                        default="--master,yarn-cluster",
                                                        placeholder_type=wf.PlaceholderType.STR)),
            wf.AlgorithmParameters(name="app_args",
                                   value=wf.Placeholder(name="app_args",
                                                        description=app_arg_description,
                                                        default="",
                                                        placeholder_type=wf.PlaceholderType.STR))
        ]
    )
    
    inputs = wf.steps.MrsJobInput(
        name="data_url",
        data=wf.data.OBSPlaceholder(name="data_url", object_type="directory")
    )

    outputs = wf.steps.MrsJobOutput(
        name="train_url",
        obs_config=wf.data.OBSOutputConfig(
            obs_path=wf.Placeholder(name="train_url",
                                    placeholder_type=wf.PlaceholderType.STR,
                                    placeholder_format="obs",
                                    description=mrs_outputs_description)
        )
    )
    
    mrs_job_step = wf.steps.MrsJobStep(
        name="mrs_job_test",
        mrs_algorithm=algorithm,
        inputs=inputs,
        outputs=outputs,
        cluster_id=wf.Placeholder(name="cluster_id",
                                  placeholder_type=wf.PlaceholderType.STR,
                                  description=cluster_id_description,
                                  placeholder_format="cluster")
    )
    
  • Using an MRS phase on the console

    After a workflow is published, configure phase parameters such as the data input, data output, boot script, and cluster ID on the workflow configuration page.
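
The run_args and app_args values in the examples above are comma-separated strings such as (--param_a=3,--param_b=4). The following is a minimal sketch of how such a string might be built on the workflow side and parsed inside a boot script. It uses only the Python standard library and is not part of the ModelArts SDK. The helper names to_arg_string and parse_app_args, and the options param_a and param_b, are hypothetical. The sketch also assumes that each comma-separated item reaches the boot script as a separate command-line token; verify this against your MRS cluster before relying on it.

```python
import argparse


def to_arg_string(options):
    """Join a dict of options into the comma-separated form used in the examples."""
    return ",".join(f"--{key}={value}" for key, value in options.items())


def parse_app_args(arg_string):
    """Parse a comma-separated app_args string the way a boot script might.

    Assumption: each comma-separated item is delivered to the script as
    one command-line token. The option names below are hypothetical.
    """
    parser = argparse.ArgumentParser(description="Hypothetical boot script options")
    parser.add_argument("--param_a", type=int, default=0)
    parser.add_argument("--param_b", type=int, default=0)
    # Drop empty items so that a blank app_args value falls back to the defaults.
    tokens = [token for token in arg_string.split(",") if token]
    return parser.parse_args(tokens)


# Build the string that could be supplied as the app_args value.
app_args = to_arg_string({"param_a": 3, "param_b": 4})
# Simulate what the boot script would see after the string is split.
parsed = parse_app_args(app_args)
print(app_args)
print(parsed.param_a, parsed.param_b)
```

Building the string from a dictionary keeps option names and values in one place, and filtering out empty items lets the same parser handle the blank default of app_args.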