Using Ray for Small Model Inference
Description
Small model inference refers to performing inference with relatively small machine learning models. Because of their lower complexity and smaller parameter counts, such models can usually run efficiently on a single compute node. Even so, small models can struggle with large data volumes. To mitigate this, Ray can be employed for parallel and distributed inference to improve inference performance.
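For illustration, the following is a minimal sketch of the data-parallel pattern Ray enables: the input is split into shards and each shard is scored by a separate Ray task, so a small model can be applied to a large data volume in parallel. This sketch is not part of the sample job below (which uses Ray Serve instead); the function, shard count, and toy linear function are illustrative only.

# data_parallel_sketch.py -- illustrative only, not part of this example's job scripts
import ray
import numpy as np

ray.init()  # starts a local Ray instance or connects to an existing cluster

@ray.remote
def predict_batch(batch):
    # Stand-in for loading and applying a small model; here we apply the toy
    # linear function y = x1 + 2*x2 + 3 that is also used later in this guide.
    return [x[0] + 2 * x[1] + 3 for x in batch]

data = np.random.rand(10_000, 2)                      # large input, two features per row
shards = np.array_split(data, 8)                      # split the input into 8 shards
futures = [predict_batch.remote(s.tolist()) for s in shards]
predictions = [p for shard in ray.get(futures) for p in shard]
print(len(predictions))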
Ray is a fully managed service in DataArts Fabric that makes it easy to manage small model inference. You only need to define your small model inference process as an executable Ray task, then create and run an inference job in DataArts Fabric.
Prerequisites
- You have created an OBS bucket.
- You have created a DataArts Fabric workspace.
- You have created a DataArts Fabric Ray resource.
- You have created a DataArts Fabric Ray cluster.
Step 1: Prepare a Code Script
The following Python scripts show how to create an inference job, perform simple linear regression model inference, and use Ray's distributed scheduling capability to calculate the inference result. The scripts are for reference only; you can create your own scripts for subsequent inference tasks as needed.
- simple_model.py defines and starts the model. The script defines a linear regression model, SimpleModel, and exposes it as a Ray Serve deployment.
# simple_model.py
from sklearn.linear_model import LinearRegression
import numpy as np
import pickle
import ray
from ray import serve
from fastapi import FastAPI, Request
from ray.serve.handle import DeploymentHandle

app = FastAPI()


# Simple linear regression model fit on toy data generated from y = x1 + 2*x2 + 3.
class SimpleModel:
    def __init__(self):
        self.model = LinearRegression()
        X = np.array([[1, 1], [1, 2], [2, 2], [2, 3]])
        y = np.dot(X, np.array([1, 2])) + 3
        self.model.fit(X, y)

    def predict(self, X):
        return self.model.predict(X).tolist()


model_instance = SimpleModel()


# Ray Serve deployment that exposes the model over HTTP at POST /predict.
@serve.deployment(name="simple_model_deployment", ray_actor_options={"num_cpus": 1})
@serve.ingress(app)
class SimpleModelDeployment:
    def __init__(self, model: DeploymentHandle):
        self.model = model

    @app.post("/predict")
    async def predict(self, request: Request):
        request_data = await request.json()
        input_data = np.array(request_data).reshape(-1, 2)
        prediction = self.model.predict(input_data)
        return {"prediction": prediction}


# Bind the model instance to the deployment and start serving it.
deployment_instance = SimpleModelDeployment.bind(model=model_instance)
serve.run(deployment_instance)
- infer_client.py is the main entry script of the job. It starts the model deployment script, sends the input data to the model for inference, and uploads the inference results to OBS. The -ak, -sk, -ep, and -dp parameters must be specified when the script is run.
- -ak: AK of OBS. To obtain the value, see Where Can I Obtain Access Keys (AK and SK)?
- -sk: SK of OBS. To obtain the value, see Where Can I Obtain Access Keys (AK and SK)?
- -ep: OBS endpoint. To obtain the value, see Endpoints and Domain Names.
- -dp: OBS path and file where inference output files are stored.
# infer_client.py
import requests
import numpy as np
from obs import ObsClient
from urllib.parse import urlparse
import argparse
from dataclasses import dataclass
import os
from ray import serve
import subprocess
import multiprocessing
import time
import ray

input_file_path = './input.txt'
output_file_path = './output.txt'


def run_model():
    # serve run simple_model:deployment_instance
    subprocess.run(['python3', './simple_model.py'])


@dataclass(frozen=True)
class ParsedObsPath:
    bucket_id: str
    key_id: str


def do_inference():
    # Read the input samples (two feature values per line) and send them to the model.
    input_data = []
    with open(input_file_path, 'r') as f:
        for line in f:
            parts = line.strip().split()
            input_data.append([float(parts[0]), float(parts[1])])
    input_data_array = np.array(input_data).tolist()
    print(f"input_data_array={input_data_array}")
    response = requests.post("http://localhost:8000/predict", json=input_data_array)
    print(f"response: {response}")
    predictions = response.json()["prediction"]
    with open(output_file_path, 'w') as f:
        for prediction in predictions:
            f.write(f"{prediction}\n")
    print(f"result saved in: {output_file_path}")


def parse_obs_uri(path: str) -> ParsedObsPath:
    if not path.startswith('obs://'):
        raise Exception(f'OBS path format incorrect: "{path}"')
    parsed = urlparse(path)
    return ParsedObsPath(bucket_id=parsed.netloc, key_id=parsed.path[1:])


def upload_file_to_obs(obs_client: ObsClient, obs_path: str, source_path: str):
    if not os.path.exists(source_path):
        raise Exception(
            f'Source file does not exist: source_path={source_path}')
    uri = parse_obs_uri(obs_path)
    # ObsClient.putFile(bucketName, objectKey, file_path, metadata, headers, progressCallback)
    print(f"bucket_id={uri.bucket_id}, key_id={uri.key_id}, source_path={source_path}")
    result = obs_client.putFile(
        bucketName=uri.bucket_id,
        objectKey=uri.key_id,
        file_path=source_path
    )
    return result


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-ak", "--access_key_id", help="OBS access key", type=str, required=True)
    parser.add_argument("-sk", "--secret_access_key", help="OBS secret key", type=str, required=True)
    parser.add_argument("-st", "--security_token", help="OBS security token", type=str, required=False)
    parser.add_argument("-ep", "--endpoint", help="OBS endpoint", type=str, required=True)
    parser.add_argument("-dp", "--dst_path", help="OBS destination path for the inference output", type=str, required=True)
    args = parser.parse_args()

    obs_client = ObsClient(
        access_key_id=args.access_key_id,
        secret_access_key=args.secret_access_key,
        security_token=args.security_token,
        server=args.endpoint,
        signature='obs'
    )

    # Start the model deployment script in a background process.
    print("start run model")
    background_process = multiprocessing.Process(target=run_model)
    background_process.start()

    # Wait for Ray Serve to become ready.
    time.sleep(20)

    # Run inference against the local Serve endpoint.
    print("start do inference")
    do_inference()

    # Upload the inference result to OBS.
    print("start upload result to obs")
    upload_result = upload_file_to_obs(
        obs_client=obs_client,
        obs_path=args.dst_path,
        source_path=output_file_path
    )
    print(f"upload result={upload_result}")
    if upload_result.status >= 300:
        raise Exception('Error while uploading to OBS.'
                        f' upload status: {upload_result.status}')

    ray.shutdown()


if __name__ == "__main__":
    main()
- input.txt contains the data to be inferred. Each line holds one sample with two feature values.
3.0 5.0
1.0 2.0
4.0 6.0
2.0 3.0
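Because the sample model is fit on data generated from y = x1 + 2*x2 + 3, the expected predictions for this input can be checked by hand. This is only a quick sanity check; the values written to output.txt may differ by tiny floating-point errors.

# Expected output.txt values for the sample input under y = x1 + 2*x2 + 3
inputs = [[3.0, 5.0], [1.0, 2.0], [4.0, 6.0], [2.0, 3.0]]
expected = [x1 + 2 * x2 + 3 for x1, x2 in inputs]
print(expected)  # [16.0, 8.0, 19.0, 11.0]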
Step 2: Upload the Code Script to an OBS Bucket
- Log in to the Huawei Cloud console, click the service list icon in the upper left corner, and choose Storage > Object Storage Service.
- Upload the code script created in Step 1 to the OBS bucket. For details, see Simple Upload (PUT).
- After the script is uploaded, check the uploaded script in the OBS bucket. You can select the script when creating a Ray job.
For example, upload the job scripts to the obs://fabric-job-test/rayJob/ray-job/RayInferDemo2 directory.
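If you prefer to upload the scripts programmatically instead of through the console, the same OBS Python SDK used in infer_client.py can do this. The following is a minimal sketch; the credentials, endpoint, bucket, and object keys are placeholders that you must replace with your own values.

# upload_scripts.py -- illustrative sketch; replace the placeholder credentials and paths
from obs import ObsClient

obs_client = ObsClient(
    access_key_id='<your-ak>',
    secret_access_key='<your-sk>',
    server='obs.cn-north-7.huawei.com'   # OBS endpoint of your region
)
for name in ('simple_model.py', 'infer_client.py', 'input.txt'):
    resp = obs_client.putFile(
        bucketName='fabric-job-test',
        objectKey=f'rayJob/ray-job/RayInferDemo2/{name}',
        file_path=name
    )
    print(name, resp.status)   # a status below 300 indicates success
obs_client.close()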
Step 3: Create an Inference Job
- Log in to DataArts Fabric Workspace Management Console, select the created workspace, and click Access Workspace.
- In the navigation pane, choose Development and Production > Jobs. Click Create Job in the upper right corner. For details, see Creating a Ray Job.
Table 1 Parameter description
- Code Directory: Select the path uploaded in Step 2, for example, obs://fabric-job-test/rayJob/ray-job/RayInferDemo2.
- Ray Main File: Select the main entry script of the entire job, for example, infer_client.py.
- Ray Job Parameters: Enter the parameters required for executing the main entry script. For details about how to obtain the AK and SK, see Where Can I Obtain Access Keys (AK and SK)? For details about how to obtain the endpoint, see Endpoints and Domain Names. Example:
  -ak XXXXXXXXXXXXXXX -sk xxxxxxxxxxxxxxxx -ep obs.cn-north-7.huawei.com -dp obs://fabric-job-test/test_output/output.txt
- Dependencies: Software and versions on which the Ray job depends. If there are multiple dependencies, enter each on a separate line. Example:
  scikit-learn==1.5.2
  numpy==1.19.5
Step 4: Run a Job
- After the job is defined, ensure that an available Ray cluster is selected for the job. Locate the target job in the job list and click Start in the Operation column.
- Locate the target job in the job list and click View Details in the Operation column. Check the job status in the Run tab.
You can view the output results in the OBS bucket path.
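If you prefer to fetch the result file programmatically, the OBS Python SDK can download it as well. This is a minimal sketch assuming the -dp value used above (obs://fabric-job-test/test_output/output.txt); replace the credentials, endpoint, and paths with your own.

# download_result.py -- illustrative sketch; replace the placeholder credentials and paths
from obs import ObsClient

obs_client = ObsClient(
    access_key_id='<your-ak>',
    secret_access_key='<your-sk>',
    server='obs.cn-north-7.huawei.com'
)
resp = obs_client.getObject(
    bucketName='fabric-job-test',
    objectKey='test_output/output.txt',
    downloadPath='./output.txt'        # local file to write the result to
)
print(resp.status)   # a status below 300 indicates success
obs_client.close()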