示例:创建DDP分布式训练(PyTorch+GPU)
在分布式训练场景中,使用PyTorch的DistributedDataParallel(DDP)功能是实现高效训练的重要方式。为了帮助用户更好地理解和应用这一功能,本文将详细介绍三种通过训练作业启动PyTorchDDP训练的方法,并提供对应的代码示例。
- 使用PyTorch预置框架功能,通过mp.spawn命令启动
 - 使用自定义镜像功能
   
- 通过torch.distributed.launch命令启动
 - 通过torch.distributed.run命令启动
 
这些方法为用户提供了灵活的选择,能够根据实际需求和环境配置进行适配。
通过本文的介绍,用户可以根据自身场景选择合适的启动方式,快速上手PyTorch的分布式训练功能。
 
创建训练作业
- 方式一:使用PyTorch预置框架功能,通过mp.spawn命令启动训练作业。
    
创建训练作业的关键参数如表1所示。
表1 创建训练作业(预置框架) 参数名称
说明
创建方式
选择“自定义算法”。
启动方式
选择“预置框架”,引擎选择“PyTorch”,PyTorch版本根据训练要求选择。
代码目录
选择OBS桶中训练code文件夹所在路径,例如“obs://test-modelarts/code/”。
启动文件
选择代码目录中训练作业的Python启动脚本。例如“obs://test-modelarts/code/main.py”。
超参
当资源规格为单机多卡时,需要指定超参world_size和rank。
当资源规格为多机时(即实例数大于 1),无需设置超参world_size和rank,超参会由平台自动注入。
 - 方式二:使用自定义镜像功能,通过torch.distributed.launch命令启动训练作业。
    
创建训练作业的关键参数如表2所示。
 - 方式三:使用自定义镜像功能,通过torch.distributed.run命令启动训练作业。
    
创建训练作业的关键参数如表3所示。
 
代码示例
文件目录结构如下所示,将以下文件上传至OBS桶中:
code # 代码根目录 └─torch_ddp.py # PyTorch DDP训练代码文件 └─main.py # 使用PyTorch预置框架功能,通过mp.spawn命令启动训练的启动文件 └─torchlaunch.sh # 使用自定义镜像功能,通过torch.distributed.launch命令启动训练的启动文件 └─torchrun.sh # 使用自定义镜像功能,通过torch.distributed.run命令启动训练的启动文件
torch_ddp.py内容如下:
import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
# 用于通过 mp.spawn 启动
def init_from_arg(local_rank, base_rank, world_size, init_method):
    rank = base_rank + local_rank
    dist.init_process_group("nccl", rank=rank, init_method=init_method, world_size=world_size)
    ddp_train(local_rank)
# 用于通过 torch.distributed.launch 或 torch.distributed.run 启动
def init_from_env():
    dist.init_process_group(backend='nccl', init_method='env://')
    local_rank=int(os.environ["LOCAL_RANK"])
    ddp_train(local_rank)
def cleanup():
    dist.destroy_process_group()
class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)
    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))
def ddp_train(device_id):
    # create model and move it to GPU with id rank
    model = ToyModel().to(device_id)
    ddp_model = DDP(model, device_ids=[device_id])
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_id)
    loss_fn(outputs, labels).backward()
    optimizer.step()
    cleanup()
if __name__ == "__main__":
    init_from_env()
  main.py内容如下:
import argparse
import torch
import torch.multiprocessing as mp
parser = argparse.ArgumentParser(description='ddp demo args')
parser.add_argument('--world_size', type=int, required=True)
parser.add_argument('--rank', type=int, required=True)
parser.add_argument('--init_method', type=str, required=True)
args, unknown = parser.parse_known_args()
if __name__ == "__main__":
    n_gpus = torch.cuda.device_count()
    world_size = n_gpus * args.world_size
    base_rank = n_gpus * args.rank
    # 调用 DDP 示例代码中的启动函数
    from torch_ddp import init_from_arg
    mp.spawn(init_from_arg,
        args=(base_rank, world_size, args.init_method),
        nprocs=n_gpus,
        join=True)
  #!/bin/bash
# 系统默认环境变量,不建议修改
MASTER_HOST="$VC_WORKER_HOSTS"
MASTER_ADDR="${VC_WORKER_HOSTS%%,*}"
MASTER_PORT="6060"
JOB_ID="1234"
NNODES="$MA_NUM_HOSTS"
NODE_RANK="$VC_TASK_INDEX"
NGPUS_PER_NODE="$MA_NUM_GPUS"
# 自定义环境变量,指定python脚本和参数
PYTHON_SCRIPT=${MA_JOB_DIR}/code/torch_ddp.py
PYTHON_ARGS=""
CMD="python -m torch.distributed.launch \
    --nnodes=$NNODES \
    --node_rank=$NODE_RANK \
    --nproc_per_node=$NGPUS_PER_NODE \
    --master_addr $MASTER_ADDR \
    --master_port=$MASTER_PORT \
    --use_env \
    $PYTHON_SCRIPT \
    $PYTHON_ARGS
"
echo $CMD
$CMD
  torchrun.sh内容如下:
 
   PyTorch 2.1版本需要将“rdzv_backend”参数设置为“static:--rdzv_backend=static”。
#!/bin/bash
# 系统默认环境变量,不建议修改
MASTER_HOST="$VC_WORKER_HOSTS"
MASTER_ADDR="${VC_WORKER_HOSTS%%,*}"
MASTER_PORT="6060"
JOB_ID="1234"
NNODES="$MA_NUM_HOSTS"
NODE_RANK="$VC_TASK_INDEX"
NGPUS_PER_NODE="$MA_NUM_GPUS"
# 自定义环境变量,指定python脚本和参数
PYTHON_SCRIPT=${MA_JOB_DIR}/code/torch_ddp.py
PYTHON_ARGS=""
if [[ $NODE_RANK == 0 ]]; then
    EXT_ARGS="--rdzv_conf=is_host=1"
else
    EXT_ARGS=""
fi
CMD="python -m torch.distributed.run \
    --nnodes=$NNODES \
    --node_rank=$NODE_RANK \
    $EXT_ARGS \
    --nproc_per_node=$NGPUS_PER_NODE \
    --rdzv_id=$JOB_ID \
    --rdzv_backend=c10d \
    --rdzv_endpoint=$MASTER_ADDR:$MASTER_PORT \
    $PYTHON_SCRIPT \
    $PYTHON_ARGS
    "
echo $CMD
$CMD