文档首页> AI开发平台ModelArts> Workflow> 如何开发Workflow> 高阶能力> 在Workflow中使用大数据能力(DLI/MRS)
更新时间:2024-04-30 GMT+08:00

在Workflow中使用大数据能力(DLI/MRS)

功能介绍

该节点通过调用MRS服务,提供大数据集群计算能力。主要用于数据批量处理、模型训练等场景。

应用场景

需要使用MRS Spark组件进行大量数据的计算时,可以根据已有数据使用该节点进行训练计算。

使用案例

在华为云MRS服务下查看自己账号下可用的MRS集群,如果没有,则需要创建,当前需要集群有Spark组件,安装时,注意勾选上。

您可以使用MrsStep来创建作业类型节点。定义MrsStep示例如下。

  • 指定启动脚本与集群
    from modelarts import workflow as wf
    # 通过MrsStep来定义一个MrsJobStep节点,
    
    algorithm = wf.steps.MrsJobAlgorithm(
        boot_file="obs://spark-sql/wordcount.py",    #执行脚本OBS路径
        parameters=[wf.AlgorithmParameters(name="run_args", value="--master,yarn-cluster")]
    )        
    inputs = wf.steps.MrsJobInput(name="mrs_input", data=wf.data.OBSPath(obs_path="/spark-sql/mrs_input/"))  #输入数据的OBS路径
    outputs = wf.steps.MrsJobOutput(name="mrs_output", obs_config=wf.data.OBSOutputConfig(obs_path="/spark-sql/mrs_output")) #输出的OBS路径
    step = wf.steps.MrsJobStep(
              name="mrs_test",           #step名称,可自定义
              mrs_algorithm=algorithm,    
              inputs=inputs,    
              outputs=outputs,    
              cluster_id="cluster_id_xxx"   #MRS集群ID
    )
  • 使用选取集群和启动脚本的形式
    from modelarts import workflow as wf
    # 通过MrsJobStep来定义一个节点
    run_arg_description = "程序执行参数, 作为程序运行环境参数, 默认为(--master,yarn-cluster)"
    app_arg_description = "程序执行参数, 作为启动脚本的入参, 例如(--param_a=3,--param_b=4)默认为空,非必填"
    mrs_outputs_description = "数据输出路径, 可以通过从参数列表中获取--train_url参数获取"
    cluster_id_description = "cluster id of MapReduce Service"
    
    algorithm = wf.steps.MrsJobAlgorithm(
        boot_file=wf.Placeholder(name="boot_file",
                                 description="程序启动脚本",
                                 placeholder_type=wf.PlaceholderType.STR,
                                 placeholder_format="obs"),
        parameters=[wf.AlgorithmParameters(name="run_args",
                                           value=wf.Placeholder(name="run_args",
                                                                description=run_arg_description,
                                                                default="--master,yarn-cluster",
                                                                placeholder_type=wf.PlaceholderType.STR),
                                           ),
                   wf.AlgorithmParameters(name="app_args",
                                          value=wf.Placeholder(name="app_args",
                                                               description=app_arg_description,
                                                                default="",
                                                                placeholder_type=wf.PlaceholderType.STR)
                                          )
                    ]
        )
    
    inputs = wf.steps.MrsJobInput(name="data_url",
                                 data=wf.data.OBSPlaceholder(name="data_url",object_type="directory"))
    
    outputs = wf.steps.MrsJobOutput(name="train_url", obs_config=wf.data.OBSOutputConfig(obs_path=wf.Placeholder(name="train_url", placeholder_type=wf.PlaceholderType.STR, placeholder_format="obs",description=mrs_outputs_description)))
    
    mrs_job_step = wf.steps.MrsJobStep(
        name="mrs_job_test",
        mrs_algorithm=algorithm,
        inputs=inputs,
        outputs=outputs,
        cluster_id=wf.Placeholder(name="cluster_id", placeholder_type=wf.PlaceholderType.STR, description=cluster_id_description, placeholder_format="cluster")
    )
    
  • 在控制台上如何使用MRS节点

    Workflow发布后,在Workflow配置页,配置节点的数据输入,输出,启动脚本,集群ID等参数。