Examples of Custom Scripts
To create a model in ModelArts by importing a model file from OBS, the model file package needs to comply with the ModelArts model package specifications. Additionally, the inference code and configuration file must also meet the requirements set by ModelArts.
This section provides custom script examples (including inference code examples) for common AI engines. For details about how to write model inference code, see Specifications for Writing a Model Inference Code File.
Tensorflow
There are two types of TensorFlow APIs, Keras and tf. They use different code for training and saving models, but the same code for inference.
Training a Model (Keras API)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
from keras.models import Sequential model = Sequential() from keras.layers import Dense import tensorflow as tf # Import a training dataset. mnist = tf.keras.datasets.mnist (x_train, y_train),(x_test, y_test) = mnist.load_data() x_train, x_test = x_train / 255.0, x_test / 255.0 print(x_train.shape) from keras.layers import Dense from keras.models import Sequential import keras from keras.layers import Dense, Activation, Flatten, Dropout # Define a model network. model = Sequential() model.add(Flatten(input_shape=(28,28))) model.add(Dense(units=5120,activation='relu')) model.add(Dropout(0.2)) model.add(Dense(units=10, activation='softmax')) # Define an optimizer and loss functions. model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy']) model.summary() # Train the model. model.fit(x_train, y_train, epochs=2) # Evaluate the model. model.evaluate(x_test, y_test) |
Saving a Model (Keras API)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
from keras import backend as K # K.get_session().run(tf.global_variables_initializer()) # Define the inputs and outputs of the prediction API. # The keys of the inputs and outputs dictionaries are used as the index keys for the input and output tensors of the model. # The input and output definitions of the model must match the custom inference script. predict_signature = tf.saved_model.signature_def_utils.predict_signature_def( inputs={"images" : model.input}, outputs={"scores" : model.output} ) # Define a save path. builder = tf.saved_model.builder.SavedModelBuilder('./mnist_keras/') builder.add_meta_graph_and_variables( sess = K.get_session(), # The tf.saved_model.tag_constants.SERVING tag needs to be defined for inference and deployment. tags=[tf.saved_model.tag_constants.SERVING], """ signature_def_map: Only one items can exist, or the corresponding key needs to be defined as follows: tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY """ signature_def_map={ tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: predict_signature } ) builder.save() |
Training a Model (tf API)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
from __future__ import print_function import gzip import os import urllib import numpy import tensorflow as tf from six.moves import urllib # Training data is obtained from the Yann LeCun official website http://yann.lecun.com/exdb/mnist/. SOURCE_URL = 'http://yann.lecun.com/exdb/mnist/' TRAIN_IMAGES = 'train-images-idx3-ubyte.gz' TRAIN_LABELS = 'train-labels-idx1-ubyte.gz' TEST_IMAGES = 't10k-images-idx3-ubyte.gz' TEST_LABELS = 't10k-labels-idx1-ubyte.gz' VALIDATION_SIZE = 5000 def maybe_download(filename, work_directory): """Download the data from Yann's website, unless it's already here.""" if not os.path.exists(work_directory): os.mkdir(work_directory) filepath = os.path.join(work_directory, filename) if not os.path.exists(filepath): filepath, _ = urllib.request.urlretrieve(SOURCE_URL + filename, filepath) statinfo = os.stat(filepath) print('Successfully downloaded %s %d bytes.' % (filename, statinfo.st_size)) return filepath def _read32(bytestream): dt = numpy.dtype(numpy.uint32).newbyteorder('>') return numpy.frombuffer(bytestream.read(4), dtype=dt)[0] def extract_images(filename): """Extract the images into a 4D uint8 numpy array [index, y, x, depth].""" print('Extracting %s' % filename) with gzip.open(filename) as bytestream: magic = _read32(bytestream) if magic != 2051: raise ValueError( 'Invalid magic number %d in MNIST image file: %s' % (magic, filename)) num_images = _read32(bytestream) rows = _read32(bytestream) cols = _read32(bytestream) buf = bytestream.read(rows * cols * num_images) data = numpy.frombuffer(buf, dtype=numpy.uint8) data = data.reshape(num_images, rows, cols, 1) return data def dense_to_one_hot(labels_dense, num_classes=10): """Convert class labels from scalars to one-hot vectors.""" num_labels = labels_dense.shape[0] index_offset = numpy.arange(num_labels) * num_classes labels_one_hot = numpy.zeros((num_labels, num_classes)) labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1 return labels_one_hot def extract_labels(filename, one_hot=False): """Extract the labels into a 1D uint8 numpy array [index].""" print('Extracting %s' % filename) with gzip.open(filename) as bytestream: magic = _read32(bytestream) if magic != 2049: raise ValueError( 'Invalid magic number %d in MNIST label file: %s' % (magic, filename)) num_items = _read32(bytestream) buf = bytestream.read(num_items) labels = numpy.frombuffer(buf, dtype=numpy.uint8) if one_hot: return dense_to_one_hot(labels) return labels class DataSet(object): """Class encompassing test, validation and training MNIST data set.""" def __init__(self, images, labels, fake_data=False, one_hot=False): """Construct a DataSet. one_hot arg is used only if fake_data is true.""" if fake_data: self._num_examples = 10000 self.one_hot = one_hot else: assert images.shape[0] == labels.shape[0], ( 'images.shape: %s labels.shape: %s' % (images.shape, labels.shape)) self._num_examples = images.shape[0] # Convert shape from [num examples, rows, columns, depth] # to [num examples, rows*columns] (assuming depth == 1) assert images.shape[3] == 1 images = images.reshape(images.shape[0], images.shape[1] * images.shape[2]) # Convert from [0, 255] -> [0.0, 1.0]. images = images.astype(numpy.float32) images = numpy.multiply(images, 1.0 / 255.0) self._images = images self._labels = labels self._epochs_completed = 0 self._index_in_epoch = 0 @property def images(self): return self._images @property def labels(self): return self._labels @property def num_examples(self): return self._num_examples @property def epochs_completed(self): return self._epochs_completed def next_batch(self, batch_size, fake_data=False): """Return the next `batch_size` examples from this data set.""" if fake_data: fake_image = [1] * 784 if self.one_hot: fake_label = [1] + [0] * 9 else: fake_label = 0 return [fake_image for _ in range(batch_size)], [ fake_label for _ in range(batch_size) ] start = self._index_in_epoch self._index_in_epoch += batch_size if self._index_in_epoch > self._num_examples: # Finished epoch self._epochs_completed += 1 # Shuffle the data perm = numpy.arange(self._num_examples) numpy.random.shuffle(perm) self._images = self._images[perm] self._labels = self._labels[perm] # Start next epoch start = 0 self._index_in_epoch = batch_size assert batch_size <= self._num_examples end = self._index_in_epoch return self._images[start:end], self._labels[start:end] def read_data_sets(train_dir, fake_data=False, one_hot=False): """Return training, validation and testing data sets.""" class DataSets(object): pass data_sets = DataSets() if fake_data: data_sets.train = DataSet([], [], fake_data=True, one_hot=one_hot) data_sets.validation = DataSet([], [], fake_data=True, one_hot=one_hot) data_sets.test = DataSet([], [], fake_data=True, one_hot=one_hot) return data_sets local_file = maybe_download(TRAIN_IMAGES, train_dir) train_images = extract_images(local_file) local_file = maybe_download(TRAIN_LABELS, train_dir) train_labels = extract_labels(local_file, one_hot=one_hot) local_file = maybe_download(TEST_IMAGES, train_dir) test_images = extract_images(local_file) local_file = maybe_download(TEST_LABELS, train_dir) test_labels = extract_labels(local_file, one_hot=one_hot) validation_images = train_images[:VALIDATION_SIZE] validation_labels = train_labels[:VALIDATION_SIZE] train_images = train_images[VALIDATION_SIZE:] train_labels = train_labels[VALIDATION_SIZE:] data_sets.train = DataSet(train_images, train_labels) data_sets.validation = DataSet(validation_images, validation_labels) data_sets.test = DataSet(test_images, test_labels) return data_sets training_iteration = 1000 modelarts_example_path = './modelarts-mnist-train-save-deploy-example' export_path = modelarts_example_path + '/model/' data_path = './' print('Training model...') mnist = read_data_sets(data_path, one_hot=True) sess = tf.InteractiveSession() serialized_tf_example = tf.placeholder(tf.string, name='tf_example') feature_configs = {'x': tf.FixedLenFeature(shape=[784], dtype=tf.float32), } tf_example = tf.parse_example(serialized_tf_example, feature_configs) x = tf.identity(tf_example['x'], name='x') # use tf.identity() to assign name y_ = tf.placeholder('float', shape=[None, 10]) w = tf.Variable(tf.zeros([784, 10])) b = tf.Variable(tf.zeros([10])) sess.run(tf.global_variables_initializer()) y = tf.nn.softmax(tf.matmul(x, w) + b, name='y') cross_entropy = -tf.reduce_sum(y_ * tf.log(y)) train_step = tf.train.GradientDescentOptimizer(0.01).minimize(cross_entropy) values, indices = tf.nn.top_k(y, 10) table = tf.contrib.lookup.index_to_string_table_from_tensor( tf.constant([str(i) for i in range(10)])) prediction_classes = table.lookup(tf.to_int64(indices)) for _ in range(training_iteration): batch = mnist.train.next_batch(50) train_step.run(feed_dict={x: batch[0], y_: batch[1]}) correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1)) accuracy = tf.reduce_mean(tf.cast(correct_prediction, 'float')) print('training accuracy %g' % sess.run( accuracy, feed_dict={ x: mnist.test.images, y_: mnist.test.labels })) print('Done training!') |
Saving a Model (tf API)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# Export the model. # The model needs to be saved using the saved_model API. print('Exporting trained model to', export_path) builder = tf.saved_model.builder.SavedModelBuilder(export_path) tensor_info_x = tf.saved_model.utils.build_tensor_info(x) tensor_info_y = tf.saved_model.utils.build_tensor_info(y) # Define the inputs and outputs of the prediction API. # The keys of the inputs and outputs dictionaries are used as the index keys for the input and output tensors of the model. # The input and output definitions of the model must match the custom inference script. prediction_signature = ( tf.saved_model.signature_def_utils.build_signature_def( inputs={'images': tensor_info_x}, outputs={'scores': tensor_info_y}, method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME)) legacy_init_op = tf.group(tf.tables_initializer(), name='legacy_init_op') builder.add_meta_graph_and_variables( # Set tag to serve/tf.saved_model.tag_constants.SERVING. sess, [tf.saved_model.tag_constants.SERVING], signature_def_map={ 'predict_images': prediction_signature, }, legacy_init_op=legacy_init_op) builder.save() print('Done exporting!') |
Inference Code (Keras and tf APIs)
In the model inference code file customize_service.py, add a child model class which inherits properties from its parent model class. For details about the parent class and import statement of each model type, see Table 1. This example calls the parent class inference request method _inference(self, data). The method does not need to be overridden in the following code.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
from PIL import Image import numpy as np from model_service.tfserving_model_service import TfServingBaseService class MnistService(TfServingBaseService): # Match the model input with the user's HTTPS API input during preprocessing. # The model input corresponding to the preceding training part is {"images":<array>}. def _preprocess(self, data): preprocessed_data = {} images = [] # Iterate the input data. for k, v in data.items(): for file_name, file_content in v.items(): image1 = Image.open(file_content) image1 = np.array(image1, dtype=np.float32) image1.resize((1,784)) images.append(image1) # Return the numpy array. images = np.array(images,dtype=np.float32) # Perform batch processing on multiple input samples and ensure that the shape is the same as that inputted during training. images.resize((len(data), 784)) preprocessed_data['images'] = images return preprocessed_data # The output corresponding to model saving in the preceding training part is {"scores":<array>}. # Postprocess the HTTPS output. def _postprocess(self, data): infer_output = {"mnist_result": []} # Iterate the model output. for output_name, results in data.items(): for result in results: infer_output["mnist_result"].append(result.index(max(result))) return infer_output |
from __future__ import absolute_import, division, print_function, unicode_literals import tensorflow as tf mnist = tf.keras.datasets.mnist (x_train, y_train), (x_test, y_test) = mnist.load_data() x_train, x_test = x_train / 255.0, x_test / 255.0 model = tf.keras.models.Sequential([ tf.keras.layers.Flatten(input_shape=(28, 28)), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dense(256, activation='relu'), tf.keras.layers.Dropout(0.2), # Name the output layer output, which is used to obtain the result during model inference. tf.keras.layers.Dense(10, activation='softmax', name="output") ]) model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy']) model.fit(x_train, y_train, epochs=10) tf.keras.models.save_model(model, "./mnist")
Inference Code
In the model inference code file customize_service.py, add a child model class which inherits properties from its parent model class. For details about the parent class and import statement of each model type, see Table 1.
import logging import threading import numpy as np import tensorflow as tf from PIL import Image from model_service.tfserving_model_service import TfServingBaseService logger = logging.getLogger() logger.setLevel(logging.INFO) class MnistService(TfServingBaseService): def __init__(self, model_name, model_path): self.model_name = model_name self.model_path = model_path self.model = None self.predict = None # The label file can be loaded here and used in the post-processing function. # Directories for storing the label.txt file on OBS and in the model package # with open(os.path.join(self.model_path, 'label.txt')) as f: # self.label = json.load(f) # Load the model in saved_model format in non-blocking mode to prevent blocking timeout. thread = threading.Thread(target=self.load_model) thread.start() def load_model(self): # Load the model in saved_model format. self.model = tf.saved_model.load(self.model_path) signature_defs = self.model.signatures.keys() signature = [] # only one signature allowed for signature_def in signature_defs: signature.append(signature_def) if len(signature) == 1: model_signature = signature[0] else: logging.warning("signatures more than one, use serving_default signature from %s", signature) model_signature = tf.saved_model.DEFAULT_SERVING_SIGNATURE_DEF_KEY self.predict = self.model.signatures[model_signature] def _preprocess(self, data): images = [] for k, v in data.items(): for file_name, file_content in v.items(): image1 = Image.open(file_content) image1 = np.array(image1, dtype=np.float32) image1.resize((28, 28, 1)) images.append(image1) images = tf.convert_to_tensor(images, dtype=tf.dtypes.float32) preprocessed_data = images return preprocessed_data def _inference(self, data): return self.predict(data) def _postprocess(self, data): return { "result": int(data["output"].numpy()[0].argmax()) }
Pytorch
Training a Model
from __future__ import print_function import argparse import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchvision import datasets, transforms # Define a network structure. class Net(nn.Module): def __init__(self): super(Net, self).__init__() # The second dimension of the input must be 784. self.hidden1 = nn.Linear(784, 5120, bias=False) self.output = nn.Linear(5120, 10, bias=False) def forward(self, x): x = x.view(x.size()[0], -1) x = F.relu((self.hidden1(x))) x = F.dropout(x, 0.2) x = self.output(x) return F.log_softmax(x) def train(model, device, train_loader, optimizer, epoch): model.train() for batch_idx, (data, target) in enumerate(train_loader): data, target = data.to(device), target.to(device) optimizer.zero_grad() output = model(data) loss = F.cross_entropy(output, target) loss.backward() optimizer.step() if batch_idx % 10 == 0: print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format( epoch, batch_idx * len(data), len(train_loader.dataset), 100. * batch_idx / len(train_loader), loss.item())) def test( model, device, test_loader): model.eval() test_loss = 0 correct = 0 with torch.no_grad(): for data, target in test_loader: data, target = data.to(device), target.to(device) output = model(data) test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability correct += pred.eq(target.view_as(pred)).sum().item() test_loss /= len(test_loader.dataset) print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format( test_loss, correct, len(test_loader.dataset), 100. * correct / len(test_loader.dataset))) device = torch.device("cpu") batch_size=64 kwargs={} train_loader = torch.utils.data.DataLoader( datasets.MNIST('.', train=True, download=True, transform=transforms.Compose([ transforms.ToTensor() ])), batch_size=batch_size, shuffle=True, **kwargs) test_loader = torch.utils.data.DataLoader( datasets.MNIST('.', train=False, transform=transforms.Compose([ transforms.ToTensor() ])), batch_size=1000, shuffle=True, **kwargs) model = Net().to(device) optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5) optimizer = optim.Adam(model.parameters()) for epoch in range(1, 2 + 1): train(model, device, train_loader, optimizer, epoch) test(model, device, test_loader)
Saving a Model
# The model must be saved using state_dict and can be deployed remotely. torch.save(model.state_dict(), "pytorch_mnist/mnist_mlp.pt")
Inference Code
In the model inference code file customize_service.py, add a child model class which inherits properties from its parent model class. For details about the parent class and import statement of each model type, see Table 1.
from PIL import Image import log from model_service.pytorch_model_service import PTServingBaseService import torch.nn.functional as F import torch.nn as nn import torch import json import numpy as np logger = log.getLogger(__name__) import torchvision.transforms as transforms # Define model preprocessing. infer_transformation = transforms.Compose([ transforms.Resize((28,28)), # Convert data to tensor. transforms.ToTensor() ]) import os class PTVisionService(PTServingBaseService): def __init__(self, model_name, model_path): # Call the constructor of the parent class. super(PTVisionService, self).__init__(model_name, model_path) # Call the custom function to load the model. self.model = Mnist(model_path) # Load labels. self.label = [0,1,2,3,4,5,6,7,8,9] # Labels can also be loaded by label file. # Reads the label.json file in the model directory. dir_path = os.path.dirname(os.path.realpath(self.model_path)) with open(os.path.join(dir_path, 'label.json')) as f: self.label = json.load(f) def _preprocess(self, data): preprocessed_data = {} for k, v in data.items(): input_batch = [] for file_name, file_content in v.items(): with Image.open(file_content) as image1: # Gray processing image1 = image1.convert("L") if torch.cuda.is_available(): input_batch.append(infer_transformation(image1).cuda()) else: input_batch.append(infer_transformation(image1)) input_batch_var = torch.autograd.Variable(torch.stack(input_batch, dim=0), volatile=True) print(input_batch_var.shape) preprocessed_data[k] = input_batch_var return preprocessed_data def _postprocess(self, data): results = [] for k, v in data.items(): result = torch.argmax(v[0]) result = {k: self.label[result]} results.append(result) return results def _inference(self, data): result = {} for k, v in data.items(): result[k] = self.model(v) return result class Net(nn.Module): def __init__(self): super(Net, self).__init__() self.hidden1 = nn.Linear(784, 5120, bias=False) self.output = nn.Linear(5120, 10, bias=False) def forward(self, x): x = x.view(x.size()[0], -1) x = F.relu((self.hidden1(x))) x = F.dropout(x, 0.2) x = self.output(x) return F.log_softmax(x) def Mnist(model_path, **kwargs): # Generate a network. model = Net() # Load the model. if torch.cuda.is_available(): device = torch.device('cuda') model.load_state_dict(torch.load(model_path, map_location="cuda:0")) else: device = torch.device('cpu') model.load_state_dict(torch.load(model_path, map_location=device)) # CPU or GPU mapping model.to(device) # Set the model to evaluation mode. model.eval() return model
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot