
Creating a Multiple-Node Multi-Card Distributed Training Job (DistributedDataParallel)

To perform multi-node multi-card parallel training with PyTorch, follow the steps in this section and refer to the code example below. This section also includes a complete code sample for distributed parallel training on the CIFAR-10 dataset with ResNet18 for classification tasks.

Training Process

Compared with DataParallel, DistributedDataParallel starts multiple processes for computation, which greatly improves compute resource utilization. Because it is built on torch.distributed, DistributedDataParallel has clear advantages over DataParallel in distributed scenarios. The process is as follows:

  1. The process group is initialized.
  2. A distributed parallel model is created. Each process holds an identical copy of the model and its parameters.
  3. A distributed sampler is created for data distribution so that each process loads a unique subset of the original dataset in every mini-batch.
  4. Parameters are organized into buckets based on their shapes or sizes. A bucket generally corresponds to a layer of the network whose parameters need to be updated.
  5. Each process performs its own forward pass and computes its gradients.
  6. Once all gradients in a bucket are ready, they are averaged across processes through communication.
  7. Each GPU updates the model parameters with the averaged gradients.

The detailed flowchart is as follows.

Figure 1 Multi-node multi-card parallel training
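The following minimal sketch maps these steps to PyTorch APIs. It is not the full sample code below; it assumes a GPU environment and that init_method, rank, and world_size are supplied externally, for example by the platform.

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

def train(model, dataset, init_method, rank, world_size, epochs=1):
    # Step 1: initialize the process group (NCCL backend for GPU training).
    dist.init_process_group(backend="nccl", init_method=init_method,
                            rank=rank, world_size=world_size)

    # Step 2: wrap the model; every process holds an identical replica.
    model = DistributedDataParallel(model.cuda())

    # Step 3: distributed sampler so each process loads a unique data subset.
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    loader = DataLoader(dataset, batch_size=128, sampler=sampler, drop_last=True)

    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    loss_func = torch.nn.CrossEntropyLoss()

    for epoch in range(epochs):
        sampler.set_epoch(epoch)              # reshuffle shards each epoch
        for x, y in loader:
            x, y = x.cuda(), y.cuda()
            loss = loss_func(model(x), y)     # Step 5: local forward pass
            optimizer.zero_grad()
            loss.backward()                   # Steps 4 and 6: DDP buckets gradients and all-reduces them
            optimizer.step()                  # Step 7: each GPU applies the averaged gradients

    dist.destroy_process_group()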

Code Modifications

  • Multi-process startup
  • New variables such as the rank ID and world_size are used, together with a TCP-based initialization method.
  • Sampler for data distribution to avoid duplicate data between different processes
  • Model distribution: DistributedDataParallel(model)
  • Model checkpoint saved only by rank 0 (GPU 0)
import torch
class Net(torch.nn.Module):
	pass

model = Net().cuda()

### DistributedDataParallel Begin ###
model = torch.nn.parallel.DistributedDataParallel(model)
### DistributedDataParallel End ###
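On ModelArts, the platform starts the processes and passes init_method, rank, and world_size automatically, so no launcher code is needed. For local single-node debugging only, a sketch of multi-process startup with torch.multiprocessing.spawn could look like the following; the address, port, and worker body are assumptions and are not part of the ModelArts sample.

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def worker(local_rank, world_size):
    # One process per GPU on a single node; rank equals the local rank here.
    dist.init_process_group(backend="nccl",
                            init_method="tcp://127.0.0.1:29500",
                            rank=local_rank, world_size=world_size)
    torch.cuda.set_device(local_rank)
    # ... build the model, wrap it with DistributedDataParallel, and train ...
    dist.destroy_process_group()

if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    mp.spawn(worker, args=(world_size,), nprocs=world_size)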

Multi-Node Distributed Debugging Adaptation and Code Example

In DistributedDataParallel, each process loads a subset of the original dataset in every batch, and the gradients of all processes are averaged to produce the final gradient. Because each step covers far more samples, the averaged gradient is more reliable, so a larger learning rate can be used.
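For example, the complete sample code later in this section scales the base learning rate linearly with the number of participating cards; this is a common heuristic rather than a guarantee, and the values below are hypothetical.

base_lr = 0.01                      # learning rate tuned for one card
gpus_per_node, world_size = 8, 2    # hypothetical two-node, 16-card setup
lr = base_lr * gpus_per_node * world_size   # 0.16 for 16 cards in total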

This section describes the code for single-node training and for distributed parallel training of a ResNet18 classification job on the CIFAR-10 dataset. You can run the code directly to perform multi-node distributed training on CPUs or GPUs, or comment out the distributed training settings in the code to perform single-node single-card training.

The training code takes three groups of input parameters: basic training parameters, distributed parameters, and data parameters. The distributed parameters are passed in automatically by the platform. custom_data indicates whether to train on custom data: if it is set to true, random Torch tensors are used for training and validation.

CIFAR-10 dataset

In notebook instances, the default version of torchvision cannot be used to obtain the dataset. Therefore, the sample code provides three ways to load the training data.

Click CIFAR-10 python version on the download page to download the CIFAR-10 dataset.

  • Download the CIFAR-10 dataset using torchvision.
  • Download the CIFAR-10 dataset from the URL and decompress it in a specified directory, as shown in the sketch after this list. The sizes of the training set and test set are (50000, 3, 32, 32) and (10000, 3, 32, 32), respectively.
  • Use Torch to generate a random dataset shaped like CIFAR-10. The sizes of the training set and test set are (5000, 3, 32, 32) and (1000, 3, 32, 32), respectively, with two label classes (0 and 1), matching the sample code. Set custom_data to true, and the training job can be executed directly without loading any data.
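A minimal sketch of the manual download path (assuming network access and write permission in the target directory; the URL is the official CIFAR-10 python version archive referenced in the sample code):

import os
import tarfile
import urllib.request

def download_cifar10(target_dir):
    # Download the official "CIFAR-10 python version" archive and unpack it,
    # producing target_dir/cifar-10-batches-py/ with data_batch_1..5 and test_batch.
    url = "http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz"
    os.makedirs(target_dir, exist_ok=True)
    archive = os.path.join(target_dir, "cifar-10-python.tar.gz")
    urllib.request.urlretrieve(url, archive)
    with tarfile.open(archive, "r:gz") as tar:
        tar.extractall(path=target_dir)

if __name__ == "__main__":
    download_cifar10("./input_dir")   # hypothetical target directory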

Training code

In the following code, the parts enclosed in ### Settings for distributed training ... ### comments are the modifications required for multi-node distributed training.

Apart from changing the data path to your own path, you do not need to modify the sample code; it can then be run on ModelArts for multi-node distributed training.

If the distributed modifications are commented out, the code runs as single-node single-card training. For details about the complete code, see Code Example of Distributed Training.

  • Importing dependency packages
    import datetime
    import inspect
    import os
    import pickle
    import random
    
    import argparse
    import numpy as np
    import torch
    import torch.distributed as dist
    from torch import nn, optim
    from torch.utils.data import TensorDataset, DataLoader
    from torch.utils.data.distributed import DistributedSampler
    from sklearn.metrics import accuracy_score
  • Defining the data loading method and the random seed (the data loading code is omitted here because it is lengthy)
    def setup_seed(seed):
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        np.random.seed(seed)
        random.seed(seed)
        torch.backends.cudnn.deterministic = True
    
    def get_data(path, custom_data=False):
        pass
  • Defining a network structure
    class Block(nn.Module):
    
        def __init__(self, in_channels, out_channels, stride=1):
            super().__init__()
            self.residual_function = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
                nn.BatchNorm2d(out_channels)
            )
    
            self.shortcut = nn.Sequential()
            if stride != 1 or in_channels != out_channels:
                self.shortcut = nn.Sequential(
                    nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                    nn.BatchNorm2d(out_channels)
                )
    
        def forward(self, x):
            out = self.residual_function(x) + self.shortcut(x)
            return nn.ReLU(inplace=True)(out)
    
    
    class ResNet(nn.Module):
    
        def __init__(self, block, num_classes=10):
            super().__init__()
            self.conv1 = nn.Sequential(
                nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False),
                nn.BatchNorm2d(64),
                nn.ReLU(inplace=True))
            self.conv2 = self.make_layer(block, 64, 64, 2, 1)
            self.conv3 = self.make_layer(block, 64, 128, 2, 2)
            self.conv4 = self.make_layer(block, 128, 256, 2, 2)
            self.conv5 = self.make_layer(block, 256, 512, 2, 2)
            self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
            self.dense_layer = nn.Linear(512, num_classes)
    
        def make_layer(self, block, in_channels, out_channels, num_blocks, stride):
            strides = [stride] + [1] * (num_blocks - 1)
            layers = []
            for stride in strides:
                layers.append(block(in_channels, out_channels, stride))
                in_channels = out_channels
            return nn.Sequential(*layers)
    
        def forward(self, x):
            out = self.conv1(x)
            out = self.conv2(out)
            out = self.conv3(out)
            out = self.conv4(out)
            out = self.conv5(out)
            out = self.avg_pool(out)
            out = out.view(out.size(0), -1)
            out = self.dense_layer(out)
            return out
  • Training and validation
    def main():
        file_dir = os.path.dirname(inspect.getframeinfo(inspect.currentframe()).filename)
    
        seed = datetime.datetime.now().year
        setup_seed(seed)
    
        parser = argparse.ArgumentParser(description='Pytorch distribute training',
                                         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
        parser.add_argument('--enable_gpu', default='true')
        parser.add_argument('--lr', default='0.01', help='learning rate')
        parser.add_argument('--epochs', default='100', help='training iteration')
    
        parser.add_argument('--init_method', default=None, help='tcp_port')
        parser.add_argument('--rank', type=int, default=0, help='index of current task')
        parser.add_argument('--world_size', type=int, default=1, help='total number of tasks')
    
        parser.add_argument('--custom_data', default='false')
        parser.add_argument('--data_url', type=str, default=os.path.join(file_dir, 'input_dir'))
        parser.add_argument('--output_dir', type=str, default=os.path.join(file_dir, 'output_dir'))
        args, unknown = parser.parse_known_args()
    
        args.enable_gpu = args.enable_gpu == 'true'
        args.custom_data = args.custom_data == 'true'
        args.lr = float(args.lr)
        args.epochs = int(args.epochs)
    
        if args.custom_data:
            print('[warning] you are training on custom random dataset, '
                  'validation accuracy may range from 0.4 to 0.6.')
    
        ### Settings for distributed training. Initialize DistributedDataParallel process. The init_method, rank, and world_size parameters are automatically input by the platform. ###
        dist.init_process_group(init_method=args.init_method, backend="nccl", world_size=args.world_size, rank=args.rank)
        ### Settings for distributed training. Initialize DistributedDataParallel process. The init_method, rank, and world_size parameters are automatically input by the platform. ###
    
        tr_set, val_set = get_data(args.data_url, custom_data=args.custom_data)
    
        batch_per_gpu = 128
        gpus_per_node = torch.cuda.device_count() if args.enable_gpu else 1
        batch = batch_per_gpu * gpus_per_node
    
        tr_loader = DataLoader(tr_set, batch_size=batch, shuffle=False)
    
        ### Settings for distributed training. Create a sampler for data distribution to ensure that different processes load different data. ###
        tr_sampler = DistributedSampler(tr_set, num_replicas=args.world_size, rank=args.rank)
        tr_loader = DataLoader(tr_set, batch_size=batch, sampler=tr_sampler, shuffle=False, drop_last=True)
        ### Settings for distributed training. Create a sampler for data distribution to ensure that different processes load different data. ###
    
        val_loader = DataLoader(val_set, batch_size=batch, shuffle=False)
    
        lr = args.lr * gpus_per_node * args.world_size
        max_epoch = args.epochs
        model = ResNet(Block).cuda() if args.enable_gpu else ResNet(Block)
    
        ### Settings for distributed training. Build a DistributedDataParallel model. ###
        model = nn.parallel.DistributedDataParallel(model)
        ### Settings for distributed training. Build a DistributedDataParallel model. ###
    
        optimizer = optim.Adam(model.parameters(), lr=lr)
        loss_func = torch.nn.CrossEntropyLoss()
    
        os.makedirs(args.output_dir, exist_ok=True)
    
        for epoch in range(1, max_epoch + 1):
            model.train()
            train_loss = 0
    
            ### Settings for distributed training. DistributedDataParallel sampler. Random numbers are set for the DistributedDataParallel sampler based on the current epoch number to avoid loading duplicate data. ###
            tr_sampler.set_epoch(epoch)
            ### Settings for distributed training. DistributedDataParallel sampler. Random numbers are set for the DistributedDataParallel sampler based on the current epoch number to avoid loading duplicate data. ###
    
            for step, (tr_x, tr_y) in enumerate(tr_loader):
                if args.enable_gpu:
                    tr_x, tr_y = tr_x.cuda(), tr_y.cuda()
                out = model(tr_x)
                loss = loss_func(out, tr_y)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
            print('train | epoch: %d | loss: %.4f' % (epoch, train_loss / len(tr_loader)))
    
            val_loss = 0
            pred_record = []
            real_record = []
            model.eval()
            with torch.no_grad():
                for step, (val_x, val_y) in enumerate(val_loader):
                    if args.enable_gpu:
                        val_x, val_y = val_x.cuda(), val_y.cuda()
                    out = model(val_x)
                    pred_record += list(np.argmax(out.cpu().numpy(), axis=1))
                    real_record += list(val_y.cpu().numpy())
                    val_loss += loss_func(out, val_y).item()
            val_accu = accuracy_score(real_record, pred_record)
            print('val | epoch: %d | loss: %.4f | accuracy: %.4f' % (epoch, val_loss / len(val_loader), val_accu), '\n')
    
            if args.rank == 0:
                # save ckpt every epoch
                torch.save(model.state_dict(), os.path.join(args.output_dir, f'epoch_{epoch}.pth'))
    
    
    if __name__ == '__main__':
        main()
  • Result comparison
    The CIFAR-10 training job is run for 100 epochs on two resource types: single-node single-card and two-node 16-card. The training duration and test set accuracy are as follows.
    Table 1 Training result comparison

    Resource Type    Single-Node Single-Card    Two-Node 16-Card
    Duration         60 minutes                 20 minutes
    Accuracy         80%+                       80%+

Code Example of Distributed Training

The following provides a complete code sample of distributed parallel training for the classification task of ResNet18 on the CIFAR-10 dataset.

The content of the training boot file main.py is as follows (to run a single-node single-card training job instead, delete the code added for distributed training):

import datetime
import inspect
import os
import pickle
import random
import logging 

import argparse
import numpy as np
from sklearn.metrics import accuracy_score
import torch
from torch import nn, optim
import torch.distributed as dist
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.data.distributed import DistributedSampler

file_dir = os.path.dirname(inspect.getframeinfo(inspect.currentframe()).filename)


def load_pickle_data(path):
    with open(path, 'rb') as file:
        data = pickle.load(file, encoding='bytes')
    return data


def _load_data(file_path):
    raw_data = load_pickle_data(file_path)
    labels = raw_data[b'labels']
    data = raw_data[b'data']
    filenames = raw_data[b'filenames']

    data = data.reshape(10000, 3, 32, 32) / 255
    return data, labels, filenames


def load_cifar_data(root_path):
    train_root_path = os.path.join(root_path, 'cifar-10-batches-py/data_batch_')
    train_data_record = []
    train_labels = []
    train_filenames = []
    for i in range(1, 6):
        train_file_path = train_root_path + str(i)
        data, labels, filenames = _load_data(train_file_path)
        train_data_record.append(data)
        train_labels += labels
        train_filenames += filenames
    train_data = np.concatenate(train_data_record, axis=0)
    train_labels = np.array(train_labels)

    val_file_path = os.path.join(root_path, 'cifar-10-batches-py/test_batch')
    val_data, val_labels, val_filenames = _load_data(val_file_path)
    val_labels = np.array(val_labels)

    tr_data = torch.from_numpy(train_data).float()
    tr_labels = torch.from_numpy(train_labels).long()
    val_data = torch.from_numpy(val_data).float()
    val_labels = torch.from_numpy(val_labels).long()
    return tr_data, tr_labels, val_data, val_labels


def get_data(root_path, custom_data=False):
    if custom_data:
        train_samples, test_samples, img_size = 5000, 1000, 32
        tr_label = [1] * int(train_samples / 2) + [0] * int(train_samples / 2)
        val_label = [1] * int(test_samples / 2) + [0] * int(test_samples / 2)
        random.seed(2021)
        random.shuffle(tr_label)
        random.shuffle(val_label)
        tr_data, tr_labels = torch.randn((train_samples, 3, img_size, img_size)).float(), torch.tensor(tr_label).long()
        val_data, val_labels = torch.randn((test_samples, 3, img_size, img_size)).float(), torch.tensor(
            val_label).long()
        tr_set = TensorDataset(tr_data, tr_labels)
        val_set = TensorDataset(val_data, val_labels)
        return tr_set, val_set
    elif os.path.exists(os.path.join(root_path, 'cifar-10-batches-py')):
        tr_data, tr_labels, val_data, val_labels = load_cifar_data(root_path)
        tr_set = TensorDataset(tr_data, tr_labels)
        val_set = TensorDataset(val_data, val_labels)
        return tr_set, val_set
    else:
        try:
            import torchvision
            from torchvision import transforms
            tr_set = torchvision.datasets.CIFAR10(root='./data', train=True,
                                                  download=True, transform=transforms.ToTensor())
            val_set = torchvision.datasets.CIFAR10(root='./data', train=False,
                                                   download=True, transform=transforms.ToTensor())
            return tr_set, val_set
        except Exception as e:
            raise Exception(
                f"{e}, you can download and unzip cifar-10 dataset manually, "
                "the data url is http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz")


class Block(nn.Module):

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels)
        )

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = self.residual_function(x) + self.shortcut(x)
        return nn.ReLU(inplace=True)(out)


class ResNet(nn.Module):

    def __init__(self, block, num_classes=10):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True))
        self.conv2 = self.make_layer(block, 64, 64, 2, 1)
        self.conv3 = self.make_layer(block, 64, 128, 2, 2)
        self.conv4 = self.make_layer(block, 128, 256, 2, 2)
        self.conv5 = self.make_layer(block, 256, 512, 2, 2)
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.dense_layer = nn.Linear(512, num_classes)

    def make_layer(self, block, in_channels, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(in_channels, out_channels, stride))
            in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.conv2(out)
        out = self.conv3(out)
        out = self.conv4(out)
        out = self.conv5(out)
        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = self.dense_layer(out)
        return out


def setup_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True


def obs_transfer(src_path, dst_path):
    import moxing as mox
    mox.file.copy_parallel(src_path, dst_path)
    logging.info(f"end copy data from {src_path} to {dst_path}")


def main():
    seed = datetime.datetime.now().year
    setup_seed(seed)

    parser = argparse.ArgumentParser(description='Pytorch distribute training',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--enable_gpu', default='true')
    parser.add_argument('--lr', default='0.01', help='learning rate')
    parser.add_argument('--epochs', default='100', help='training iteration')

    parser.add_argument('--init_method', default=None, help='tcp_port')
    parser.add_argument('--rank', type=int, default=0, help='index of current task')
    parser.add_argument('--world_size', type=int, default=1, help='total number of tasks')

    parser.add_argument('--custom_data', default='false')
    parser.add_argument('--data_url', type=str, default=os.path.join(file_dir, 'input_dir'))
    parser.add_argument('--output_dir', type=str, default=os.path.join(file_dir, 'output_dir'))
    args, unknown = parser.parse_known_args()

    args.enable_gpu = args.enable_gpu == 'true'
    args.custom_data = args.custom_data == 'true'
    args.lr = float(args.lr)
    args.epochs = int(args.epochs)

    if args.custom_data:
        logging.warning('you are training on custom random dataset, '
              'validation accuracy may range from 0.4 to 0.6.')

    ### Settings for distributed training. Initialize DistributedDataParallel process. The init_method, rank, and world_size parameters are automatically input by the platform. ###
    dist.init_process_group(init_method=args.init_method, backend="nccl", world_size=args.world_size, rank=args.rank)
    ### Settings for distributed training. Initialize DistributedDataParallel process. The init_method, rank, and world_size parameters are automatically input by the platform. ###

    tr_set, val_set = get_data(args.data_url, custom_data=args.custom_data)

    batch_per_gpu = 128
    gpus_per_node = torch.cuda.device_count() if args.enable_gpu else 1
    batch = batch_per_gpu * gpus_per_node

    tr_loader = DataLoader(tr_set, batch_size=batch, shuffle=False)

    ### Settings for distributed training. Create a sampler for data distribution to ensure that different processes load different data. ###
    tr_sampler = DistributedSampler(tr_set, num_replicas=args.world_size, rank=args.rank)
    tr_loader = DataLoader(tr_set, batch_size=batch, sampler=tr_sampler, shuffle=False, drop_last=True)
    ### Settings for distributed training. Create a sampler for data distribution to ensure that different processes load different data. ###

    val_loader = DataLoader(val_set, batch_size=batch, shuffle=False)

    lr = args.lr * gpus_per_node * args.world_size
    max_epoch = args.epochs
    model = ResNet(Block).cuda() if args.enable_gpu else ResNet(Block)

    ### Settings for distributed training. Build a DistributedDataParallel model. ###
    model = nn.parallel.DistributedDataParallel(model)
    ### Settings for distributed training. Build a DistributedDataParallel model. ###

    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_func = torch.nn.CrossEntropyLoss()

    os.makedirs(args.output_dir, exist_ok=True)

    for epoch in range(1, max_epoch + 1):
        model.train()
        train_loss = 0

        ### Settings for distributed training. DistributedDataParallel sampler. Random numbers are set for the DistributedDataParallel sampler based on the current epoch number to avoid loading duplicate data. ###
        tr_sampler.set_epoch(epoch)
        ### Settings for distributed training. DistributedDataParallel sampler. Random numbers are set for the DistributedDataParallel sampler based on the current epoch number to avoid loading duplicate data. ###

        for step, (tr_x, tr_y) in enumerate(tr_loader):
            if args.enable_gpu:
                tr_x, tr_y = tr_x.cuda(), tr_y.cuda()
            out = model(tr_x)
            loss = loss_func(out, tr_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        print('train | epoch: %d | loss: %.4f' % (epoch, train_loss / len(tr_loader)))

        val_loss = 0
        pred_record = []
        real_record = []
        model.eval()
        with torch.no_grad():
            for step, (val_x, val_y) in enumerate(val_loader):
                if args.enable_gpu:
                    val_x, val_y = val_x.cuda(), val_y.cuda()
                out = model(val_x)
                pred_record += list(np.argmax(out.cpu().numpy(), axis=1))
                real_record += list(val_y.cpu().numpy())
                val_loss += loss_func(out, val_y).item()
        val_accu = accuracy_score(real_record, pred_record)
        print('val | epoch: %d | loss: %.4f | accuracy: %.4f' % (epoch, val_loss / len(val_loader), val_accu), '\n')

        if args.rank == 0:
            # save ckpt every epoch
            torch.save(model.state_dict(), os.path.join(args.output_dir, f'epoch_{epoch}.pth'))


if __name__ == '__main__':
    main()
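Note that a state_dict saved from a DistributedDataParallel-wrapped model has every parameter key prefixed with module.. A minimal sketch for loading such a checkpoint into a plain, unwrapped model (the checkpoint path is hypothetical, and it assumes main.py is on the Python path):

import torch

from main import ResNet, Block   # classes defined in the sample above

# Hypothetical path: a checkpoint written by the rank-0 process above.
state_dict = torch.load('output_dir/epoch_100.pth', map_location='cpu')
# Strip the "module." prefix added by DistributedDataParallel.
state_dict = {k.replace('module.', '', 1): v for k, v in state_dict.items()}

model = ResNet(Block)
model.load_state_dict(state_dict)
model.eval()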

FAQs

1. How Do I Use Different Datasets in the Sample Code?

  • To use the CIFAR-10 dataset in the preceding code, download and decompress the dataset and upload it to the OBS bucket. The file directory structure is as follows:
    DDP
    |--- main.py
    |--- input_dir
    |------ cifar-10-batches-py
    |-------- data_batch_1
    |-------- data_batch_2
    |-------- ...

    DDP is the code directory specified during training job creation, main.py is the preceding code example (the boot file specified during training job creation), and cifar-10-batches-py is the unzipped dataset folder (stored in input_dir).

  • To use user-defined random data, change the value of custom_data in the code example to true.
    parser.add_argument('--custom_data', default='true')

    Then, run main.py. The parameters for creating the training job are the same as those described above.

2. Why Can I Leave the IP Address of the Master Node Blank for DDP?

The --init_method parameter defined by parser.add_argument('--init_method', default=None, help='tcp_port') carries the IP address and port number of the master node. The platform fills in this value automatically, so you do not need to specify the master node IP address.
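For reference, a TCP init_method is simply a URL of the form tcp://<master-node-IP>:<port>. The values below are illustrative only; on ModelArts they are supplied by the platform.

import torch.distributed as dist

# Illustrative values only; the platform supplies --init_method, rank, and world_size.
init_method = "tcp://192.168.0.10:29500"   # tcp://<master-node-IP>:<free port>
rank, world_size = 0, 2
dist.init_process_group(backend="nccl", init_method=init_method,
                        rank=rank, world_size=world_size)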