PySpark
Training and Saving a Model
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

# Reuse the active SparkSession (e.g. the `spark` object predefined in the
# pyspark shell) or create one when this runs as a standalone script.
spark = SparkSession.builder.getOrCreate()

# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame(
    [
        (1.0, Vectors.dense([0.0, 1.1, 0.1])),
        (0.0, Vectors.dense([2.0, 1.0, -1.0])),
        (0.0, Vectors.dense([2.0, 1.3, 1.0])),
        (1.0, Vectors.dense([0.0, 1.2, -0.5])),
    ],
    ["label", "features"],
)

# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Learn a LogisticRegressionModel using the parameters stored in lr.
model = lr.fit(training)

# Save the fitted model. overwrite() lets the script be re-run without failing
# because the target directory already exists. Reload the saved model with
# LogisticRegressionModel.load(), not LogisticRegression.load().
model.write().overwrite().save("/tmp/spark_model")
After the model is saved, upload it to an OBS directory before publishing it. The published package must include the config.json configuration file and the customize_service.py inference code. For details about how to write config.json, see Specifications for Editing a Model Configuration File. For details about the inference code, see Inference Code.
Inference Code
In the model inference code file customize_service.py, define a child class that inherits from the appropriate parent model service class (SparkServingBaseService in this example) and overrides its processing methods. For details about the import statements for the different types of parent model classes, see Table 1.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# coding:utf-8
import collections
import json
import traceback

import pandas as pd
from pyspark.ml.classification import LogisticRegressionModel

import model_service.log as log
from model_service.spark_model_service import SparkServingBaseService

logger = log.getLogger(__name__)


class UserService(SparkServingBaseService):
    """Serve predictions from the LogisticRegressionModel saved by the training script.

    SparkServingBaseService invokes the _preprocess, _inference and _postprocess
    hooks; this class only overrides them.
    """

    def _preprocess(self, data):
        """Convert the request body into a Spark DataFrame.

        Args:
            data: JSON text of the request (assumed str/bytes, since it is passed
                to json.loads). The records to score are read from
                ``data["req_data"]``.

        Returns:
            A Spark DataFrame with one row per record in ``req_data``.

        Raises:
            Exception: if ``data.req_data`` is missing or cannot be converted.
        """
        logger.info("Begin to handle data from user data...")
        # Read data; OrderedDict keeps keys in the order they appear in the request.
        req_json = json.loads(data, object_pairs_hook=collections.OrderedDict)
        try:
            # Convert data to the Spark DataFrame format (via pandas).
            # NOTE(review): LogisticRegressionModel.transform expects a Vector-typed
            # "features" column; plain JSON lists become arrays, not Vectors —
            # confirm the request format or convert before inference.
            predict_spdf = self.spark.createDataFrame(pd.DataFrame(req_json["data"]["req_data"]))
        except Exception as e:
            logger.error("check your request data does meet the requirements ?")
            logger.error(traceback.format_exc())
            raise Exception("check your request data does meet the requirements ?") from e
        return predict_spdf

    def _load_model(self):
        """Return the fitted model, loading it from self.model_path on first use only."""
        model = getattr(self, "_cached_model", None)
        if model is None:
            # The training script saves a fitted LogisticRegressionModel, so it must be
            # read with LogisticRegressionModel.load (LogisticRegression.load reads an
            # unfitted Estimator, which rejects this metadata and cannot transform).
            model = LogisticRegressionModel.load(self.model_path)
            self._cached_model = model
        return model

    def _inference(self, data):
        """Run the model on the DataFrame produced by _preprocess.

        Raises:
            Exception: if the model cannot be loaded or the transformation fails.
        """
        try:
            predict_model = self._load_model()
            # Perform data inference; adds prediction columns to the input DataFrame.
            prediction_result = predict_model.transform(data)
        except Exception as e:
            logger.error(traceback.format_exc())
            raise Exception("Unable to load model and do dataframe transformation.") from e
        return prediction_result

    def _postprocess(self, pre_data):
        """Convert the prediction DataFrame into a JSON-serializable list of row dicts."""
        logger.info("Get new data to respond...")
        # Round-trip through pandas' JSON writer so numpy scalar types become plain
        # Python values in the response.
        predict_str = pre_data.toPandas().to_json(orient='records')
        predict_result = json.loads(predict_str)
        return predict_result
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation.