在Workflow中使用大数据能力(DLI/MRS)
功能介绍
该节点通过调用MRS服务,提供大数据集群计算能力。主要用于数据批量处理、模型训练等场景。
应用场景
需要使用MRS Spark组件进行大量数据的计算时,可以根据已有数据使用该节点进行训练计算。
使用案例
在华为云MRS服务下查看自己账号下可用的MRS集群,如果没有,则需要创建,当前需要集群有Spark组件,安装时,注意勾选上。
您可以使用MrsStep来创建作业类型节点。定义MrsStep示例如下。
- 指定启动脚本与集群
from modelarts import workflow as wf # 通过MrsStep来定义一个MrsJobStep节点, algorithm = wf.steps.MrsJobAlgorithm( boot_file="obs://spark-sql/wordcount.py", #执行脚本OBS路径 parameters=[wf.AlgorithmParameters(name="run_args", value="--master,yarn-cluster")] ) inputs = wf.steps.MrsJobInput(name="mrs_input", data=wf.data.OBSPath(obs_path="/spark-sql/mrs_input/")) #输入数据的OBS路径 outputs = wf.steps.MrsJobOutput(name="mrs_output", obs_config=wf.data.OBSOutputConfig(obs_path="/spark-sql/mrs_output")) #输出的OBS路径 step = wf.steps.MrsJobStep( name="mrs_test", #step名称,可自定义 mrs_algorithm=algorithm, inputs=inputs, outputs=outputs, cluster_id="cluster_id_xxx" #MRS集群ID )
- 使用选取集群和启动脚本的形式
from modelarts import workflow as wf # 通过MrsJobStep来定义一个节点 run_arg_description = "程序执行参数, 作为程序运行环境参数, 默认为(--master,yarn-cluster)" app_arg_description = "程序执行参数, 作为启动脚本的入参, 例如(--param_a=3,--param_b=4)默认为空,非必填" mrs_outputs_description = "数据输出路径, 可以通过从参数列表中获取--train_url参数获取" cluster_id_description = "cluster id of MapReduce Service" algorithm = wf.steps.MrsJobAlgorithm( boot_file=wf.Placeholder(name="boot_file", description="程序启动脚本", placeholder_type=wf.PlaceholderType.STR, placeholder_format="obs"), parameters=[wf.AlgorithmParameters(name="run_args", value=wf.Placeholder(name="run_args", description=run_arg_description, default="--master,yarn-cluster", placeholder_type=wf.PlaceholderType.STR), ), wf.AlgorithmParameters(name="app_args", value=wf.Placeholder(name="app_args", description=app_arg_description, default="", placeholder_type=wf.PlaceholderType.STR) ) ] ) inputs = wf.steps.MrsJobInput(name="data_url", data=wf.data.OBSPlaceholder(name="data_url",object_type="directory")) outputs = wf.steps.MrsJobOutput(name="train_url", obs_config=wf.data.OBSOutputConfig(obs_path=wf.Placeholder(name="train_url", placeholder_type=wf.PlaceholderType.STR, placeholder_format="obs",description=mrs_outputs_description))) mrs_job_step = wf.steps.MrsJobStep( name="mrs_job_test", mrs_algorithm=algorithm, inputs=inputs, outputs=outputs, cluster_id=wf.Placeholder(name="cluster_id", placeholder_type=wf.PlaceholderType.STR, description=cluster_id_description, placeholder_format="cluster") )
- 在控制台上如何使用MRS节点
Workflow发布后,在Workflow配置页,配置节点的数据输入,输出,启动脚本,集群ID等参数。