更新时间:2024-10-24 GMT+08:00

预置框架启动文件的启动流程说明

ModelArts Standard训练服务预置了多种AI框架,并对不同的框架提供了针对性适配,用户在使用这些预置框架进行模型训练时,训练的启动命令也需要做相应适配。

本章节详细介绍基于不同的预置框架创建训练作业时,如何修改训练的启动文件。

Ascend-Powered-Engine框架启动原理

在ModelArts算法建界面选择AI引擎时,能够看到一个叫做Ascend-Powered-Engine的AI引擎,它与其他AI引擎相比有些特别。它既不是一个AI框架(如:PyTorch、TensorFlow)也不是一个并行执行框架(如:MPI),而是适配加速芯片Ascend编译的一组AI框架+运行环境+启动方式的集合。

  • 由于几乎所有的Ascend加速卡都跑在ARM规格的机器上,因此上层docker镜像也都是ARM镜像。
  • 针对GPU场景的镜像中安装了对应版本的CUDA(由英伟达推出的统一计算架构)驱动,而Ascend-Powered-Engine引擎的镜像中都安装了与底层硬件版本适配的CANN(华为针对AI场景推出的异构计算架构)驱动。

提交训练作业后,ModelArts Standard平台会自动运行训练任务的启动文件;启动文件的运行次数取决于训练卡数。

单机作业时,每个任务内运行N次启动文件;N为任务的卡数;例如单机1卡,则worker-0任务的启动文件会被运行1次;单机8卡,worker-0任务的启动文件会被运行8次。因此需要避免在启动文件中进行端口监听。

启动文件会被设置如下环境变量:

  • RANK_TABLE_FILE:rank table file (RTF) 文件路径
  • ASCEND_DEVICE_ID:逻辑device_id,例如单卡训练,该值始终为 0
  • RANK_ID:可以理解为训练作业级的device逻辑(顺序)编号
  • RANK_SIZE:根据RTF中device的数目设置该值,例如4 * snt9b,该值即为4。

当需要启动文件仍然在逻辑上仅运行1次时,则可以在启动文件中判断“ASCEND_DEVICE_ID”的值,当值为“0”则执行逻辑,当值为非0则直接退出。

Ascend-Powered-Engine框架对应的代码示例mindspore-verification.py,请参见训练脚本mindspore-verification.py文件

Ascend-Powered-Engine框架单机启动命令和分布式启动命令无区别。

PyTorch-GPU框架启动原理

单机多卡场景下平台会为启动文件额外拼接 --init_method "tcp://<ip>:<port>" 参数。

多机多卡场景下平台会为启动文件额外拼接 --init_method "tcp://<ip>:<port>" --rank <rank_id> --world_size <node_num>参数。

启动文件需要解析上述参数。

PyTorch-GPU框架的代码示例,请参见示例:创建DDP分布式训练(PyTorch+GPU)(方式一)

TensorFlow-GPU框架启动原理

单机场景下(即选择的节点数为1),ModelArts只会在一个节点上启动一个训练容器,该训练容器独享节点规格的可使用资源。

多机场景下(即选择的节点数大于1),ModelArts会优先在相同节点上启动一个parameter server(以下简称ps)和一个worker,平台会自动1:1分配ps与worker任务,例如2机场景,则会分配2ps与2 worker任务;并为启动文件额外注入--task_index <VC_TASK_INDEX> --ps_hosts <TF_PS_HOSTS> --worker_hosts <TF_WORKER_HOSTS> --job_name <MA_TASK_NAME> 参数。

启动文件需要解析如下参数。

  • VC_TASK_INDEX:task序号,如0/1/2。
  • TF_PS_HOSTS :ps节点地址数组,如[xx-ps-0.xx:TCP_PORT,xx-ps-1.xx:TCP_PORT],TCP_PORT是一个在5000~10000的随机端口。
  • TF_WORKER_HOSTS:worker节点地址数组,如[xx-worker-0.xx:TCP_PORT,xx-worker-1.xx:TCP_PORT],TCP_PORT是一个在5000~10000的随机端口。
  • MA_TASK_NAME:任务名称,取值是ps或worker。

具体示例请参见:TensorFlow-GPU框架的代码示例mnist.py(单机)

Horovod/MPI/MindSpore-GPU

使用Horovod/MPI/MindSpore-GPU预置框架来运行的启动文件,平台自动以mpirun命令启动之。使用ModelArts Standard训练相应预置引擎,用户仅需关注启动文件(即训练脚本)的编写;mpirun命令和训练作业集群的构建都由平台自动完成。平台不会为启动文件额外拼接参数。

“pytorch_synthetic_benchmark.py”文件示例如下:

import argparse
import torch.backends.cudnn as cudnn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
from torchvision import models
import horovod.torch as hvd
import timeit
import numpy as np

# Benchmark settings
parser = argparse.ArgumentParser(description='PyTorch Synthetic Benchmark',
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
                    help='use fp16 compression during allreduce')

parser.add_argument('--model', type=str, default='resnet50',
                    help='model to benchmark')
parser.add_argument('--batch-size', type=int, default=32,
                    help='input batch size')

parser.add_argument('--num-warmup-batches', type=int, default=10,
                    help='number of warm-up batches that don\'t count towards benchmark')
parser.add_argument('--num-batches-per-iter', type=int, default=10,
                    help='number of batches per benchmark iteration')
parser.add_argument('--num-iters', type=int, default=10,
                    help='number of benchmark iterations')

parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='disables CUDA training')

parser.add_argument('--use-adasum', action='store_true', default=False,
                    help='use adasum algorithm to do reduction')

args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()

hvd.init()

if args.cuda:
    # Horovod: pin GPU to local rank.
    torch.cuda.set_device(hvd.local_rank())

cudnn.benchmark = True

# Set up standard model.
model = getattr(models, args.model)()

# By default, Adasum doesn't need scaling up learning rate.
lr_scaler = hvd.size() if not args.use_adasum else 1

if args.cuda:
    # Move model to GPU.
    model.cuda()
    # If using GPU Adasum allreduce, scale learning rate by local_size.
    if args.use_adasum and hvd.nccl_built():
        lr_scaler = hvd.local_size()

optimizer = optim.SGD(model.parameters(), lr=0.01 * lr_scaler)

# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(optimizer,
                                     named_parameters=model.named_parameters(),
                                     compression=compression,
                                     op=hvd.Adasum if args.use_adasum else hvd.Average)

# Horovod: broadcast parameters & optimizer state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

# Set up fixed fake data
data = torch.randn(args.batch_size, 3, 224, 224)
target = torch.LongTensor(args.batch_size).random_() % 1000
if args.cuda:
    data, target = data.cuda(), target.cuda()


def benchmark_step():
    optimizer.zero_grad()
    output = model(data)
    loss = F.cross_entropy(output, target)
    loss.backward()
    optimizer.step()


def log(s, nl=True):
    if hvd.rank() != 0:
        return
    print(s, end='\n' if nl else '')


log('Model: %s' % args.model)
log('Batch size: %d' % args.batch_size)
device = 'GPU' if args.cuda else 'CPU'
log('Number of %ss: %d' % (device, hvd.size()))

# Warm-up
log('Running warmup...')
timeit.timeit(benchmark_step, number=args.num_warmup_batches)

# Benchmark
log('Running benchmark...')
img_secs = []
for x in range(args.num_iters):
    time = timeit.timeit(benchmark_step, number=args.num_batches_per_iter)
    img_sec = args.batch_size * args.num_batches_per_iter / time
    log('Iter #%d: %.1f img/sec per %s' % (x, img_sec, device))
    img_secs.append(img_sec)

# Results
img_sec_mean = np.mean(img_secs)
img_sec_conf = 1.96 * np.std(img_secs)
log('Img/sec per %s: %.1f +-%.1f' % (device, img_sec_mean, img_sec_conf))
log('Total img/sec on %d %s(s): %.1f +-%.1f' %
    (hvd.size(), device, hvd.size() * img_sec_mean, hvd.size() * img_sec_conf))

run_mpi.sh文件内容如下:

#!/bin/bash
MY_HOME=/home/ma-user

MY_SSHD_PORT=${MY_SSHD_PORT:-"36666"}

MY_MPI_BTL_TCP_IF=${MY_MPI_BTL_TCP_IF:-"eth0,bond0"}

MY_TASK_INDEX=${MA_TASK_INDEX:-${VC_TASK_INDEX:-${VK_TASK_INDEX}}}

MY_MPI_SLOTS=${MY_MPI_SLOTS:-"${MA_NUM_GPUS}"}

MY_MPI_TUNE_FILE="${MY_HOME}/env_for_user_process"

if [ -z ${MY_MPI_SLOTS} ]; then
    echo "[run_mpi] MY_MPI_SLOTS is empty, set it be 1"
    MY_MPI_SLOTS="1"
fi

printf "MY_HOME: ${MY_HOME}\nMY_SSHD_PORT: ${MY_SSHD_PORT}\nMY_MPI_BTL_TCP_IF: ${MY_MPI_BTL_TCP_IF}\nMY_TASK_INDEX: ${MY_TASK_INDEX}\nMY_MPI_SLOTS: ${MY_MPI_SLOTS}\n"

env | grep -E '^MA_|SHARED_|^S3_|^PATH|^VC_WORKER_|^SCC|^CRED' | grep -v '=$' > ${MY_MPI_TUNE_FILE}
# add -x to each line
sed -i 's/^/-x /' ${MY_MPI_TUNE_FILE}

sed -i "s|{{MY_SSHD_PORT}}|${MY_SSHD_PORT}|g" ${MY_HOME}/etc/ssh/sshd_config

# start sshd service
bash -c "$(which sshd) -f ${MY_HOME}/etc/ssh/sshd_config"

# confirm the sshd is up
netstat -anp | grep LIS | grep ${MY_SSHD_PORT}

if [ $MY_TASK_INDEX -eq 0 ]; then
    # generate the hostfile of mpi
    for ((i=0; i<$MA_NUM_HOSTS; i++))
    do
        eval hostname=${MA_VJ_NAME}-${MA_TASK_NAME}-${i}.${MA_VJ_NAME}
        echo "[run_mpi] hostname: ${hostname}"

        ip=""
        while [ -z "$ip" ]; do
            ip=$(ping -c 1 ${hostname} | grep "PING" | sed -E 's/PING .* .([0-9.]+). .*/\1/g')
            sleep 1
        done
        echo "[run_mpi] resolved ip: ${ip}"

        # test the sshd is up
        while :
        do
            if [ cat < /dev/null >/dev/tcp/${ip}/${MY_SSHD_PORT} ]; then
                break
            fi
            sleep 1
        done

        echo "[run_mpi] the sshd of ip ${ip} is up"

        echo "${ip} slots=$MY_MPI_SLOTS" >> ${MY_HOME}/hostfile
    done

    printf "[run_mpi] hostfile:\n`cat ${MY_HOME}/hostfile`\n"
fi

RET_CODE=0

if [ $MY_TASK_INDEX -eq 0 ]; then

    echo "[run_mpi] start exec command time: "$(date +"%Y-%m-%d-%H:%M:%S")

    np=$(( ${MA_NUM_HOSTS} * ${MY_MPI_SLOTS} ))

    echo "[run_mpi] command: mpirun -np ${np} -hostfile ${MY_HOME}/hostfile -mca plm_rsh_args \"-p ${MY_SSHD_PORT}\" -tune ${MY_MPI_TUNE_FILE} ... $@"

    # execute mpirun at worker-0
    # mpirun
    mpirun \
        -np ${np} \
        -hostfile ${MY_HOME}/hostfile \
        -mca plm_rsh_args "-p ${MY_SSHD_PORT}" \
        -tune ${MY_MPI_TUNE_FILE} \
        -bind-to none -map-by slot \
        -x NCCL_DEBUG=INFO -x NCCL_SOCKET_IFNAME=${MY_MPI_BTL_TCP_IF} -x NCCL_SOCKET_FAMILY=AF_INET \
        -x HOROVOD_MPI_THREADS_DISABLE=1 \
        -x LD_LIBRARY_PATH \
        -mca pml ob1 -mca btl ^openib -mca plm_rsh_no_tree_spawn true \
        "$@"

    RET_CODE=$?

    if [ $RET_CODE -ne 0 ]; then
        echo "[run_mpi] exec command failed, exited with $RET_CODE"
    else
        echo "[run_mpi] exec command successfully, exited with $RET_CODE"
    fi

    # stop 1...N worker by killing the sleep proc
    sed -i '1d' ${MY_HOME}/hostfile
    if [ `cat ${MY_HOME}/hostfile | wc -l` -ne 0 ]; then
        echo "[run_mpi] stop 1 to (N - 1) worker by killing the sleep proc"

        sed -i 's/${MY_MPI_SLOTS}/1/g' ${MY_HOME}/hostfile
        printf "[run_mpi] hostfile:\n`cat ${MY_HOME}/hostfile`\n"

        mpirun \
        --hostfile ${MY_HOME}/hostfile \
        --mca btl_tcp_if_include ${MY_MPI_BTL_TCP_IF} \
        --mca plm_rsh_args "-p ${MY_SSHD_PORT}" \
        -x PATH -x LD_LIBRARY_PATH \
        pkill sleep \
        > /dev/null 2>&1
    fi

    echo "[run_mpi] exit time: "$(date +"%Y-%m-%d-%H:%M:%S")
else
    echo "[run_mpi] the training log is in worker-0"
    sleep 365d
    echo "[run_mpi] exit time: "$(date +"%Y-%m-%d-%H:%M:%S")
fi

exit $RET_CODE