PySpark
Entrenamiento y guardado de un modelo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
from pyspark.ml import Pipeline, PipelineModel from pyspark.ml.linalg import Vectors from pyspark.ml.classification import LogisticRegression # Prepare training data using tuples. # Prepare training data from a list of (label, features) tuples. training = spark.createDataFrame([ (1.0, Vectors.dense([0.0, 1.1, 0.1])), (0.0, Vectors.dense([2.0, 1.0, -1.0])), (0.0, Vectors.dense([2.0, 1.3, 1.0])), (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"]) # Create a training instance. The logistic regression algorithm is used for training. # Create a LogisticRegression instance. This instance is an Estimator. lr = LogisticRegression(maxIter=10, regParam=0.01) # Train the logistic regression model. # Learn a LogisticRegression model. This uses the parameters stored in lr. model = lr.fit(training) # Save the model to a local directory. # Save model to local path. model.save("/tmp/spark_model") |
Después de guardar el modelo, debe subirse al directorio OBS antes de publicarse. La configuración config.json y el código de inferencia customize_service.py deben incluirse durante la publicación. Para obtener más información sobre cómo compilar config.json, consulte Especificaciones para editar un archivo de configuración de modelo. Para obtener más información sobre el código de inferencia, véase Código de inferencia.
Código de inferencia
En el archivo de código de inferencia de modelo customize_service.py, agregue una clase de modelo hijo. Esta clase de modelo hijo hereda las propiedades de su clase de modelo padre. Para obtener más información sobre las instrucciones de importación de diferentes tipos de clases de modelo padre, consulte Tabla 1.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# coding:utf-8 import collections import json import traceback import model_service.log as log from model_service.spark_model_service import SparkServingBaseService from pyspark.ml.classification import LogisticRegression logger = log.getLogger(__name__) class UserService(SparkServingBaseService): # Pre-process data. def _preprocess(self, data): logger.info("Begin to handle data from user data...") # Read data. req_json = json.loads(data, object_pairs_hook=collections.OrderedDict) try: # Convert data to the spark dataframe format. predict_spdf = self.spark.createDataFrame(pd.DataFrame(req_json["data"]["req_data"])) except Exception as e: logger.error("check your request data does meet the requirements ?") logger.error(traceback.format_exc()) raise Exception("check your request data does meet the requirements ?") return predict_spdf # Perform model inference. def _inference(self, data): try: # Load a model file. predict_model = LogisticRegression.load(self.model_path) # Perform data inference. prediction_result = predict_model.transform(data) except Exception as e: logger.error(traceback.format_exc()) raise Exception("Unable to load model and do dataframe transformation.") return prediction_result # Post-process data. def _postprocess(self, pre_data): logger.info("Get new data to respond...") predict_str = pre_data.toPandas().to_json(orient='records') predict_result = json.loads(predict_str) return predict_result |