文档首页> AI开发平台ModelArts> 模型训练> 分布式训练> 分布式调测适配及代码示例
更新时间:2023-11-13 GMT+08:00

分布式调测适配及代码示例

在DistributedDataParallel中,不同进程分别从原始数据中加载batch的数据,最终将各个进程的梯度进行平均作为最终梯度,由于样本量更大,因此计算出的梯度更加可靠,可以适当增大学习率。

以下对resnet18在cifar10数据集上的分类任务,给出了单机训练和分布式训练改造(DDP)的代码。直接执行代码为多节点分布式训练且支持CPU分布式和GPU分布式,将代码中的分布式改造点注释掉后即可进行单节点单卡训练。

训练代码中包涵三部分入参,分别为训练基础参数、分布式参数和数据相关参数。其中分布式参数由平台自动入参,无需自行定义。数据相关参数中的custom_data表示是否使用自定义数据进行训练,该参数为“true”时使用基于torch自定义的随机数据进行训练和验证。

数据集

cifar10数据集

在Notebook中,无法直接使用默认版本的torchvision获取数据集,因此示例代码中提供了三种训练数据加载方式。

cifar-10数据集下载链接,单击“CIFAR-10 python version”。

  • 尝试基于torchvision获取cifar10数据集。
  • 基于数据链接下载数据并解压,放置在指定目录下,训练集和测试集的大小分别为(50000,3,32,32)和(10000,3,32,32)。
  • 考虑到下载cifar10数据集较慢,基于torch生成类似cifar10的随机数据集,训练集和测试集的大小分别为(5000,3,32,32)和(1000,3,32,32),标签仍为10类,指定custom_data = 'true'后可直接进行训练任务,无需加载数据。

训练代码

以下代码中以“### 分布式改造,... ###”注释的代码即为多节点分布式训练需要适配的代码改造点。

不对示例代码进行任何修改,适配数据路径后即可在ModelArts上完成多节点分布式训练。

注释掉分布式代码改造点,即可完成单节点单卡训练。完整代码见分布式训练完整代码示例

  • 导入依赖包
    import datetime
    import inspect
    import os
    import pickle
    import random
    
    import argparse
    import numpy as np
    import torch
    import torch.distributed as dist
    from torch import nn, optim
    from torch.utils.data import TensorDataset, DataLoader
    from torch.utils.data.distributed import DistributedSampler
    from sklearn.metrics import accuracy_score
  • 定义加载数据的方法和随机数,由于加载数据部分代码较多,此处省略
    def setup_seed(seed):
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        np.random.seed(seed)
        random.seed(seed)
        torch.backends.cudnn.deterministic = True
    
    def get_data(path):
    	pass
  • 定义网络结构
    class Block(nn.Module):
    
        def __init__(self, in_channels, out_channels, stride=1):
            super().__init__()
            self.residual_function = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
                nn.BatchNorm2d(out_channels)
            )
    
            self.shortcut = nn.Sequential()
            if stride != 1 or in_channels != out_channels:
                self.shortcut = nn.Sequential(
                    nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                    nn.BatchNorm2d(out_channels)
                )
    
        def forward(self, x):
            out = self.residual_function(x) + self.shortcut(x)
            return nn.ReLU(inplace=True)(out)
    
    
    class ResNet(nn.Module):
    
        def __init__(self, block, num_classes=10):
            super().__init__()
            self.conv1 = nn.Sequential(
                nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False),
                nn.BatchNorm2d(64),
                nn.ReLU(inplace=True))
            self.conv2 = self.make_layer(block, 64, 64, 2, 1)
            self.conv3 = self.make_layer(block, 64, 128, 2, 2)
            self.conv4 = self.make_layer(block, 128, 256, 2, 2)
            self.conv5 = self.make_layer(block, 256, 512, 2, 2)
            self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
            self.dense_layer = nn.Linear(512, num_classes)
    
        def make_layer(self, block, in_channels, out_channels, num_blocks, stride):
            strides = [stride] + [1] * (num_blocks - 1)
            layers = []
            for stride in strides:
                layers.append(block(in_channels, out_channels, stride))
                in_channels = out_channels
            return nn.Sequential(*layers)
    
        def forward(self, x):
            out = self.conv1(x)
            out = self.conv2(out)
            out = self.conv3(out)
            out = self.conv4(out)
            out = self.conv5(out)
            out = self.avg_pool(out)
            out = out.view(out.size(0), -1)
            out = self.dense_layer(out)
            return out
  • 进行训练和验证
    def main():
        file_dir = os.path.dirname(inspect.getframeinfo(inspect.currentframe()).filename)
    
        seed = datetime.datetime.now().year
        setup_seed(seed)
    
        parser = argparse.ArgumentParser(description='Pytorch distribute training',
                                         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
        parser.add_argument('--enable_gpu', default='true')
        parser.add_argument('--lr', default='0.01', help='learning rate')
        parser.add_argument('--epochs', default='100', help='training iteration')
    
        parser.add_argument('--init_method', default=None, help='tcp_port')
        parser.add_argument('--rank', type=int, default=0, help='index of current task')
        parser.add_argument('--world_size', type=int, default=1, help='total number of tasks')
    
        parser.add_argument('--custom_data', default='false')
        parser.add_argument('--data_url', type=str, default=os.path.join(file_dir, 'input_dir'))
        parser.add_argument('--output_dir', type=str, default=os.path.join(file_dir, 'output_dir'))
        args, unknown = parser.parse_known_args()
    
        args.enable_gpu = args.enable_gpu == 'true'
        args.custom_data = args.custom_data == 'true'
        args.lr = float(args.lr)
        args.epochs = int(args.epochs)
    
        if args.custom_data:
            print('[warning] you are training on custom random dataset, '
                  'validation accuracy may range from 0.4 to 0.6.')
    
        ### 分布式改造,DDP初始化进程,其中init_method, rank和world_size参数均由平台自动入参 ###
        dist.init_process_group(init_method=args.init_method, backend="nccl", world_size=args.world_size, rank=args.rank)
        ### 分布式改造,DDP初始化进程,其中init_method, rank和world_size参数均由平台自动入参 ###
    
        tr_set, val_set = get_data(args.data_url, custom_data=args.custom_data)
    
        batch_per_gpu = 128
        gpus_per_node = torch.cuda.device_count() if args.enable_gpu else 1
        batch = batch_per_gpu * gpus_per_node
    
        tr_loader = DataLoader(tr_set, batch_size=batch, shuffle=False)
    
        ### 分布式改造,构建DDP分布式数据sampler,确保不同进程加载到不同的数据 ###
        tr_sampler = DistributedSampler(tr_set, num_replicas=args.world_size, rank=args.rank)
        tr_loader = DataLoader(tr_set, batch_size=batch, sampler=tr_sampler, shuffle=False, drop_last=True)
        ### 分布式改造,构建DDP分布式数据sampler,确保不同进程加载到不同的数据 ###
    
        val_loader = DataLoader(val_set, batch_size=batch, shuffle=False)
    
        lr = args.lr * gpus_per_node
        max_epoch = args.epochs
        model = ResNet(Block).cuda() if args.enable_gpu else ResNet(Block)
    
        ### 分布式改造,构建DDP分布式模型 ###
        model = nn.parallel.DistributedDataParallel(model)
        ### 分布式改造,构建DDP分布式模型 ###
    
        optimizer = optim.Adam(model.parameters(), lr=lr)
        loss_func = torch.nn.CrossEntropyLoss()
    
        os.makedirs(args.output_dir, exist_ok=True)
    
        for epoch in range(1, max_epoch + 1):
            model.train()
            train_loss = 0
    
            ### 分布式改造,DDP sampler, 基于当前的epoch为其设置随机数,避免加载到重复数据 ###
            tr_sampler.set_epoch(epoch)
            ### 分布式改造,DDP sampler, 基于当前的epoch为其设置随机数,避免加载到重复数据 ###
    
            for step, (tr_x, tr_y) in enumerate(tr_loader):
                if args.enable_gpu:
                    tr_x, tr_y = tr_x.cuda(), tr_y.cuda()
                out = model(tr_x)
                loss = loss_func(out, tr_y)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
            print('train | epoch: %d | loss: %.4f' % (epoch, train_loss / len(tr_loader)))
    
            val_loss = 0
            pred_record = []
            real_record = []
            model.eval()
            with torch.no_grad():
                for step, (val_x, val_y) in enumerate(val_loader):
                    if args.enable_gpu:
                        val_x, val_y = val_x.cuda(), val_y.cuda()
                    out = model(val_x)
                    pred_record += list(np.argmax(out.cpu().numpy(), axis=1))
                    real_record += list(val_y.cpu().numpy())
                    val_loss += loss_func(out, val_y).item()
            val_accu = accuracy_score(real_record, pred_record)
            print('val | epoch: %d | loss: %.4f | accuracy: %.4f' % (epoch, val_loss / len(val_loader), val_accu), '\n')
    
            if args.rank == 0:
                # save ckpt every epoch
                torch.save(model.state_dict(), os.path.join(args.output_dir, f'epoch_{epoch}.pth'))
    
    
    if __name__ == '__main__':
        main()
  • 结果对比
    分别以单机单卡和两节点16卡两种资源类型完成100epoch的cifar-10数据集训练,训练时长和测试集准确率如下。
    表1 训练结果对比

    资源类型

    单机单卡

    两节点16卡

    耗时

    60分钟

    20分钟

    准确率

    80+

    80+