更新时间:2024-08-16 GMT+08:00
分享

创建多机多卡的分布式训练(DistributedDataParallel)

本章节介绍基于PyTorch引擎的多机多卡数据并行训练。并提供了分布式训练调测具体的代码适配操作过程和代码示例。同时还针对Resnet18在cifar10数据集上的分类任务,给出了分布式训练改造(DDP)的完整代码示例,供用户学习参考。

训练流程简述

相比于DP,DDP能够启动多进程进行运算,从而大幅度提升计算资源的利用率。可以基于torch.distributed实现真正的分布式计算,具体的原理此处不再赘述。大致的流程如下:

  1. 初始化进程组。
  2. 创建分布式并行模型,每个进程都会有相同的模型和参数。
  3. 创建数据分发Sampler,使每个进程加载一个mini batch中不同部分的数据。
  4. 网络中相邻参数分桶,一般为神经网络模型中需要进行参数更新的每一层网络。
  5. 每个进程前向传播并各自计算梯度。
  6. 模型某一层的参数得到梯度后会马上进行通讯并进行梯度平均。
  7. 各GPU更新模型参数。

具体流程图如下:

图1 多机多卡数据并行训练

代码改造点

  • 引入多进程启动机制:初始化进程
  • 引入几个变量:tcp协议,rank进程序号,worldsize开启的进程数量
  • 分发数据:DataLoader中多了一个Sampler参数,避免不同进程数据重复
  • 模型分发:DistributedDataParallel(model)
  • 模型保存:在序号为0的进程下保存模型
import torch
class Net(torch.nn.Module):
	pass

model = Net().cuda()

### DistributedDataParallel Begin ###
model = torch.nn.parallel.DistributedDataParallel(Net().cuda())
### DistributedDataParallel End ###

多节点分布式调测适配及代码示例

在DistributedDataParallel中,不同进程分别从原始数据中加载batch的数据,最终将各个进程的梯度进行平均作为最终梯度,由于样本量更大,因此计算出的梯度更加可靠,可以适当增大学习率。

以下对resnet18在cifar10数据集上的分类任务,给出了单机训练和分布式训练改造(DDP)的代码。直接执行代码为多节点分布式训练且支持CPU分布式和GPU分布式,将代码中的分布式改造点注释掉后即可进行单节点单卡训练。

训练代码中包含三部分入参,分别为训练基础参数、分布式参数和数据相关参数。其中分布式参数由平台自动入参,无需自行定义。数据相关参数中的custom_data表示是否使用自定义数据进行训练,该参数为“true”时使用基于torch自定义的随机数据进行训练和验证。

cifar10数据集

在Notebook中,无法直接使用默认版本的torchvision获取数据集,因此示例代码中提供了三种训练数据加载方式。

cifar-10数据集下载链接,单击“CIFAR-10 python version”。

  • 尝试基于torchvision获取cifar10数据集。
  • 基于数据链接下载数据并解压,放置在指定目录下,训练集和测试集的大小分别为(50000,3,32,32)和(10000,3,32,32)。
  • 考虑到下载cifar10数据集较慢,基于torch生成类似cifar10的随机数据集,训练集和测试集的大小分别为(5000,3,32,32)和(1000,3,32,32),标签仍为10类,指定custom_data = 'true'后可直接进行训练任务,无需加载数据。

训练代码

以下代码中以“### 分布式改造,... ###”注释的代码即为多节点分布式训练需要适配的代码改造点。

不对示例代码进行任何修改,适配数据路径后即可在ModelArts上完成多节点分布式训练。

注释掉分布式代码改造点,即可完成单节点单卡训练。完整代码见分布式训练完整代码示例

  • 导入依赖包
    import datetime
    import inspect
    import os
    import pickle
    import random
    
    import argparse
    import numpy as np
    import torch
    import torch.distributed as dist
    from torch import nn, optim
    from torch.utils.data import TensorDataset, DataLoader
    from torch.utils.data.distributed import DistributedSampler
    from sklearn.metrics import accuracy_score
  • 定义加载数据的方法和随机数,由于加载数据部分代码较多,此处省略
    def setup_seed(seed):
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        np.random.seed(seed)
        random.seed(seed)
        torch.backends.cudnn.deterministic = True
    
    def get_data(path):
    	pass
  • 定义网络结构
    class Block(nn.Module):
    
        def __init__(self, in_channels, out_channels, stride=1):
            super().__init__()
            self.residual_function = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
                nn.BatchNorm2d(out_channels)
            )
    
            self.shortcut = nn.Sequential()
            if stride != 1 or in_channels != out_channels:
                self.shortcut = nn.Sequential(
                    nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                    nn.BatchNorm2d(out_channels)
                )
    
        def forward(self, x):
            out = self.residual_function(x) + self.shortcut(x)
            return nn.ReLU(inplace=True)(out)
    
    
    class ResNet(nn.Module):
    
        def __init__(self, block, num_classes=10):
            super().__init__()
            self.conv1 = nn.Sequential(
                nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False),
                nn.BatchNorm2d(64),
                nn.ReLU(inplace=True))
            self.conv2 = self.make_layer(block, 64, 64, 2, 1)
            self.conv3 = self.make_layer(block, 64, 128, 2, 2)
            self.conv4 = self.make_layer(block, 128, 256, 2, 2)
            self.conv5 = self.make_layer(block, 256, 512, 2, 2)
            self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
            self.dense_layer = nn.Linear(512, num_classes)
    
        def make_layer(self, block, in_channels, out_channels, num_blocks, stride):
            strides = [stride] + [1] * (num_blocks - 1)
            layers = []
            for stride in strides:
                layers.append(block(in_channels, out_channels, stride))
                in_channels = out_channels
            return nn.Sequential(*layers)
    
        def forward(self, x):
            out = self.conv1(x)
            out = self.conv2(out)
            out = self.conv3(out)
            out = self.conv4(out)
            out = self.conv5(out)
            out = self.avg_pool(out)
            out = out.view(out.size(0), -1)
            out = self.dense_layer(out)
            return out
  • 进行训练和验证
    def main():
        file_dir = os.path.dirname(inspect.getframeinfo(inspect.currentframe()).filename)
    
        seed = datetime.datetime.now().year
        setup_seed(seed)
    
        parser = argparse.ArgumentParser(description='Pytorch distribute training',
                                         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
        parser.add_argument('--enable_gpu', default='true')
        parser.add_argument('--lr', default='0.01', help='learning rate')
        parser.add_argument('--epochs', default='100', help='training iteration')
    
        parser.add_argument('--init_method', default=None, help='tcp_port')
        parser.add_argument('--rank', type=int, default=0, help='index of current task')
        parser.add_argument('--world_size', type=int, default=1, help='total number of tasks')
    
        parser.add_argument('--custom_data', default='false')
        parser.add_argument('--data_url', type=str, default=os.path.join(file_dir, 'input_dir'))
        parser.add_argument('--output_dir', type=str, default=os.path.join(file_dir, 'output_dir'))
        args, unknown = parser.parse_known_args()
    
        args.enable_gpu = args.enable_gpu == 'true'
        args.custom_data = args.custom_data == 'true'
        args.lr = float(args.lr)
        args.epochs = int(args.epochs)
    
        if args.custom_data:
            print('[warning] you are training on custom random dataset, '
                  'validation accuracy may range from 0.4 to 0.6.')
    
        ### 分布式改造,DDP初始化进程,其中init_method, rank和world_size参数均由平台自动入参 ###
        dist.init_process_group(init_method=args.init_method, backend="nccl", world_size=args.world_size, rank=args.rank)
        ### 分布式改造,DDP初始化进程,其中init_method, rank和world_size参数均由平台自动入参 ###
    
        tr_set, val_set = get_data(args.data_url, custom_data=args.custom_data)
    
        batch_per_gpu = 128
        gpus_per_node = torch.cuda.device_count() if args.enable_gpu else 1
        batch = batch_per_gpu * gpus_per_node
    
        tr_loader = DataLoader(tr_set, batch_size=batch, shuffle=False)
    
        ### 分布式改造,构建DDP分布式数据sampler,确保不同进程加载到不同的数据 ###
        tr_sampler = DistributedSampler(tr_set, num_replicas=args.world_size, rank=args.rank)
        tr_loader = DataLoader(tr_set, batch_size=batch, sampler=tr_sampler, shuffle=False, drop_last=True)
        ### 分布式改造,构建DDP分布式数据sampler,确保不同进程加载到不同的数据 ###
    
        val_loader = DataLoader(val_set, batch_size=batch, shuffle=False)
    
        lr = args.lr * gpus_per_node
        max_epoch = args.epochs
        model = ResNet(Block).cuda() if args.enable_gpu else ResNet(Block)
    
        ### 分布式改造,构建DDP分布式模型 ###
        model = nn.parallel.DistributedDataParallel(model)
        ### 分布式改造,构建DDP分布式模型 ###
    
        optimizer = optim.Adam(model.parameters(), lr=lr)
        loss_func = torch.nn.CrossEntropyLoss()
    
        os.makedirs(args.output_dir, exist_ok=True)
    
        for epoch in range(1, max_epoch + 1):
            model.train()
            train_loss = 0
    
            ### 分布式改造,DDP sampler, 基于当前的epoch为其设置随机数,避免加载到重复数据 ###
            tr_sampler.set_epoch(epoch)
            ### 分布式改造,DDP sampler, 基于当前的epoch为其设置随机数,避免加载到重复数据 ###
    
            for step, (tr_x, tr_y) in enumerate(tr_loader):
                if args.enable_gpu:
                    tr_x, tr_y = tr_x.cuda(), tr_y.cuda()
                out = model(tr_x)
                loss = loss_func(out, tr_y)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
            print('train | epoch: %d | loss: %.4f' % (epoch, train_loss / len(tr_loader)))
    
            val_loss = 0
            pred_record = []
            real_record = []
            model.eval()
            with torch.no_grad():
                for step, (val_x, val_y) in enumerate(val_loader):
                    if args.enable_gpu:
                        val_x, val_y = val_x.cuda(), val_y.cuda()
                    out = model(val_x)
                    pred_record += list(np.argmax(out.cpu().numpy(), axis=1))
                    real_record += list(val_y.cpu().numpy())
                    val_loss += loss_func(out, val_y).item()
            val_accu = accuracy_score(real_record, pred_record)
            print('val | epoch: %d | loss: %.4f | accuracy: %.4f' % (epoch, val_loss / len(val_loader), val_accu), '\n')
    
            if args.rank == 0:
                # save ckpt every epoch
                torch.save(model.state_dict(), os.path.join(args.output_dir, f'epoch_{epoch}.pth'))
    
    
    if __name__ == '__main__':
        main()
  • 结果对比
    分别以单机单卡和两节点16卡两种资源类型完成100epoch的cifar-10数据集训练,训练时长和测试集准确率如下。
    表1 训练结果对比

    资源类型

    单机单卡

    两节点16卡

    耗时

    60分钟

    20分钟

    准确率

    80+

    80+

分布式训练完整代码示例

以下对resnet18在cifar10数据集上的分类任务,给出了分布式训练改造(DDP)的完整代码示例。

训练启动文件main.py内容如下(如果需要执行单机单卡训练任务,则将分布式改造的代码删除):

import datetime
import inspect
import os
import pickle
import random
import logging 

import argparse
import numpy as np
from sklearn.metrics import accuracy_score
import torch
from torch import nn, optim
import torch.distributed as dist
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.data.distributed import DistributedSampler

file_dir = os.path.dirname(inspect.getframeinfo(inspect.currentframe()).filename)


def load_pickle_data(path):
    with open(path, 'rb') as file:
        data = pickle.load(file, encoding='bytes')
    return data


def _load_data(file_path):
    raw_data = load_pickle_data(file_path)
    labels = raw_data[b'labels']
    data = raw_data[b'data']
    filenames = raw_data[b'filenames']

    data = data.reshape(10000, 3, 32, 32) / 255
    return data, labels, filenames


def load_cifar_data(root_path):
    train_root_path = os.path.join(root_path, 'cifar-10-batches-py/data_batch_')
    train_data_record = []
    train_labels = []
    train_filenames = []
    for i in range(1, 6):
        train_file_path = train_root_path + str(i)
        data, labels, filenames = _load_data(train_file_path)
        train_data_record.append(data)
        train_labels += labels
        train_filenames += filenames
    train_data = np.concatenate(train_data_record, axis=0)
    train_labels = np.array(train_labels)

    val_file_path = os.path.join(root_path, 'cifar-10-batches-py/test_batch')
    val_data, val_labels, val_filenames = _load_data(val_file_path)
    val_labels = np.array(val_labels)

    tr_data = torch.from_numpy(train_data).float()
    tr_labels = torch.from_numpy(train_labels).long()
    val_data = torch.from_numpy(val_data).float()
    val_labels = torch.from_numpy(val_labels).long()
    return tr_data, tr_labels, val_data, val_labels


def get_data(root_path, custom_data=False):
    if custom_data:
        train_samples, test_samples, img_size = 5000, 1000, 32
        tr_label = [1] * int(train_samples / 2) + [0] * int(train_samples / 2)
        val_label = [1] * int(test_samples / 2) + [0] * int(test_samples / 2)
        random.seed(2021)
        random.shuffle(tr_label)
        random.shuffle(val_label)
        tr_data, tr_labels = torch.randn((train_samples, 3, img_size, img_size)).float(), torch.tensor(tr_label).long()
        val_data, val_labels = torch.randn((test_samples, 3, img_size, img_size)).float(), torch.tensor(
            val_label).long()
        tr_set = TensorDataset(tr_data, tr_labels)
        val_set = TensorDataset(val_data, val_labels)
        return tr_set, val_set
    elif os.path.exists(os.path.join(root_path, 'cifar-10-batches-py')):
        tr_data, tr_labels, val_data, val_labels = load_cifar_data(root_path)
        tr_set = TensorDataset(tr_data, tr_labels)
        val_set = TensorDataset(val_data, val_labels)
        return tr_set, val_set
    else:
        try:
            import torchvision
            from torchvision import transforms
            tr_set = torchvision.datasets.CIFAR10(root='./data', train=True,
                                                  download=True, transform=transforms)
            val_set = torchvision.datasets.CIFAR10(root='./data', train=False,
                                                   download=True, transform=transforms)
            return tr_set, val_set
        except Exception as e:
            raise Exception(
                f"{e}, you can download and unzip cifar-10 dataset manually, "
                "the data url is http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz")


class Block(nn.Module):

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels)
        )

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = self.residual_function(x) + self.shortcut(x)
        return nn.ReLU(inplace=True)(out)


class ResNet(nn.Module):

    def __init__(self, block, num_classes=10):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True))
        self.conv2 = self.make_layer(block, 64, 64, 2, 1)
        self.conv3 = self.make_layer(block, 64, 128, 2, 2)
        self.conv4 = self.make_layer(block, 128, 256, 2, 2)
        self.conv5 = self.make_layer(block, 256, 512, 2, 2)
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.dense_layer = nn.Linear(512, num_classes)

    def make_layer(self, block, in_channels, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(in_channels, out_channels, stride))
            in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.conv2(out)
        out = self.conv3(out)
        out = self.conv4(out)
        out = self.conv5(out)
        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = self.dense_layer(out)
        return out


def setup_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True


def obs_transfer(src_path, dst_path):
    import moxing as mox
    mox.file.copy_parallel(src_path, dst_path)
    logging.info(f"end copy data from {src_path} to {dst_path}")


def main():
    seed = datetime.datetime.now().year
    setup_seed(seed)

    parser = argparse.ArgumentParser(description='Pytorch distribute training',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--enable_gpu', default='true')
    parser.add_argument('--lr', default='0.01', help='learning rate')
    parser.add_argument('--epochs', default='100', help='training iteration')

    parser.add_argument('--init_method', default=None, help='tcp_port')
    parser.add_argument('--rank', type=int, default=0, help='index of current task')
    parser.add_argument('--world_size', type=int, default=1, help='total number of tasks')

    parser.add_argument('--custom_data', default='false')
    parser.add_argument('--data_url', type=str, default=os.path.join(file_dir, 'input_dir'))
    parser.add_argument('--output_dir', type=str, default=os.path.join(file_dir, 'output_dir'))
    args, unknown = parser.parse_known_args()

    args.enable_gpu = args.enable_gpu == 'true'
    args.custom_data = args.custom_data == 'true'
    args.lr = float(args.lr)
    args.epochs = int(args.epochs)

    if args.custom_data:
        logging.warning('you are training on custom random dataset, '
              'validation accuracy may range from 0.4 to 0.6.')

    ### 分布式改造,DDP初始化进程,其中init_method, rank和world_size参数均由平台自动入参 ###
    dist.init_process_group(init_method=args.init_method, backend="nccl", world_size=args.world_size, rank=args.rank)
    ### 分布式改造,DDP初始化进程,其中init_method, rank和world_size参数均由平台自动入参 ###

    tr_set, val_set = get_data(args.data_url, custom_data=args.custom_data)

    batch_per_gpu = 128
    gpus_per_node = torch.cuda.device_count() if args.enable_gpu else 1
    batch = batch_per_gpu * gpus_per_node

    tr_loader = DataLoader(tr_set, batch_size=batch, shuffle=False)

    ### 分布式改造,构建DDP分布式数据sampler,确保不同进程加载到不同的数据 ###
    tr_sampler = DistributedSampler(tr_set, num_replicas=args.world_size, rank=args.rank)
    tr_loader = DataLoader(tr_set, batch_size=batch, sampler=tr_sampler, shuffle=False, drop_last=True)
    ### 分布式改造,构建DDP分布式数据sampler,确保不同进程加载到不同的数据 ###

    val_loader = DataLoader(val_set, batch_size=batch, shuffle=False)

    lr = args.lr * gpus_per_node * args.world_size
    max_epoch = args.epochs
    model = ResNet(Block).cuda() if args.enable_gpu else ResNet(Block)

    ### 分布式改造,构建DDP分布式模型 ###
    model = nn.parallel.DistributedDataParallel(model)
    ### 分布式改造,构建DDP分布式模型 ###

    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_func = torch.nn.CrossEntropyLoss()

    os.makedirs(args.output_dir, exist_ok=True)

    for epoch in range(1, max_epoch + 1):
        model.train()
        train_loss = 0

        ### 分布式改造,DDP sampler, 基于当前的epoch为其设置随机数,避免加载到重复数据 ###
        tr_sampler.set_epoch(epoch)
        ### 分布式改造,DDP sampler, 基于当前的epoch为其设置随机数,避免加载到重复数据 ###

        for step, (tr_x, tr_y) in enumerate(tr_loader):
            if args.enable_gpu:
                tr_x, tr_y = tr_x.cuda(), tr_y.cuda()
            out = model(tr_x)
            loss = loss_func(out, tr_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        print('train | epoch: %d | loss: %.4f' % (epoch, train_loss / len(tr_loader)))

        val_loss = 0
        pred_record = []
        real_record = []
        model.eval()
        with torch.no_grad():
            for step, (val_x, val_y) in enumerate(val_loader):
                if args.enable_gpu:
                    val_x, val_y = val_x.cuda(), val_y.cuda()
                out = model(val_x)
                pred_record += list(np.argmax(out.cpu().numpy(), axis=1))
                real_record += list(val_y.cpu().numpy())
                val_loss += loss_func(out, val_y).item()
        val_accu = accuracy_score(real_record, pred_record)
        print('val | epoch: %d | loss: %.4f | accuracy: %.4f' % (epoch, val_loss / len(val_loader), val_accu), '\n')

        if args.rank == 0:
            # save ckpt every epoch
            torch.save(model.state_dict(), os.path.join(args.output_dir, f'epoch_{epoch}.pth'))


if __name__ == '__main__':
    main()

常见问题

1、示例代码中如何使用不同的数据集?

  • 上述代码如果使用cifar10数据集,则将数据集下载并解压后,上传至OBS桶中,文件目录结构如下:
    DDP
    |--- main.py
    |--- input_dir
    |------ cifar-10-batches-py
    |-------- data_batch_1
    |-------- data_batch_2
    |-------- ...

    其中“DDP”为创建训练作业时的“代码目录”“main.py”为上文代码示例(即创建训练作业时的“启动文件”),“cifar-10-batches-py”为解压后的数据集文件夹(放在input_dir文件夹下)。

  • 如果使用自定义的随机数据,则将代码示例中的参数“custom_data”改为“true”,修改后内容如下:
    parser.add_argument('--custom_data', default='true')

    然后直接运行代码示例“main.py”即可,创建训练作业的参数与上图相同。

2、为什么DDP可以不输入主节点ip?

“parser.add_argument('--init_method', default=None, help='tcp_port')”中的init method参数值会包含主节点的ip和端口,由平台自动入参,不需要用户输入主节点的ip和端口。

相关文档