更新时间:2024-10-30 GMT+08:00
PyTorch
训练模型
训练模型自定义脚本代码示例:
from __future__ import print_function import argparse import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchvision import datasets, transforms # 定义网络结构 class Net(nn.Module): def __init__(self): super(Net, self).__init__() # 输入第二维需要为784 self.hidden1 = nn.Linear(784, 5120, bias=False) self.output = nn.Linear(5120, 10, bias=False) def forward(self, x): x = x.view(x.size()[0], -1) x = F.relu((self.hidden1(x))) x = F.dropout(x, 0.2) x = self.output(x) return F.log_softmax(x) def train(model, device, train_loader, optimizer, epoch): model.train() for batch_idx, (data, target) in enumerate(train_loader): data, target = data.to(device), target.to(device) optimizer.zero_grad() output = model(data) loss = F.cross_entropy(output, target) loss.backward() optimizer.step() if batch_idx % 10 == 0: print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format( epoch, batch_idx * len(data), len(train_loader.dataset), 100. * batch_idx / len(train_loader), loss.item())) def test( model, device, test_loader): model.eval() test_loss = 0 correct = 0 with torch.no_grad(): for data, target in test_loader: data, target = data.to(device), target.to(device) output = model(data) test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability correct += pred.eq(target.view_as(pred)).sum().item() test_loss /= len(test_loader.dataset) print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format( test_loss, correct, len(test_loader.dataset), 100. 
* correct / len(test_loader.dataset))) device = torch.device("cpu") batch_size=64 kwargs={} train_loader = torch.utils.data.DataLoader( datasets.MNIST('.', train=True, download=True, transform=transforms.Compose([ transforms.ToTensor() ])), batch_size=batch_size, shuffle=True, **kwargs) test_loader = torch.utils.data.DataLoader( datasets.MNIST('.', train=False, transform=transforms.Compose([ transforms.ToTensor() ])), batch_size=1000, shuffle=True, **kwargs) model = Net().to(device) optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5) optimizer = optim.Adam(model.parameters()) for epoch in range(1, 2 + 1): train(model, device, train_loader, optimizer, epoch) test(model, device, test_loader)
保存模型:
# The model must be saved via its state_dict so that it can be deployed
# on a different machine.
import os

# torch.save does not create missing parent directories, so make sure the
# target directory exists first.
os.makedirs("pytorch_mnist", exist_ok=True)
torch.save(model.state_dict(), "pytorch_mnist/mnist_mlp.pt")
推理代码:
在模型代码推理文件customize_service.py中,需要添加一个子类,该子类继承对应模型类型的父类,各模型类型的父类名称和导入语句请参考表1。
import json
import os

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from PIL import Image

import log
from model_service.pytorch_model_service import PTServingBaseService

logger = log.getLogger(__name__)

# Preprocessing applied to every input image
infer_transformation = transforms.Compose([
    transforms.Resize((28, 28)),
    # Convert to a PyTorch tensor
    transforms.ToTensor()
])


class PTVisionService(PTServingBaseService):
    """Inference service that classifies images with the MNIST MLP."""

    def __init__(self, model_name, model_path):
        # Call the parent constructor
        super(PTVisionService, self).__init__(model_name, model_path)
        # Load the model through the custom loader below
        self.model = Mnist(model_path)
        # Default labels
        self.label = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        # Labels can alternatively come from a label.json file placed in the
        # model directory; when the file is absent the defaults are kept.
        dir_path = os.path.dirname(os.path.realpath(self.model_path))
        label_path = os.path.join(dir_path, 'label.json')
        if os.path.isfile(label_path):
            with open(label_path) as f:
                self.label = json.load(f)

    def _preprocess(self, data):
        """Turn each group of uploaded image files into one stacked tensor.

        Returns a dict with the same keys as ``data``; each value is the
        batch tensor built from that key's files.
        """
        preprocessed_data = {}
        for k, v in data.items():
            input_batch = []
            for file_name, file_content in v.items():
                with Image.open(file_content) as image1:
                    # Convert to grayscale
                    image1 = image1.convert("L")
                    tensor = infer_transformation(image1)
                if torch.cuda.is_available():
                    tensor = tensor.cuda()
                input_batch.append(tensor)
            # Variable(..., volatile=True) has had no effect since
            # PyTorch 0.4; gradient tracking is disabled in _inference.
            input_batch_var = torch.stack(input_batch, dim=0)
            print(input_batch_var.shape)
            preprocessed_data[k] = input_batch_var

        return preprocessed_data

    def _postprocess(self, data):
        """Map model outputs to labels.

        Only the first sample of each batch (``v[0]``) is reported.
        """
        results = []
        for k, v in data.items():
            # Convert the 0-dim tensor to a plain int before indexing
            index = int(torch.argmax(v[0]))
            results.append({k: self.label[index]})
        return results

    def _inference(self, data):
        """Run the model on every preprocessed batch without autograd."""
        result = {}
        with torch.no_grad():
            for k, v in data.items():
                result[k] = self.model(v)
        return result


class Net(nn.Module):
    """Two-layer, bias-free MLP; layer names match the saved state_dict."""

    def __init__(self):
        super(Net, self).__init__()
        self.hidden1 = nn.Linear(784, 5120, bias=False)
        self.output = nn.Linear(5120, 10, bias=False)

    def forward(self, x):
        """Return per-class log-probabilities for a batch of inputs."""
        # Flatten everything except the batch dimension
        x = x.view(x.size()[0], -1)
        x = F.relu(self.hidden1(x))
        # F.dropout defaults to training=True; tie it to the module mode so
        # that eval() disables dropout and predictions are deterministic.
        x = F.dropout(x, 0.2, training=self.training)
        x = self.output(x)
        return F.log_softmax(x, dim=1)


def Mnist(model_path, **kwargs):
    """Build the network, load weights from ``model_path`` and return it
    in eval mode on the GPU when available, otherwise on the CPU.

    Extra keyword arguments are accepted but unused.
    """
    # Build the network
    model = Net()
    # Pick the device once and use it both for deserialisation and placement
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # Load the weights
    model.load_state_dict(torch.load(model_path, map_location=device))
    # Map to CPU or GPU
    model.to(device)
    # Switch to inference mode
    model.eval()

    return model
父主题: 自定义脚本代码示例