
Using Ray for Small Model Inference

Description

Small model inference refers to running inference on relatively small machine learning models. Because of their lower complexity and fewer parameters, such models can typically run efficiently on a single compute node. However, even small models can struggle with large data volumes. To mitigate this, Ray can be used for parallel and distributed inference to improve inference performance.

Ray is a fully managed service in DataArts Fabric that makes small model inference easy to manage. You only need to define your small model inference process as an executable Ray task, then create and run an inference job in DataArts Fabric.
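As an illustration of the parallel inference pattern, the following is a minimal sketch that is independent of the job scripts in Step 1: the model is trained once, shared with the cluster through the Ray object store, and batches are scored in parallel by Ray tasks. It assumes only that ray, numpy, and scikit-learn are installed; the toy model and data mirror the example used below.

    # parallel_infer_sketch.py: illustrative only, not part of the job below
    import ray
    import numpy as np
    from sklearn.linear_model import LinearRegression

    ray.init()

    # Fit a tiny model once on the driver (same toy data as simple_model.py).
    X = np.array([[1, 1], [1, 2], [2, 2], [2, 3]])
    y = np.dot(X, np.array([1, 2])) + 3
    model = LinearRegression().fit(X, y)
    model_ref = ray.put(model)  # share the fitted model with all workers

    @ray.remote
    def predict_batch(model, batch):
        # Each task scores one batch; Ray schedules the tasks across the cluster.
        return model.predict(np.array(batch)).tolist()

    data = [[3.0, 5.0], [1.0, 2.0], [4.0, 6.0], [2.0, 3.0]]
    batches = [data[i:i + 2] for i in range(0, len(data), 2)]
    results = ray.get([predict_batch.remote(model_ref, b) for b in batches])
    print(results)

    ray.shutdown()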

Step 1: Prepare a Code Script

The following Python scripts show how to create an inference job, perform inference with a simple linear regression model, and use Ray's distributed scheduling capability to compute the inference result. The scripts are for reference only; you can create your own scripts for your inference tasks as needed.

  • simple_model.py defines and starts the model. The script defines a linear regression model SimpleModel and a Ray Serve deployment SimpleModelDeployment.
    # simple_model.py
    from sklearn.linear_model import LinearRegression
    import numpy as np
    import ray
    from ray import serve
    from fastapi import FastAPI, Request

    app = FastAPI()

    class SimpleModel:
        def __init__(self):
            # Fit a small linear regression model on toy data (y = x1 + 2*x2 + 3).
            self.model = LinearRegression()
            X = np.array([[1, 1], [1, 2], [2, 2], [2, 3]])
            y = np.dot(X, np.array([1, 2])) + 3
            self.model.fit(X, y)
        def predict(self, X):
            return self.model.predict(X).tolist()

    model_instance = SimpleModel()

    @serve.deployment(name="simple_model_deployment", ray_actor_options={"num_cpus": 1})
    @serve.ingress(app)
    class SimpleModelDeployment:
        def __init__(self, model: SimpleModel):
            self.model = model
        @app.post("/predict")
        async def predict(self, request: Request):
            # Parse the JSON payload into an N x 2 feature array and return predictions.
            request_data = await request.json()
            input_data = np.array(request_data).reshape(-1, 2)
            prediction = self.model.predict(input_data)
            return {"prediction": prediction}

    deployment_instance = SimpleModelDeployment.bind(model=model_instance)
    serve.run(deployment_instance)
  • infer_client.py is the main entry script of the job. It calls the model script to deploy the model, feeds the input data to the model for inference, and uploads the inference result to OBS. The -ak, -sk, -ep, and -dp parameters must be specified when running the script.
    # infer_client.py
    import requests
    import numpy as np
    from obs import ObsClient
    from urllib.parse import urlparse
    import argparse
    from dataclasses import dataclass
    import os
    from ray import serve
    import subprocess
    import multiprocessing
    import time
    import ray
    input_file_path = './input.txt'
    output_file_path = './output.txt'
    
    def run_model():
        # serve run simple_model:deployment_instance
        subprocess.run(['python3', './simple_model.py'])
    @dataclass(frozen=True)
    class ParsedObsPath:
        bucket_id: str
        key_id: str
    def do_inference():
        input_data = []
        with open(input_file_path, 'r') as f:
            for line in f:
                parts = line.strip().split()
                input_data.append([float(parts[0]), float(parts[1])])
        input_data_array = np.array(input_data).tolist()
        print(f"input_data_array={input_data_array}")
        response = requests.post("http://localhost:8000/predict", json=input_data_array)
        print(f"response: {response}")
        predictions = response.json()["prediction"]
        with open(output_file_path, 'w') as f:
            for prediction in predictions:
                f.write(f"{prediction}\n")
        print(f"result save in: {output_file_path}")
    def parse_obs_uri(path: str) -> ParsedObsPath:
        if not path.startswith('obs://'):
            raise Exception(f'OBS path format incorrect: "{path}"')
        parsed = urlparse(path)
        return ParsedObsPath(bucket_id=parsed.netloc, key_id=parsed.path[1:])
    def upload_file_to_obs(obs_client: ObsClient, obs_path: str, source_path: str):
        if not os.path.exists(source_path):
            raise Exception(
                f'Source file does not exist: source_path={source_path}')
        uri = parse_obs_uri(obs_path)
        # ObsClient.putFile(bucketName, objectKey, file_path, metadata, headers, progressCallback)
        print(f"bucket_id={uri.bucket_id}, key_id={uri.key_id}, source_path={source_path}")
        result = obs_client.putFile(
            bucketName=uri.bucket_id,
            objectKey=uri.key_id,
            file_path=source_path
        )
        return result
    def main():
        parser = argparse.ArgumentParser()
        parser.add_argument("-ak", "--access_key_id", help="OBS access key",
                            type=str, required=True)
        parser.add_argument("-sk", "--secret_access_key", help="OBS secret key",
                            type=str, required=True)
        parser.add_argument("-st", "--security_token", help="OBS security token",
                            type=str, required=False)
        parser.add_argument("-ep", "--endpoint", help="OBS entrypoint", type=str,
                            required=True)
        parser.add_argument("-dp", "--dst_path",
                            help="Local filesystem destination path", type=str,
                            required=True)
        args = parser.parse_args()
        obs_client = ObsClient(
            access_key_id=args.access_key_id,
            secret_access_key=args.secret_access_key,
            security_token=args.security_token,
            server=args.endpoint,
            signature='obs'
        )
        # run model
        print("start run model")
        background_process = multiprocessing.Process(target=run_model)
        background_process.start()
        # wait ray serve ready
        time.sleep(20)
        # do inference
        print("start do inference")
        do_inference()
        # upload infer result to obs
        print("start upload result to obs")
        upload_result = upload_file_to_obs(
            obs_client=obs_client,
            obs_path=args.dst_path,
            source_path=output_file_path
        )
        print(f"upload result={upload_result}")
        if upload_result.status >= 300:
            raise Exception('Error while uploading to OBS.'
                            f' upload status: {upload_result.status}')
        ray.shutdown()
    if __name__ == "__main__":
        main()
  • input.txt contains the data to be inferred (see the optional check after this list for the expected predictions).
    3.0 5.0
    1.0 2.0
    4.0 6.0
    2.0 3.0
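Because the model in simple_model.py is fitted on exactly linear toy data (y = x1 + 2*x2 + 3), you can estimate the expected output for input.txt before running the job. The following is a minimal local sanity check, assuming only NumPy and scikit-learn are installed; it is not part of the job.

    # expected_output_check.py: optional local sanity check, not part of the job
    from sklearn.linear_model import LinearRegression
    import numpy as np

    # Reproduce the model fitted in simple_model.py.
    X = np.array([[1, 1], [1, 2], [2, 2], [2, 3]])
    y = np.dot(X, np.array([1, 2])) + 3
    model = LinearRegression().fit(X, y)

    # The four rows of input.txt; predictions should be close to 16, 8, 19, 11.
    inputs = np.array([[3.0, 5.0], [1.0, 2.0], [4.0, 6.0], [2.0, 3.0]])
    print(model.predict(inputs))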

Step 2: Upload the Code Script to an OBS Bucket

  1. Log in to the Huawei Cloud console, click the service list icon in the upper left corner, and choose Storage > Object Storage Service.
  2. Upload the code scripts created in Step 1 to an OBS bucket. For details, see Simple Upload (PUT).
  3. After the upload, verify that the scripts are present in the OBS bucket. You will select this path when creating the Ray job.

    For example, upload the job script to the obs://fabric-job-test/rayJob/ray-job/RayInferDemo2 directory.
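If you prefer to script the upload instead of using the console, the OBS Python SDK already used in infer_client.py can upload the files as well. The following is a minimal sketch; the bucket name, object keys, endpoint, and credentials are placeholders you must replace with your own values.

    # upload_scripts.py: optional sketch; credentials and paths are placeholders
    from obs import ObsClient

    obs_client = ObsClient(
        access_key_id='<AK>',
        secret_access_key='<SK>',
        server='obs.cn-north-7.huawei.com'
    )
    for name in ('simple_model.py', 'infer_client.py', 'input.txt'):
        resp = obs_client.putFile(
            bucketName='fabric-job-test',
            objectKey=f'rayJob/ray-job/RayInferDemo2/{name}',
            file_path=f'./{name}'
        )
        print(name, resp.status)
    obs_client.close()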

Step 3: Create an Inference Job

  1. Log in to DataArts Fabric Workspace Management Console, select the created workspace, and click Access Workspace.
  2. In the navigation pane, choose Development and Production > Jobs. Click Create Job in the upper right corner. For details, see Creating a Ray Job.
    Table 1 Parameter description

    Code Directory: Select the path uploaded in Step 2, for example, obs://fabric-job-test/rayJob/ray-job/RayInferDemo2.

    Ray Main File: Select the main entry script of the entire job, for example, infer_client.py.

    Ray Job Parameters: Enter the parameters required for executing the main entry script. For details about how to obtain the AK and SK, see Where Can I Obtain Access Keys (AK and SK)? For details about how to obtain the endpoint, see Endpoints and Domain Names. Example:
    -ak XXXXXXXXXXXXXXX -sk xxxxxxxxxxxxxxxx -ep obs.cn-north-7.huawei.com -dp obs://fabric-job-test/test_output/output.txt

    Dependencies: Software packages and versions that the Ray job depends on. If there are multiple dependencies, enter each on a separate line. Example:
    scikit-learn==1.5.2
    numpy==1.19.5
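For reference, the job parameters above are the command-line arguments parsed by argparse in infer_client.py. Conceptually, the job invokes the main file with something like the following, where the keys and paths are placeholders:

    python3 infer_client.py -ak <AK> -sk <SK> -ep obs.cn-north-7.huawei.com -dp obs://fabric-job-test/test_output/output.txt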

Step 4: Run a Job

  1. After the job is defined, ensure that an available Ray cluster is selected for the job. Locate the target job in the job list and click Start in the Operation column.
  2. Locate the target job in the job list and click View Details in the Operation column. Check the job status in the Run tab.

    You can view the output result in the OBS path specified by the -dp parameter, for example, obs://fabric-job-test/test_output/output.txt.
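To retrieve and inspect the result programmatically, you can download it with the same OBS SDK. The following is a minimal sketch; the credentials, endpoint, bucket, and object key are placeholders matching the -dp example above.

    # download_result.py: optional sketch; credentials and paths are placeholders
    from obs import ObsClient

    obs_client = ObsClient(
        access_key_id='<AK>',
        secret_access_key='<SK>',
        server='obs.cn-north-7.huawei.com'
    )
    resp = obs_client.getObject(
        bucketName='fabric-job-test',
        objectKey='test_output/output.txt',
        downloadPath='./output.txt'
    )
    print(resp.status)
    obs_client.close()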