PySpark
Training and Saving a Model
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# Training set: each row is a (label, features) tuple, where features is a
# dense vector with three components.
# NOTE: `spark` is not created in this snippet; it is assumed to be an active
# SparkSession provided by the runtime (e.g. the notebook) -- confirm.
training = spark.createDataFrame(
    [
        (1.0, Vectors.dense([0.0, 1.1, 0.1])),
        (0.0, Vectors.dense([2.0, 1.0, -1.0])),
        (0.0, Vectors.dense([2.0, 1.3, 1.0])),
        (1.0, Vectors.dense([0.0, 1.2, -0.5])),
    ],
    ["label", "features"],
)

# Configure the logistic regression estimator used for training.
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Fit the estimator on the training set, using the parameters held by `lr`.
model = lr.fit(training)

# Persist the fitted model to a local directory.
model.save("/tmp/spark_model")
After the model is saved, it must be uploaded to an OBS directory before it can be published. The config.json configuration file and the customize_service.py inference code must be included when the model is published. For details about how to write config.json, see Specifications for Editing a Model Configuration File. For details about the inference code, see Inference Code.
Inference Code
In the model inference code file customize_service.py, add a child model class. This child model class inherits properties from its parent model class. For details about the import statements of different types of parent model classes, see Table 1.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# coding:utf-8
import collections
import json
import traceback

import pandas as pd

import model_service.log as log
from model_service.spark_model_service import SparkServingBaseService
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LogisticRegressionModel

logger = log.getLogger(__name__)


class UserService(SparkServingBaseService):
    """Inference service for a saved Spark logistic regression model.

    The three hooks below turn a JSON request into a Spark DataFrame, run
    the saved model on it, and turn the predictions back into JSON-compatible
    Python objects.

    NOTE(review): `self.spark` and `self.model_path` are not defined in this
    class; they are presumably supplied by SparkServingBaseService -- confirm.
    """

    def _preprocess(self, data):
        """Convert the raw JSON request into a Spark DataFrame.

        Args:
            data: JSON text whose ``["data"]["req_data"]`` entry holds the
                records to predict on.

        Returns:
            A Spark DataFrame built from the request records.

        Raises:
            Exception: if the request records cannot be converted to a
                DataFrame. The original error is attached as the cause.
        """
        logger.info("Begin to handle data from user data...")
        # OrderedDict keeps the request's key order while parsing.
        req_json = json.loads(data, object_pairs_hook=collections.OrderedDict)
        try:
            # Go through pandas so Spark can infer the schema from the records.
            predict_spdf = self.spark.createDataFrame(
                pd.DataFrame(req_json["data"]["req_data"])
            )
        except Exception as e:
            logger.error("check your request data does meet the requirements ?")
            logger.error(traceback.format_exc())
            raise Exception(
                "check your request data does meet the requirements ?"
            ) from e
        return predict_spdf

    def _inference(self, data):
        """Load the saved model and apply it to the input DataFrame.

        Args:
            data: Spark DataFrame produced by ``_preprocess``.

        Returns:
            The DataFrame returned by the model's ``transform``.

        Raises:
            Exception: if the model cannot be loaded or the transformation
                fails. The original error is attached as the cause.
        """
        try:
            # The saved artifact is a fitted model, so it must be loaded with
            # LogisticRegressionModel; the LogisticRegression estimator cannot
            # read it and has no transform().
            predict_model = LogisticRegressionModel.load(self.model_path)
            prediction_result = predict_model.transform(data)
        except Exception as e:
            logger.error(traceback.format_exc())
            raise Exception(
                "Unable to load model and do dataframe transformation."
            ) from e
        return prediction_result

    def _postprocess(self, pre_data):
        """Convert the prediction DataFrame into JSON-compatible records.

        Args:
            pre_data: Spark DataFrame returned by ``_inference``.

        Returns:
            The result of parsing the DataFrame's records-oriented JSON dump
            (one entry per row).
        """
        logger.info("Get new data to respond...")
        # Round-trip through JSON so the values are plain Python types.
        predict_str = pre_data.toPandas().to_json(orient='records')
        predict_result = json.loads(predict_str)
        return predict_result
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot