更新时间:2024-10-30 GMT+08:00
PySpark
训练并保存模型
from pyspark.ml import Pipeline, PipelineModel  # not used in this minimal example
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession

# Obtain a SparkSession. Inside the pyspark shell a session named ``spark``
# already exists and getOrCreate() simply returns it; when this file is run as a
# standalone script it creates one, instead of failing with NameError on ``spark``.
spark = SparkSession.builder.getOrCreate()

# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Learn a LogisticRegression model. This uses the parameters stored in lr.
# fit() returns a fitted LogisticRegressionModel (a Transformer); that is the
# object saved below, so the inference code must load it with
# LogisticRegressionModel.load(), not LogisticRegression.load().
model = lr.fit(training)

# Save the model to a local directory. overwrite() lets the script be re-run;
# a plain model.save(path) raises an error if the path already exists.
model.write().overwrite().save("/tmp/spark_model")
保存模型后,需要将模型目录上传到OBS才能发布。发布时需同时提供config.json配置文件和推理代码customize_service.py。config.json的编写请参考模型配置文件编写说明,推理代码的编写请参考推理代码。
推理代码
在模型推理代码文件customize_service.py中,需要添加一个子类,该子类继承对应模型类型的父类。各模型类型的父类名称和导入语句请参考表1;Spark模型对应的父类为SparkServingBaseService。
# coding:utf-8
import collections
import json
import traceback

# pandas is used by _preprocess to build the input frame; the original example
# referenced ``pd`` without importing it, which raised NameError on every request.
import pandas as pd

import model_service.log as log
from model_service.spark_model_service import SparkServingBaseService
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel

logger = log.getLogger(__name__)


class UserService(SparkServingBaseService):
    """Inference service for a Spark ML logistic-regression model.

    Implements the three hooks of SparkServingBaseService: _preprocess,
    _inference and _postprocess. Presumably the base class calls them in that
    order for each request and provides ``self.spark`` (a SparkSession) and
    ``self.model_path`` (the model directory) — confirm against the base class.
    """

    def _preprocess(self, data):
        """Parse the request body into a Spark DataFrame.

        Args:
            data: the JSON request body (passed to ``json.loads``). It must
                contain ``data.req_data``, which is handed to
                ``pandas.DataFrame``.

        Returns:
            A Spark DataFrame created from ``req_data``.

        Raises:
            Exception: if ``req_data`` is missing or cannot be converted.
        """
        logger.info("Begin to handle data from user data...")
        # Keep the request's key order so the DataFrame columns follow it.
        req_json = json.loads(data, object_pairs_hook=collections.OrderedDict)
        try:
            # Convert the records to pandas first, then to a Spark DataFrame.
            # NOTE(review): LogisticRegressionModel.transform expects a vector
            # column named "features"; a JSON list becomes an array column, not
            # a Vector — confirm the request format or convert it here.
            predict_spdf = self.spark.createDataFrame(
                pd.DataFrame(req_json["data"]["req_data"]))
        except Exception as e:
            logger.error("check your request data does meet the requirements ?")
            logger.error(traceback.format_exc())
            raise Exception(
                "check your request data does meet the requirements ?") from e
        return predict_spdf

    def _get_lr_model(self):
        """Return the fitted model, loading it from ``self.model_path`` once.

        The training script saves the result of ``LogisticRegression.fit``, a
        ``LogisticRegressionModel``, so that class must be used to load it; the
        Estimator class ``LogisticRegression`` cannot load that artifact and has
        no ``transform`` method.
        """
        model = getattr(self, "_cached_lr_model", None)
        if model is None:
            # Cache on the instance so the model is not re-read on every request.
            model = LogisticRegressionModel.load(self.model_path)
            self._cached_lr_model = model
        return model

    def _inference(self, data):
        """Run the model on the DataFrame produced by _preprocess.

        Returns:
            The DataFrame returned by ``LogisticRegressionModel.transform``.

        Raises:
            Exception: if the model cannot be loaded or the transform fails.
        """
        try:
            predict_model = self._get_lr_model()
            prediction_result = predict_model.transform(data)
        except Exception as e:
            logger.error(traceback.format_exc())
            raise Exception(
                "Unable to load model and do dataframe transformation.") from e
        return prediction_result

    def _postprocess(self, pre_data):
        """Convert the prediction DataFrame into a JSON-compatible list of records.

        ``toPandas()`` collects all rows to the driver, so the response size is
        bounded by driver memory.
        """
        logger.info("Get new data to respond...")
        # NOTE(review): the transform output may contain vector columns
        # (features/rawPrediction/probability); verify pandas serialises them as
        # expected, or select only the needed columns before converting.
        predict_str = pre_data.toPandas().to_json(orient='records')
        predict_result = json.loads(predict_str)
        return predict_result
父主题: 自定义脚本代码示例