预置框架启动文件的启动流程说明
ModelArts Standard训练服务为用户预置了多种主流AI框架,并针对不同框架的特点进行了专门的适配优化。在使用这些预置框架进行模型训练时,用户需要根据所选框架的特点,相应地调整训练启动命令,以确保训练任务能够顺利执行。
ModelArts提供4种预置框架,便于用户在创建训练作业时根据需求直接调整使用。4种框架如下:
场景分类 |
预置框架名称 |
解释说明 |
优势与使用建议 |
||
---|---|---|---|---|---|
NPU场景 |
是适配加速芯片Ascend的一组AI框架+运行环境+启动方式的集合。 有三种启动方式。 |
用户使用Ascend NPU时选择。 可提升模型训练的效率。 |
RTF:通用场景。 |
||
torchrun:使用PyTorch+Ascend Extension for PyTorch框架。 |
|||||
msrun:安装MindSpore后即可使用。不依赖第三方库以及配置文件,具有容灾恢复功能,安全性较好。 |
|||||
GPU场景 |
基于PyTorch-GPU的预置框架。 |
灵活性和易用性上占优,适合研究和快速开发。 |
|||
基于TensorFlow-GPU的预置框架。 |
生产部署和性能优化上更好,适合企业应用。 |
||||
使用Horovod/MPI/MindSpore-GPU预置框架来运行的启动文件,平台自动以mpirun命令启动。 |
此方式依赖开源库OpenMPI,推荐有OpenMPI使用经验的用户使用此种启动方式。 |
本章节详细介绍基于不同的预置框架启动原理,在创建训练作业时,用户如何修改训练的启动文件。
Ascend-Powered-Engine框架启动
在ModelArts创建训练作业界面选择AI框架时,有一个AI框架是“Ascend-Powered-Engine”,这一选项并不是传统意义上的AI框架(如PyTorch、TensorFlow),也不是并行执行框架(如MPI),而是为适配昇腾(Ascend)加速芯片而设计的一组AI框架、运行环境和启动方式的集合。
由于Snt9系列Ascend加速卡运行在ARM架构的CPU环境中,因此其对应的Docker镜像也均为ARM架构镜像。与GPU场景下镜像中安装的CUDA计算库(由英伟达推出)不同,“Ascend-Powered-Engine”的镜像中安装的是CANN计算库(华为针对AI场景推出的异构计算架构),以实现与昇腾驱动的适配。
在提交训练作业后,ModelArts Standard平台会自动运行训练作业的启动文件。另外,使用“Ascend-Powered-Engine”框架时,其单机启动命令和分布式启动命令在形式上并无区别。
Ascend-Powered-Engine框架支持3种启动方式来启动“启动文件”,默认是基于“RANK_TABLE_FILE”启动,但用户也可以通过配置环境变量“MA_RUN_METHOD”选择其他方式启动。MA_RUN_METHOD环境变量支持“torchrun”和“msrun”两种启动方式,为用户提供灵活的选择。
- 启动方式一:使用RTF文件启动训练作业
在没有配置环境变量“MA_RUN_METHOD”时,ModelArts Standard平台默认使用rank table file(RTF)文件启动训练作业的“启动文件”。
每个训练作业的启动文件的运行次数取决于任务卡数,即在训练作业运行时,有N个任务卡数训练作业内就会运行N次启动文件。例如,单机1卡,则worker-0任务的启动文件会被运行1次;单机8卡,则worker-0任务的启动文件会被运行8次。因此需要避免在启动文件中进行端口监听。
启动文件会被自动设置如下环境变量:
- RANK_TABLE_FILE:RTF文件路径。
- ASCEND_DEVICE_ID:逻辑device_id,例如单卡训练,该值始终为 0。
- RANK_ID:可以理解为训练作业级的device逻辑(顺序)编号。
- RANK_SIZE:根据RTF中device的数目设置该值,例如“4 * Snt9b”,则该值即为4。
当需要启动文件仍然在逻辑上仅运行1次时,则可以在启动文件中判断“ASCEND_DEVICE_ID”的值,当值为“0”则执行逻辑,当值为非0则直接退出。
当需要启用ranktable动态路由进行训练网络加速时,则可以添加环境变量“ROUTE_PLAN=true”。详细使用指导请参见训练作业动态路由加速。
Ascend-Powered-Engine框架对应的代码示例“mindspore-verification.py”,请参见训练mindspore-verification.py文件。
- 启动方式二:使用torchrun命令启动训练作业
当环境变量“MA_RUN_METHOD=torchrun”时,表示ModelArts Standard平台使用torchrun命令启动训练作业的“启动文件”。
要求PyTorch版本大于等于1.11.0。
- 单机时,ModelArts Standard平台使用如下命令启动训练作业的“启动文件”。
torchrun --standalone --nnodes=${MA_NUM_HOSTS} --nproc_per_node=${MA_NUM_GPUS} ${MA_EXTRA_TORCHRUN_PARAMS} "启动文件" {arg1} {arg2} ...
- 多机时,ModelArts Standard平台使用如下命令启动训练作业的“启动文件”。
torchrun --nnodes=${MA_NUM_HOSTS} --nproc_per_node=${MA_NUM_GPUS} --node_rank=${VC_TASK_INDEX} --master_addr={master_addr} --master_port=${MA_TORCHRUN_MASTER_PORT} --rdzv_id={ma_job_name} --rdzv_backend=static ${MA_EXTRA_TORCHRUN_PARAMS} "启动文件" {arg1} {arg2} ...
参数说明如下:
表2 使用torchrun命令启动训练作业参数说明 参数名
参数解释
standalone
标识为单任务实例作业。
nnodes
任务实例个数。
nproc_per_node
每个任务实例启动的主进程数,设置为任务分配的NPU数相同。
node_rank
任务rank,用于多任务分布式训练。
master_addr
主任务(rank 0)的地址,设置为任务worker-0的通信域名。
master_port
在主任务(rank 0)上,用于分布式训练期间通信的端口。默认设置为18888端口。当遇到master_port冲突问题时,可通过设置MA_TORCHRUN_MASTER_PORT环境变量值修改端口配置。
rdzv_id
Rendezvous标识,设置为带有训练作业ID的值。
rdzv_backend
Rendezvous后端,固定设置为static,即不使用Rendezvous,而是使用master_addr和master_port配置。
- 另外,可通过设置MA_EXTRA_TORCHRUN_PARAMS环境变量值,以增加额外的torchrun命令参数,或是覆盖预设的torchrun命令参数。例如配置torchrun命令中rdzv_conf参数的训练作业API环境变量的部分示例如下:
"environments": { "MA_RUN_METHOD": "torchrun", "MA_EXTRA_TORCHRUN_PARAMS": "--rdzv_conf=timeout=7200" }
如果在torchrun初始化分布式一致性协商阶段出现“RuntimeError:Socket Timeout”错误时,可以查看常见问题1,进一步排查问题。
- 单机时,ModelArts Standard平台使用如下命令启动训练作业的“启动文件”。
- 启动方式三:使用msrun命令启动训练作业
当环境变量“MA_RUN_METHOD=msrun”时,表示ModelArts Standard平台使用msrun命令启动训练作业的“启动文件”。
要求MindSpore版本大于等于2.3.0。
该方案支持动态组网和基于rank table file文件组网两种方式。当配置了环境变量MS_RANKTABLE_ENABLE="True",则msrun会读取rank table file文件内容进行组网。否则默认使用动态组网。
msrun使用如下命令启动训练作业的“启动文件”。
msrun --worker_num=${msrun_worker_num} --local_worker_num=${MA_NUM_GPUS} --master_addr=${msrun_master_addr} --node_rank=${VC_TASK_INDEX} --master_port=${msrun_master_port} --log_dir=${msrun_log_dir} --join=True --cluster_time_out=${MSRUN_CLUSTER_TIME_OUT} --rank_table_file=${msrun_rank_table_file} "启动文件" {arg1} {arg2} ...
参数说明如下:
表3 msrun命令启动训练作业参数说明 参数名
参数解释
worker_num
所有进程个数。因为一个卡起一个进程,所以也表示使用总卡数。
local_worker_num
当前节点进程个数,即当前节点使用的卡数。
master_addr
msrun组网调度进程所在节点的IP地址,单机场景无需配置。
master_port
msrun组网调度进程的端口。
node_rank
msrun组网调度进程的端口。
log_dir
msrun组网和各个进程的日志输出地址。
join
训练进程拉起后,msrun进程是否仍存在,默认配置为“True”,等待所有进程退出后再退出。
cluster_time_out
集群组网超时时间,默认是“600s”,可通过环境变量“MSRUN_CLUSTER_TIME_OUT”控制。
rank_table_file
rank table file文件地址,如果配置了环境变量“MS_RANKTABLE_ENABLE="True"”,启动时会增加该参数。
PyTorch-GPU框架启动
单机多卡场景下平台会为启动文件额外拼接 --init_method "tcp://<ip>:<port>" 参数。
多机多卡场景下平台会为启动文件额外拼接 --init_method "tcp://<ip>:<port>" --rank <rank_id> --world_size <node_num>参数。
启动文件需要解析上述参数。
PyTorch-GPU框架的代码示例,请参见示例:创建DDP分布式训练(PyTorch+GPU)中的方式一。
TensorFlow-GPU框架启动
单机场景下(即选择的实例数为1),ModelArts只会在一个节点上启动一个训练容器,该训练容器独享节点规格的可使用资源。
多机场景下(即选择的实例数大于1),ModelArts会优先在相同节点上启动一个parameter server(以下简称ps)和一个worker,平台会自动一比一分配ps与worker任务。例如,双机场景会分配2个ps和2个worker任务,并为启动文件额外注入如下参数。
--task_index <VC_TASK_INDEX> --ps_hosts <TF_PS_HOSTS> --worker_hosts <TF_WORKER_HOSTS> --job_name <MA_TASK_NAME>
启动文件需要解析如下参数。
参数名 |
参数解释 |
---|---|
VC_TASK_INDEX |
task序号,如0、1、2。 |
TF_PS_HOSTS |
ps节点地址数组,如“[xx-ps-0.xx:TCP_PORT,xx-ps-1.xx:TCP_PORT]”,TCP_PORT是一个在5000~10000的随机端口。 |
TF_WORKER_HOSTS |
worker节点地址数组,如“[xx-worker-0.xx:TCP_PORT,xx-worker-1.xx:TCP_PORT]”,TCP_PORT是一个在5000~10000的随机端口。 |
MA_TASK_NAME |
任务名称,取值是ps或worker。 |
Horovod/MPI/MindSpore-GPU框架启动
使用Horovod/MPI/MindSpore-GPU预置框架来运行的启动文件,平台自动以mpirun命令启动训练作业。使用ModelArts Standard训练相应预置引擎,用户仅需关注启动文件(即训练脚本)的编写;mpirun命令和训练作业集群的构建都由平台自动完成。平台不会为启动文件额外拼接参数。
“pytorch_synthetic_benchmark.py”文件示例如下:
import argparse import torch.backends.cudnn as cudnn import torch.nn.functional as F import torch.optim as optim import torch.utils.data.distributed from torchvision import models import horovod.torch as hvd import timeit import numpy as np # Benchmark settings parser = argparse.ArgumentParser(description='PyTorch Synthetic Benchmark', formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--fp16-allreduce', action='store_true', default=False, help='use fp16 compression during allreduce') parser.add_argument('--model', type=str, default='resnet50', help='model to benchmark') parser.add_argument('--batch-size', type=int, default=32, help='input batch size') parser.add_argument('--num-warmup-batches', type=int, default=10, help='number of warm-up batches that don\'t count towards benchmark') parser.add_argument('--num-batches-per-iter', type=int, default=10, help='number of batches per benchmark iteration') parser.add_argument('--num-iters', type=int, default=10, help='number of benchmark iterations') parser.add_argument('--no-cuda', action='store_true', default=False, help='disables CUDA training') parser.add_argument('--use-adasum', action='store_true', default=False, help='use adasum algorithm to do reduction') args = parser.parse_args() args.cuda = not args.no_cuda and torch.cuda.is_available() hvd.init() if args.cuda: # Horovod: pin GPU to local rank. torch.cuda.set_device(hvd.local_rank()) cudnn.benchmark = True # Set up standard model. model = getattr(models, args.model)() # By default, Adasum doesn't need scaling up learning rate. lr_scaler = hvd.size() if not args.use_adasum else 1 if args.cuda: # Move model to GPU. model.cuda() # If using GPU Adasum allreduce, scale learning rate by local_size. if args.use_adasum and hvd.nccl_built(): lr_scaler = hvd.local_size() optimizer = optim.SGD(model.parameters(), lr=0.01 * lr_scaler) # Horovod: (optional) compression algorithm. compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none # Horovod: wrap optimizer with DistributedOptimizer. optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters(), compression=compression, op=hvd.Adasum if args.use_adasum else hvd.Average) # Horovod: broadcast parameters & optimizer state. hvd.broadcast_parameters(model.state_dict(), root_rank=0) hvd.broadcast_optimizer_state(optimizer, root_rank=0) # Set up fixed fake data data = torch.randn(args.batch_size, 3, 224, 224) target = torch.LongTensor(args.batch_size).random_() % 1000 if args.cuda: data, target = data.cuda(), target.cuda() def benchmark_step(): optimizer.zero_grad() output = model(data) loss = F.cross_entropy(output, target) loss.backward() optimizer.step() def log(s, nl=True): if hvd.rank() != 0: return print(s, end='\n' if nl else '') log('Model: %s' % args.model) log('Batch size: %d' % args.batch_size) device = 'GPU' if args.cuda else 'CPU' log('Number of %ss: %d' % (device, hvd.size())) # Warm-up log('Running warmup...') timeit.timeit(benchmark_step, number=args.num_warmup_batches) # Benchmark log('Running benchmark...') img_secs = [] for x in range(args.num_iters): time = timeit.timeit(benchmark_step, number=args.num_batches_per_iter) img_sec = args.batch_size * args.num_batches_per_iter / time log('Iter #%d: %.1f img/sec per %s' % (x, img_sec, device)) img_secs.append(img_sec) # Results img_sec_mean = np.mean(img_secs) img_sec_conf = 1.96 * np.std(img_secs) log('Img/sec per %s: %.1f +-%.1f' % (device, img_sec_mean, img_sec_conf)) log('Total img/sec on %d %s(s): %.1f +-%.1f' % (hvd.size(), device, hvd.size() * img_sec_mean, hvd.size() * img_sec_conf))
run_mpi.sh文件内容如下:
#!/bin/bash MY_HOME=/home/ma-user MY_SSHD_PORT=${MY_SSHD_PORT:-"36666"} MY_MPI_BTL_TCP_IF=${MY_MPI_BTL_TCP_IF:-"eth0,bond0"} MY_TASK_INDEX=${MA_TASK_INDEX:-${VC_TASK_INDEX:-${VK_TASK_INDEX}}} MY_MPI_SLOTS=${MY_MPI_SLOTS:-"${MA_NUM_GPUS}"} MY_MPI_TUNE_FILE="${MY_HOME}/env_for_user_process" if [ -z ${MY_MPI_SLOTS} ]; then echo "[run_mpi] MY_MPI_SLOTS is empty, set it be 1" MY_MPI_SLOTS="1" fi printf "MY_HOME: ${MY_HOME}\nMY_SSHD_PORT: ${MY_SSHD_PORT}\nMY_MPI_BTL_TCP_IF: ${MY_MPI_BTL_TCP_IF}\nMY_TASK_INDEX: ${MY_TASK_INDEX}\nMY_MPI_SLOTS: ${MY_MPI_SLOTS}\n" env | grep -E '^MA_|SHARED_|^S3_|^PATH|^VC_WORKER_|^SCC|^CRED' | grep -v '=$' > ${MY_MPI_TUNE_FILE} # add -x to each line sed -i 's/^/-x /' ${MY_MPI_TUNE_FILE} sed -i "s|{{MY_SSHD_PORT}}|${MY_SSHD_PORT}|g" ${MY_HOME}/etc/ssh/sshd_config # start sshd service bash -c "$(which sshd) -f ${MY_HOME}/etc/ssh/sshd_config" # confirm the sshd is up netstat -anp | grep LIS | grep ${MY_SSHD_PORT} if [ $MY_TASK_INDEX -eq 0 ]; then # generate the hostfile of mpi for ((i=0; i<$MA_NUM_HOSTS; i++)) do eval hostname=${MA_VJ_NAME}-${MA_TASK_NAME}-${i}.${MA_VJ_NAME} echo "[run_mpi] hostname: ${hostname}" ip="" while [ -z "$ip" ]; do ip=$(ping -c 1 ${hostname} | grep "PING" | sed -E 's/PING .* .([0-9.]+). .*/\1/g') sleep 1 done echo "[run_mpi] resolved ip: ${ip}" # test the sshd is up while : do if [ cat < /dev/null >/dev/tcp/${ip}/${MY_SSHD_PORT} ]; then break fi sleep 1 done echo "[run_mpi] the sshd of ip ${ip} is up" echo "${ip} slots=$MY_MPI_SLOTS" >> ${MY_HOME}/hostfile done printf "[run_mpi] hostfile:\n`cat ${MY_HOME}/hostfile`\n" fi RET_CODE=0 if [ $MY_TASK_INDEX -eq 0 ]; then echo "[run_mpi] start exec command time: "$(date +"%Y-%m-%d-%H:%M:%S") np=$(( ${MA_NUM_HOSTS} * ${MY_MPI_SLOTS} )) echo "[run_mpi] command: mpirun -np ${np} -hostfile ${MY_HOME}/hostfile -mca plm_rsh_args \"-p ${MY_SSHD_PORT}\" -tune ${MY_MPI_TUNE_FILE} ... $@" # execute mpirun at worker-0 # mpirun mpirun \ -np ${np} \ -hostfile ${MY_HOME}/hostfile \ -mca plm_rsh_args "-p ${MY_SSHD_PORT}" \ -tune ${MY_MPI_TUNE_FILE} \ -bind-to none -map-by slot \ -x NCCL_DEBUG=INFO -x NCCL_SOCKET_IFNAME=${MY_MPI_BTL_TCP_IF} -x NCCL_SOCKET_FAMILY=AF_INET \ -x HOROVOD_MPI_THREADS_DISABLE=1 \ -x LD_LIBRARY_PATH \ -mca pml ob1 -mca btl ^openib -mca plm_rsh_no_tree_spawn true \ "$@" RET_CODE=$? if [ $RET_CODE -ne 0 ]; then echo "[run_mpi] exec command failed, exited with $RET_CODE" else echo "[run_mpi] exec command successfully, exited with $RET_CODE" fi # stop 1...N worker by killing the sleep proc sed -i '1d' ${MY_HOME}/hostfile if [ `cat ${MY_HOME}/hostfile | wc -l` -ne 0 ]; then echo "[run_mpi] stop 1 to (N - 1) worker by killing the sleep proc" sed -i 's/${MY_MPI_SLOTS}/1/g' ${MY_HOME}/hostfile printf "[run_mpi] hostfile:\n`cat ${MY_HOME}/hostfile`\n" mpirun \ --hostfile ${MY_HOME}/hostfile \ --mca btl_tcp_if_include ${MY_MPI_BTL_TCP_IF} \ --mca plm_rsh_args "-p ${MY_SSHD_PORT}" \ -x PATH -x LD_LIBRARY_PATH \ pkill sleep \ > /dev/null 2>&1 fi echo "[run_mpi] exit time: "$(date +"%Y-%m-%d-%H:%M:%S") else echo "[run_mpi] the training log is in worker-0" sleep 365d echo "[run_mpi] exit time: "$(date +"%Y-%m-%d-%H:%M:%S") fi exit $RET_CODE
常见问题
- 在torchrun初始化分布式一致性协商阶段出现“RuntimeError:Socket Timeout”错误时,如何解决?
如果在torchrun初始化分布式一致性协商阶段出现RuntimeError:Socket Timeout错误时,可以通过增加如下环境变量再次创建训练作业以查看torchrun初始化阶段的详细信息,进一步排查问题。
- LOGLEVEL=INFO
- TORCH_CPP_LOG_LEVEL=INFO
- TORCH_DISTRIBUTED_DEBUG=DETAIL
出现RuntimeError: Socket Timeout错误,一般是因为不同任务执行torchrun命令的时机差距过大导致的。torchrun命令执行时机差距过大,又多是因为在torchrun命令被执行之前任务还有一些初始化动作,例如下载训练数据集、CKPT等。这些初始化动作执行耗时差距过大会直接导致出现Socket Timeout错误。所以遇到Socket Timeout问题时首先需要排查的是各个任务执行torchrun的时间点差距是否在合理范围内,如果时间点差距过大,需要优化执行torchrun命令之前的初始化动作,使其时间点差距在合理范围内。