Distributed Debugging Adaptation and Code Example
In DistributedDataParallel, each process loads a subset of the original dataset for every batch, and the gradients computed by all processes are averaged to form the final gradient. Because each update is based on a larger number of samples, the resulting gradient is more reliable and the learning rate can be increased accordingly.
This section describes the code for single-node training and distributed parallel training of a ResNet18 classification job on the CIFAR-10 dataset. Run the code directly to perform multi-node distributed training on CPUs or GPUs; comment out the distributed training settings in the code to perform single-node single-card training.
The training code takes three groups of input parameters: basic training parameters, distributed parameters, and data parameters. The distributed parameters are automatically input by the platform. custom_data indicates whether to use custom data for training: if it is set to true, random torch-generated data is used for training and validation.
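As a rough illustration (not part of the sample code), the gradient averaging that DistributedDataParallel performs after the backward pass is conceptually equivalent to all-reducing each parameter's gradient and dividing by the number of processes:

import torch
import torch.distributed as dist

def average_gradients(model: torch.nn.Module, world_size: int) -> None:
    # Conceptual stand-in for what DDP does automatically: sum each gradient
    # across all processes, then divide by the process count to get the mean.
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            param.grad /= world_size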
Dataset
CIFAR-10 dataset
In notebook instances, the default version of torchvision cannot be used to obtain datasets. Therefore, the sample code provides three ways to load the training data.
To download the CIFAR-10 dataset manually, click CIFAR-10 python version on the download page.
- Download the CIFAR-10 dataset using torchvision.
- Download the CIFAR-10 dataset based on the URL and decompress the dataset in a specified directory. The sizes of the training set and test set are (50000, 3, 32, 32) and (10000, 3, 32, 32), respectively.
- Use torch to generate a random dataset similar to CIFAR-10. The sizes of the training set and test set are (5000, 3, 32, 32) and (1000, 3, 32, 32), respectively, and the labels still fall into 10 classes. Set custom_data to true so the training job can run directly without loading any external data.
Training Code
In the following code, the lines enclosed between ### Settings for distributed training ... ### comments are the modifications required for multi-node distributed training.
Apart from changing the data path to your own path, the sample code does not need to be modified; multi-node distributed training can then be executed on ModelArts.
If the distributed code modifications are commented out, single-node single-card training can be executed instead. For the complete code, see Sample Code of Distributed Training.
- Importing dependency packages
import datetime
import inspect
import os
import pickle
import random
import argparse

import numpy as np
import torch
import torch.distributed as dist
from torch import nn, optim
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from sklearn.metrics import accuracy_score
- Defining the random seed and the data loading method (the data loading code is omitted here because of its length; a possible implementation is sketched after the snippet below)
def setup_seed(seed):
    # Fix all random sources so that runs are reproducible.
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True


def get_data(path, custom_data=False):
    # Data loading is omitted here; see the sketch below for one possible implementation.
    pass
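The sample leaves the body of get_data empty. The following is only a minimal sketch of what it might look like, assuming the standard CIFAR-10 python-version batch files (data_batch_1 to data_batch_5 and test_batch) sit directly under the data path and using the random-data shapes described above; adapt it to your own data layout.

def get_data(path, custom_data=False):
    if custom_data:
        # Random data shaped like CIFAR-10: train (5000, 3, 32, 32), val (1000, 3, 32, 32), 10 classes.
        tr_x = torch.rand(5000, 3, 32, 32)
        tr_y = torch.randint(0, 10, (5000,))
        val_x = torch.rand(1000, 3, 32, 32)
        val_y = torch.randint(0, 10, (1000,))
    else:
        def load_batch(file_path):
            # Each CIFAR-10 python-version batch is a pickled dict with b'data' and b'labels'.
            with open(file_path, 'rb') as f:
                batch = pickle.load(f, encoding='bytes')
            data = batch[b'data'].reshape(-1, 3, 32, 32).astype('float32') / 255.0
            return torch.from_numpy(data), torch.tensor(batch[b'labels'])

        train_parts = [load_batch(os.path.join(path, 'data_batch_%d' % i)) for i in range(1, 6)]
        tr_x = torch.cat([x for x, _ in train_parts])
        tr_y = torch.cat([y for _, y in train_parts])
        val_x, val_y = load_batch(os.path.join(path, 'test_batch'))
    return TensorDataset(tr_x, tr_y), TensorDataset(val_x, val_y)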
- Defining a network structure
class Block(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels)
        )

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = self.residual_function(x) + self.shortcut(x)
        return nn.ReLU(inplace=True)(out)


class ResNet(nn.Module):
    def __init__(self, block, num_classes=10):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True))
        self.conv2 = self.make_layer(block, 64, 64, 2, 1)
        self.conv3 = self.make_layer(block, 64, 128, 2, 2)
        self.conv4 = self.make_layer(block, 128, 256, 2, 2)
        self.conv5 = self.make_layer(block, 256, 512, 2, 2)
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.dense_layer = nn.Linear(512, num_classes)

    def make_layer(self, block, in_channels, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(in_channels, out_channels, stride))
            in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.conv2(out)
        out = self.conv3(out)
        out = self.conv4(out)
        out = self.conv5(out)
        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = self.dense_layer(out)
        return out
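As a quick sanity check of the network defined above (this snippet is not part of the sample), a forward pass on a CIFAR-10-shaped batch should produce one logit per class:

model = ResNet(Block)
dummy = torch.randn(4, 3, 32, 32)   # batch of 4 images, 3 x 32 x 32
logits = model(dummy)
print(logits.shape)                 # expected: torch.Size([4, 10])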
- Training and validation
def main():
    file_dir = os.path.dirname(inspect.getframeinfo(inspect.currentframe()).filename)

    seed = datetime.datetime.now().year
    setup_seed(seed)

    parser = argparse.ArgumentParser(description='Pytorch distribute training',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--enable_gpu', default='true')
    parser.add_argument('--lr', default='0.01', help='learning rate')
    parser.add_argument('--epochs', default='100', help='training iteration')

    parser.add_argument('--init_method', default=None, help='tcp_port')
    parser.add_argument('--rank', type=int, default=0, help='index of current task')
    parser.add_argument('--world_size', type=int, default=1, help='total number of tasks')

    parser.add_argument('--custom_data', default='false')
    parser.add_argument('--data_url', type=str, default=os.path.join(file_dir, 'input_dir'))
    parser.add_argument('--output_dir', type=str, default=os.path.join(file_dir, 'output_dir'))
    args, unknown = parser.parse_known_args()

    args.enable_gpu = args.enable_gpu == 'true'
    args.custom_data = args.custom_data == 'true'
    args.lr = float(args.lr)
    args.epochs = int(args.epochs)

    if args.custom_data:
        print('[warning] you are training on custom random dataset, '
              'validation accuracy may range from 0.4 to 0.6.')

    ### Settings for distributed training. Initialize the DistributedDataParallel process group. The init_method, rank, and world_size parameters are automatically input by the platform. ###
    dist.init_process_group(init_method=args.init_method, backend="nccl",
                            world_size=args.world_size, rank=args.rank)
    ### Settings for distributed training. Initialize the DistributedDataParallel process group. The init_method, rank, and world_size parameters are automatically input by the platform. ###

    tr_set, val_set = get_data(args.data_url, custom_data=args.custom_data)

    batch_per_gpu = 128
    gpus_per_node = torch.cuda.device_count() if args.enable_gpu else 1
    batch = batch_per_gpu * gpus_per_node

    tr_loader = DataLoader(tr_set, batch_size=batch, shuffle=False)

    ### Settings for distributed training. Create a sampler for data distribution to ensure that different processes load different data. ###
    tr_sampler = DistributedSampler(tr_set, num_replicas=args.world_size, rank=args.rank)
    tr_loader = DataLoader(tr_set, batch_size=batch, sampler=tr_sampler, shuffle=False, drop_last=True)
    ### Settings for distributed training. Create a sampler for data distribution to ensure that different processes load different data. ###

    val_loader = DataLoader(val_set, batch_size=batch, shuffle=False)

    lr = args.lr * gpus_per_node
    max_epoch = args.epochs
    model = ResNet(Block).cuda() if args.enable_gpu else ResNet(Block)

    ### Settings for distributed training. Build a DistributedDataParallel model. ###
    model = nn.parallel.DistributedDataParallel(model)
    ### Settings for distributed training. Build a DistributedDataParallel model. ###

    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_func = torch.nn.CrossEntropyLoss()

    os.makedirs(args.output_dir, exist_ok=True)

    for epoch in range(1, max_epoch + 1):
        model.train()
        train_loss = 0

        ### Settings for distributed training. Set the current epoch on the distributed sampler so that each epoch reshuffles the data differently and processes do not load duplicate samples. ###
        tr_sampler.set_epoch(epoch)
        ### Settings for distributed training. Set the current epoch on the distributed sampler so that each epoch reshuffles the data differently and processes do not load duplicate samples. ###

        for step, (tr_x, tr_y) in enumerate(tr_loader):
            if args.enable_gpu:
                tr_x, tr_y = tr_x.cuda(), tr_y.cuda()
            out = model(tr_x)
            loss = loss_func(out, tr_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        print('train | epoch: %d | loss: %.4f' % (epoch, train_loss / len(tr_loader)))

        val_loss = 0
        pred_record = []
        real_record = []
        model.eval()
        with torch.no_grad():
            for step, (val_x, val_y) in enumerate(val_loader):
                if args.enable_gpu:
                    val_x, val_y = val_x.cuda(), val_y.cuda()
                out = model(val_x)
                pred_record += list(np.argmax(out.cpu().numpy(), axis=1))
                real_record += list(val_y.cpu().numpy())
                val_loss += loss_func(out, val_y).item()
        val_accu = accuracy_score(real_record, pred_record)
        print('val | epoch: %d | loss: %.4f | accuracy: %.4f' % (epoch, val_loss / len(val_loader), val_accu), '\n')

        if args.rank == 0:
            # save ckpt every epoch
            torch.save(model.state_dict(), os.path.join(args.output_dir, f'epoch_{epoch}.pth'))


if __name__ == '__main__':
    main()
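The checkpoints are saved from the DistributedDataParallel wrapper, so every parameter name carries a leading "module." prefix. A minimal sketch of loading such a checkpoint for single-process evaluation (the file name epoch_100.pth is only an example) might look as follows:

model = ResNet(Block)
state_dict = torch.load('epoch_100.pth', map_location='cpu')
# Strip the "module." prefix added by DistributedDataParallel before loading.
state_dict = {k.replace('module.', '', 1): v for k, v in state_dict.items()}
model.load_state_dict(state_dict)
model.eval()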
- Result comparison
A 100-epoch CIFAR-10 training job was run on two resource types: single-node single-card and two-node 16-card. The training duration and test set accuracy are as follows.
Table 1 Training result comparison

| Resource Type | Single-Node Single-Card | Two-Node 16-Card |
| --- | --- | --- |
| Duration | 60 minutes | 20 minutes |
| Accuracy (%) | 80+ | 80+ |