Distributed Debugging Adaptation and Code Example

Updated on 2023-11-21 GMT+08:00

In DistributedDataParallel (DDP), each process loads its own subset of the original dataset for every batch, and the gradients from all processes are averaged to produce the final gradient. Because each step covers more samples, the gradient estimate is more reliable, so the learning rate can be increased.
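
The averaging step can be illustrated without any framework. The following is a minimal sketch, assuming a one-parameter least-squares model and equal-sized shards. The names grad_mse, shards, and world_size are illustrative only, and the code imitates the arithmetic, not the communication that DistributedDataParallel performs.

```python
# Illustrative only: plain-Python imitation of gradient averaging across workers.

def grad_mse(w, samples):
    """Gradient of mean((w*x - y)^2) with respect to w over the given samples."""
    return sum(2 * (w * x - y) * x for x, y in samples) / len(samples)

# A toy dataset of (x, y) pairs and a starting weight.
data = [(1.0, 2.0), (2.0, 4.1), (3.0, 5.9), (4.0, 8.2)]
w = 0.5

# Split the data into equal-sized shards, one per (hypothetical) worker.
world_size = 2
shards = [data[rank::world_size] for rank in range(world_size)]

# Each worker computes a gradient on its own shard ...
local_grads = [grad_mse(w, shard) for shard in shards]
# ... and the per-worker gradients are averaged.
averaged_grad = sum(local_grads) / world_size

# With equal shard sizes, the average matches the gradient over all samples.
full_grad = grad_mse(w, data)
print(abs(averaged_grad - full_grad) < 1e-9)
```

Because the averaged gradient equals the gradient over the combined samples, adding processes acts like enlarging the batch, which is the reason a larger learning rate is tolerable.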

This section provides the code for single-node training and for distributed parallel training of a ResNet18 classification job on the CIFAR-10 dataset. Run the code as is to perform multi-node distributed training with CPUs or GPUs; comment out the distributed training settings in the code to perform single-node single-card training.

The training code takes three groups of input parameters: basic training parameters, distributed parameters, and data parameters. The distributed parameters are supplied automatically by the platform. Among the data parameters, custom_data indicates whether to use custom data for training. If this parameter is set to true, randomly generated torch data is used for training and validation.
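
The three parameter groups can be sketched with argparse alone. This is a minimal sketch that reuses the argument names from the training code; the command line in argv, including the address and the --extra_flag option, is made up for illustration and is not what the platform actually passes.

```python
import argparse

def build_parser():
    parser = argparse.ArgumentParser(description='parameter groups sketch')
    # Basic training parameters (passed as strings, converted after parsing).
    parser.add_argument('--enable_gpu', default='true')
    parser.add_argument('--lr', default='0.01')
    parser.add_argument('--epochs', default='100')
    # Distributed parameters (supplied by the platform in a real job).
    parser.add_argument('--init_method', default=None)
    parser.add_argument('--rank', type=int, default=0)
    parser.add_argument('--world_size', type=int, default=1)
    # Data parameters.
    parser.add_argument('--custom_data', default='false')
    return parser

# Hypothetical command line for the second of two tasks; the address is made up.
argv = ['--init_method', 'tcp://127.0.0.1:29500', '--rank', '1',
        '--world_size', '2', '--custom_data', 'true', '--extra_flag', 'x']

# parse_known_args tolerates flags the script does not declare.
args, unknown = build_parser().parse_known_args(argv)
args.custom_data = args.custom_data == 'true'
args.lr = float(args.lr)

print(args.rank, args.world_size, args.custom_data, args.lr, unknown)
```

Note that the boolean flags are compared against the exact string 'true', so a value such as 'True' would be treated as false.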

Dataset

CIFAR-10 dataset

In notebook instances, the default version of torchvision cannot be used to obtain datasets. Therefore, the sample code provides three ways to load the training data:

  • Download the CIFAR-10 dataset using torchvision.
  • Download the CIFAR-10 dataset from its URL (click CIFAR-10 python version on the download page) and decompress it to a specified directory. The training set and test set have shapes (50000, 3, 32, 32) and (10000, 3, 32, 32), respectively.
  • Use Torch to generate a random dataset similar to CIFAR-10. The training set and test set have shapes (5000, 3, 32, 32) and (1000, 3, 32, 32), respectively, and the labels still cover 10 classes. Set custom_data to true to run the training job directly without loading any data.
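
The second and third methods might look roughly like the following. This is a minimal sketch, not the get_data function of the sample. It assumes NumPy is available, assumes the batch layout of the CIFAR-10 python version (a pickled dictionary with b'data' and b'labels' entries), and uses NumPy instead of Torch for the random data. The helper names decode_batch and random_cifar_like are hypothetical.

```python
import pickle

import numpy as np

def decode_batch(raw_bytes):
    """Turn one pickled CIFAR-10 (python version) batch into image/label arrays."""
    batch = pickle.loads(raw_bytes, encoding='bytes')
    # Each row holds 3072 bytes: 1024 red, 1024 green, then 1024 blue values.
    images = np.asarray(batch[b'data'], dtype=np.uint8).reshape(-1, 3, 32, 32)
    labels = np.asarray(batch[b'labels'], dtype=np.int64)
    return images, labels

def random_cifar_like(num_samples, seed=0):
    """Random images and labels with CIFAR-10 shapes (NumPy stand-in for Torch)."""
    rng = np.random.default_rng(seed)
    images = rng.standard_normal((num_samples, 3, 32, 32)).astype(np.float32)
    labels = rng.integers(0, 10, size=num_samples)
    return images, labels

# A synthetic four-image batch stands in for a real downloaded file.
fake_file = pickle.dumps({b'data': np.zeros((4, 3072), dtype=np.uint8),
                          b'labels': [0, 1, 2, 3]})
batch_x, batch_y = decode_batch(fake_file)
rand_x, rand_y = random_cifar_like(5000)
print(batch_x.shape, batch_y.shape, rand_x.shape, rand_y.shape)
```

In the real workflow the bytes would come from the decompressed batch files in the data directory, and the arrays would then be wrapped in tensors for the DataLoader.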

Training Code

In the following code, the lines enclosed between a pair of ### Settings for distributed training ... ### comments are the modifications required for multi-node distributed training.

Apart from changing the data path to your own path, do not modify the sample code. Multi-node distributed training can then be run on ModelArts.

If the distributed modifications are commented out, single-node single-card training can be run instead. For the complete code, see Sample Code of Distributed Training.
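
Two of the distributed settings concern the sampler: one creates it so that different processes load different data, and one calls set_epoch at the start of every epoch. The idea behind both can be sketched in plain Python. This is a simplified imitation for illustration, not the implementation of torch's DistributedSampler; shard_indices is a hypothetical helper, and Python's random module stands in for torch's random permutation.

```python
import math
import random

def shard_indices(dataset_len, num_replicas, rank, epoch, seed=0):
    """Return the sample indices one process would visit in one epoch."""
    # Every process builds the same permutation because the seed is shared.
    indices = list(range(dataset_len))
    random.Random(seed + epoch).shuffle(indices)
    # Pad so that the permutation divides evenly among the processes.
    per_replica = math.ceil(dataset_len / num_replicas)
    total = per_replica * num_replicas
    indices += indices[:total - dataset_len]
    # Each process takes every num_replicas-th index, starting at its rank.
    return indices[rank:total:num_replicas]

epoch1 = [shard_indices(10, 2, rank, epoch=1) for rank in range(2)]
epoch2 = [shard_indices(10, 2, rank, epoch=2) for rank in range(2)]

# Within an epoch the two shards are disjoint and together cover the dataset.
print(sorted(epoch1[0] + epoch1[1]) == list(range(10)))
# The epoch number feeds the shuffle seed, so the ordering changes per epoch.
print(epoch1[0], epoch2[0])
```

If the epoch number were never updated, every epoch would reuse the same permutation, which is why the training loop calls set_epoch before iterating over the loader.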

  • Importing dependency packages
    import datetime
    import inspect
    import os
    import pickle
    import random
    
    import argparse
    import numpy as np
    import torch
    import torch.distributed as dist
    from torch import nn, optim
    from torch.utils.data import TensorDataset, DataLoader
    from torch.utils.data.distributed import DistributedSampler
    from sklearn.metrics import accuracy_score
  • Defining the random seed setup and the data loading method (the data loading code is omitted here because of its length)
    def setup_seed(seed):
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        np.random.seed(seed)
        random.seed(seed)
        torch.backends.cudnn.deterministic = True
    
    def get_data(path, custom_data=False):
        # Omitted here. The full version is expected to return (tr_set, val_set).
        pass
  • Defining a network structure
    class Block(nn.Module):
    
        def __init__(self, in_channels, out_channels, stride=1):
            super().__init__()
            self.residual_function = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
                nn.BatchNorm2d(out_channels)
            )
    
            self.shortcut = nn.Sequential()
            if stride != 1 or in_channels != out_channels:
                self.shortcut = nn.Sequential(
                    nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                    nn.BatchNorm2d(out_channels)
                )
    
        def forward(self, x):
            out = self.residual_function(x) + self.shortcut(x)
            return nn.ReLU(inplace=True)(out)
    
    
    class ResNet(nn.Module):
    
        def __init__(self, block, num_classes=10):
            super().__init__()
            self.conv1 = nn.Sequential(
                nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False),
                nn.BatchNorm2d(64),
                nn.ReLU(inplace=True))
            self.conv2 = self.make_layer(block, 64, 64, 2, 1)
            self.conv3 = self.make_layer(block, 64, 128, 2, 2)
            self.conv4 = self.make_layer(block, 128, 256, 2, 2)
            self.conv5 = self.make_layer(block, 256, 512, 2, 2)
            self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
            self.dense_layer = nn.Linear(512, num_classes)
    
        def make_layer(self, block, in_channels, out_channels, num_blocks, stride):
            strides = [stride] + [1] * (num_blocks - 1)
            layers = []
            for stride in strides:
                layers.append(block(in_channels, out_channels, stride))
                in_channels = out_channels
            return nn.Sequential(*layers)
    
        def forward(self, x):
            out = self.conv1(x)
            out = self.conv2(out)
            out = self.conv3(out)
            out = self.conv4(out)
            out = self.conv5(out)
            out = self.avg_pool(out)
            out = out.view(out.size(0), -1)
            out = self.dense_layer(out)
            return out
  • Training and validation
    def main():
        file_dir = os.path.dirname(inspect.getframeinfo(inspect.currentframe()).filename)
    
        seed = datetime.datetime.now().year
        setup_seed(seed)
    
        parser = argparse.ArgumentParser(description='Pytorch distribute training',
                                         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
        parser.add_argument('--enable_gpu', default='true')
        parser.add_argument('--lr', default='0.01', help='learning rate')
        parser.add_argument('--epochs', default='100', help='training iteration')
    
        parser.add_argument('--init_method', default=None, help='tcp_port')
        parser.add_argument('--rank', type=int, default=0, help='index of current task')
        parser.add_argument('--world_size', type=int, default=1, help='total number of tasks')
    
        parser.add_argument('--custom_data', default='false')
        parser.add_argument('--data_url', type=str, default=os.path.join(file_dir, 'input_dir'))
        parser.add_argument('--output_dir', type=str, default=os.path.join(file_dir, 'output_dir'))
        args, unknown = parser.parse_known_args()
    
        args.enable_gpu = args.enable_gpu == 'true'
        args.custom_data = args.custom_data == 'true'
        args.lr = float(args.lr)
        args.epochs = int(args.epochs)
    
        if args.custom_data:
            print('[warning] you are training on custom random dataset, '
                  'validation accuracy may range from 0.4 to 0.6.')
    
    ### Settings for distributed training. Initialize DistributedDataParallel process. The init_method, rank, and world_size parameters are automatically input by the platform. ###
        dist.init_process_group(init_method=args.init_method, backend="nccl", world_size=args.world_size, rank=args.rank)
    ### Settings for distributed training. Initialize DistributedDataParallel process. The init_method, rank, and world_size parameters are automatically input by the platform. ###
    
        tr_set, val_set = get_data(args.data_url, custom_data=args.custom_data)
    
        batch_per_gpu = 128
        gpus_per_node = torch.cuda.device_count() if args.enable_gpu else 1
        batch = batch_per_gpu * gpus_per_node
    
        tr_loader = DataLoader(tr_set, batch_size=batch, shuffle=False)
    
    ### Settings for distributed training. Create a sampler for data distribution to ensure that different processes load different data. ###
        tr_sampler = DistributedSampler(tr_set, num_replicas=args.world_size, rank=args.rank)
        tr_loader = DataLoader(tr_set, batch_size=batch, sampler=tr_sampler, shuffle=False, drop_last=True)
    ### Settings for distributed training. Create a sampler for data distribution to ensure that different processes load different data. ###
    
        val_loader = DataLoader(val_set, batch_size=batch, shuffle=False)
    
        lr = args.lr * gpus_per_node
        max_epoch = args.epochs
        model = ResNet(Block).cuda() if args.enable_gpu else ResNet(Block)
    
    ### Settings for distributed training. Build a DistributedDataParallel model. ###
        model = nn.parallel.DistributedDataParallel(model)
    ### Settings for distributed training. Build a DistributedDataParallel model. ###
    
        optimizer = optim.Adam(model.parameters(), lr=lr)
        loss_func = torch.nn.CrossEntropyLoss()
    
        os.makedirs(args.output_dir, exist_ok=True)
    
        for epoch in range(1, max_epoch + 1):
            model.train()
            train_loss = 0
    
    ### Settings for distributed training. DistributedDataParallel sampler. Random numbers are set for the DistributedDataParallel sampler based on the current epoch number to avoid loading duplicate data. ###
            tr_sampler.set_epoch(epoch)
    ### Settings for distributed training. DistributedDataParallel sampler. Random numbers are set for the DistributedDataParallel sampler based on the current epoch number to avoid loading duplicate data. ###
    
            for step, (tr_x, tr_y) in enumerate(tr_loader):
                if args.enable_gpu:
                    tr_x, tr_y = tr_x.cuda(), tr_y.cuda()
                out = model(tr_x)
                loss = loss_func(out, tr_y)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
            print('train | epoch: %d | loss: %.4f' % (epoch, train_loss / len(tr_loader)))
    
            val_loss = 0
            pred_record = []
            real_record = []
            model.eval()
            with torch.no_grad():
                for step, (val_x, val_y) in enumerate(val_loader):
                    if args.enable_gpu:
                        val_x, val_y = val_x.cuda(), val_y.cuda()
                    out = model(val_x)
                    pred_record += list(np.argmax(out.cpu().numpy(), axis=1))
                    real_record += list(val_y.cpu().numpy())
                    val_loss += loss_func(out, val_y).item()
            val_accu = accuracy_score(real_record, pred_record)
            print('val | epoch: %d | loss: %.4f | accuracy: %.4f' % (epoch, val_loss / len(val_loader), val_accu), '\n')
    
            if args.rank == 0:
                # save ckpt every epoch
                torch.save(model.state_dict(), os.path.join(args.output_dir, f'epoch_{epoch}.pth'))
    
    
    if __name__ == '__main__':
        main()
  • Result comparison
    Training on the CIFAR-10 dataset for 100 epochs was completed with two resource types: single-node single-card and two-node 16-card. The training duration and test set accuracy are as follows.
    Table 1 Training result comparison

    Resource Type    Single-Node Single-Card    Two-Node 16-Card
    Duration         60 minutes                 20 minutes
    Accuracy         80+                        80+
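
The two configurations in the comparison differ in batch size, steps per epoch, and learning rate, all of which follow from the formulas in the training code. The following is a minimal sketch of that arithmetic. It assumes the full CIFAR-10 training set of 50000 samples, and it assumes that the two-node 16-card job runs one process per node with 8 GPUs each (world_size of 2), which this section does not state explicitly. The helper per_process_plan is hypothetical.

```python
import math

def per_process_plan(num_samples, world_size, gpus_per_node, drop_last,
                     batch_per_gpu=128, base_lr=0.01):
    """Batch size, steps per epoch, and learning rate for one process."""
    batch = batch_per_gpu * gpus_per_node
    # The sampler gives each process an equal share of the samples.
    samples_per_process = math.ceil(num_samples / world_size)
    if drop_last:
        # The distributed loader discards the final incomplete batch.
        steps_per_epoch = samples_per_process // batch
    else:
        steps_per_epoch = math.ceil(samples_per_process / batch)
    # The training code scales the learning rate by the GPUs per node.
    lr = base_lr * gpus_per_node
    return batch, steps_per_epoch, lr

single = per_process_plan(50000, world_size=1, gpus_per_node=1, drop_last=False)
two_node = per_process_plan(50000, world_size=2, gpus_per_node=8, drop_last=True)
print(single)
print(two_node)
```

Under these assumptions, the single-card job takes 391 steps per epoch with a batch of 128, while each process of the two-node job takes 24 steps per epoch with a batch of 1024 and a learning rate eight times larger.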
