
Scalar UDF

The overall process of developing a Python scalar UDF is as follows:

  1. Write and debug Python code according to user service logic.

    When developing UDF code, in addition to the core service logic, the code should include the following elements:

    • Import modules: If using the ibis-fabric SDK to develop the UDF, you need to install and import related Ibis modules.
    • Main function entry handler: Clearly define the main entry point of the function in your custom code. The handler method must be unique.
    • Function signature: Within the definition of the handler method, explicitly declare the data types of both input parameters and return values.
    • Others: When registering functions via the ibis-fabric SDK, import the registration interface from the ibis-fabric module and write corresponding registration code. Alternatively, if using SQL syntax for registration, prepare code to serialize the handler.
    Taking a Python UDF that sums two numbers as an example, here is its file structure:
    my_udf/
    ├── udf_main.py
    ├── register.py
    ├── serialize.py
    └── other/
            └── util.py
    Sample code:
    # udf_main.py houses the primary function of the UDF, that is, the handler
    from other.util import printLog
    import numpy as np
    def pysum(arg1: int, arg2: int) -> int:
        printLog(arg1, arg2)
        return np.sum(np.array([arg1, arg2]))
    
    # util.py serves as the function's dependency file
    def printLog(a, b):
        print(f"computing the sum of {a} and {b}")
    
    # register.py: registers the UDF via the ibis-fabric SDK.
    # The huawei-ibis-fabric SDK must be installed so that the DataArts Fabric backend is available to Ibis.
    import ibis   # Import Ibis dependencies.
    from udf_main import pysum
    con = ibis.fabric.connect(...)  # Create a session.
    udf = con.udf.python.register(pysum, imports=["other/util.py"], packages=["numpy"])
    
    # serialize.py serializes the handler with cloudpickle and writes it as hexadecimal text.
    import cloudpickle
    import pickle
    from udf_main import pysum
    
    if __name__ == "__main__":
        with open("pysum_source.py", "w") as file:
            hex_str = cloudpickle.dumps(pysum, protocol=pickle.HIGHEST_PROTOCOL).hex()
            file.write(hex_str)
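    Before registering, it can help to sanity-check the handler and the serialized payload locally. The snippet below is a minimal sketch (the file name verify.py is illustrative): it calls pysum directly, then restores the function from the hex text written by serialize.py and confirms the round trip.
    # verify.py (illustrative): quick local checks before registration
    import cloudpickle
    from udf_main import pysum
    
    assert pysum(3, 4) == 7  # direct call against the handler
    
    # Restore the handler from the hex-encoded cloudpickle payload written by serialize.py.
    with open("pysum_source.py") as file:
        restored = cloudpickle.loads(bytes.fromhex(file.read()))
    assert restored(3, 4) == 7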

  2. Register a Python UDF using either of the following methods:

    • Method 1: Use the ibis-fabric SDK for explicit registration (recommended).

      Explicit registration with ibis-fabric sets DataArts Fabric as the Ibis backend and registers the UDF the moment the register or register_from_file interface is invoked. Taking the example from step 1, add a new register.py in the same directory as udf_main.py. The detailed process and code snippets are as follows:

      1. Create a session using ibis-fabric.
        import ibis   # Import Ibis dependencies; the huawei-ibis-fabric SDK must be installed so that the fabric backend is available.
        con = ibis.fabric.connect(...)  # Create a session.
      2. (Option 1) Use the register interface for registration. fn=pysum is mandatory and indicates the main function of the UDF. Other parameters are optional and are set based on service requirements: for example, imports lists the files the UDF depends on, and packages lists the third-party libraries it requires.
        from udf_main import pysum
        udf = con.udf.python.register(pysum, imports=["other/util.py"], packages=["numpy"])
      3. (Option 2) Use the register_from_file interface for registration. file_path="udf_main.py" specifies the file path of the main function, and func_name="pysum" specifies its name. Both file_path and func_name are required; other parameters are set as needed.
        udf = con.udf.python.register_from_file("udf_main.py", "pysum", imports=["other/util.py"], packages=["numpy"])

      Select option 1 or option 2 as needed. Based on the input parameters of the registration interface, ibis-fabric automatically packs all files that the UDF depends on into a compressed package and uploads it to the OBS bucket. The file structure of the compressed package is as follows:

      pysum.zip/
      ├── pysum_source.py  # Hexadecimal text obtained by serializing the UDF main function with cloudpickle.
      └── other/
              └── util.py  # Dependency file of the UDF. Packaging retains the directory structure relative to the main function's file.
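      After registration, the returned udf object can be used in queries. The following is a minimal sketch assuming standard Ibis scalar-UDF semantics; my_table and its bigint columns a and b are hypothetical:
        # Illustrative only: apply the registered UDF to table columns in an Ibis expression.
        t = con.table("my_table")            # hypothetical table with bigint columns a and b
        expr = udf(t.a, t.b).name("total")   # build an expression that calls the UDF
        result = con.execute(expr)           # run it on DataArts Fabric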
    • Method 2: Use any DataArts Fabric client tool to connect to the DataArts Fabric service and submit the DDL registration for the UDF.
      In this method, the UDF is registered by directly writing DDL in the UDF syntax provided by DataArts Fabric SQL. Observe the syntax's usage guidelines when applying it.
      1. Referring to the example in step 1 and Organization Structure of the Code Archive Package, serialize the main function, then package pysum_source.py together with its dependencies. Ensure the structure of the compressed package matches that described in method 1; a packaging sketch follows this list.
      2. Write the DDL of the UDF according to the UDF syntax. Define the function's input parameter and return value types based on the function itself, referring to Data Type Mapping.
        CREATE FUNCTION "your_schema".pysum(arg1 bigint, arg2 bigint)
        RETURNS bigint
        LANGUAGE PYTHON
        RUNTIME_VERSION = '3.11'
        HANDLER = 'pysum'
        IMPORTS = ('https://mini-kernel.obs.cn-north-7.ulanqab.huawei.com:5443/calculate_0/calculate_0.zip')
        PACKAGES = ('numpy');
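      The following is a minimal packaging sketch for step 1 of this method, assuming serialize.py has already produced pysum_source.py; it reproduces the archive layout shown in method 1 (the file name package.py and the output name pysum.zip are illustrative).
        # package.py (illustrative): build the archive referenced by the IMPORTS clause.
        import zipfile
        
        with zipfile.ZipFile("pysum.zip", "w") as zf:
            zf.write("pysum_source.py")  # hex-encoded, cloudpickle-serialized handler
            zf.write("other/util.py")    # dependency, keeping its relative directory structure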

  3. View the Python UDF.

    DataArts Fabric SQL provides show functions to list all functions in a specific schema and describe function function_name to view detailed information about a specific function. Refer to SHOW and DESCRIBE for specific usage instructions.
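    For example (a sketch of the two statements named above; exact clauses may vary, see SHOW and DESCRIBE):

    show functions;
    describe function "your_schema".pysum;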

  4. Invoke the Python UDF.

    Use any DataArts Fabric client to submit a query, for example:

    select "your_schema".pysum(3,4);

  5. View and optimize Python UDF performance metrics.

    DataArts Fabric SQL supports performance monitoring during Python UDF execution and offers an API for configuring resource specifications and concurrency levels during UDF operation. For detailed usage instructions, refer to Python UDF Performance Tuning.