更新时间:2025-08-25 GMT+08:00

Scalar UDF

开发Python Scalar UDF,总体流程如下:

  1. 根据用户的业务编写和调试Python代码。

    开发UDF代码,除了业务本身代码以外,代码中还需要包含如下信息:

    • 导入模块:如果使用ibis-fabric sdk开发UDF,则需要安装和导入ibis相关模块。
    • 函数的主入口Handler:自定义代码中,必须清晰定义函数的主入口,Handler方法是唯一的。
    • 函数签名:Handler方法的定义中,需要显示指定输入参数和返回值的数据类型。
    • 其它:如果使用ibis-fabric sdk注册函数,则需要导入ibis-fabric模块的注册接口,并编写注册的代码;如果使用SQL语法注册函数,则需要编写序列化Handler的代码。
    以实现一个两数求和的Python UDF为例,代码文件结构如下:
    my_udf/
    ├── udf_main.py
    ├── register.py
    ├── serialize.py
    └── other/
            └── util.py
    代码示例如下:
    # udf_main.py包含了UDF的主函数,即Handler
    from other.util import printLog
    import numpy as np
    def pysum(arg1: int, arg2: int) -> int:
        printLog(arg1, arg2)
        return np.sum(np.array([arg1, arg2]))
    
    # util.py是函数的依赖文件
    def printLog(a,b):
        print(f"computing the sum of {a} and {b}")
    
    # register.py
    import huawei-ibis-fabric as fabric
    import ibis   # 导入ibis依赖
    con = ibis.fabric.connect(...)  
    from udf_main import pysum
    udf = con.udf.python.register(pysum, imports=["other/util.py"], packages=["numpy"])
    
    # serialize.py 序列化handler的代码如下
    import cloudpickle
    import pickle
    from udf_main import pysum
    
    if __name__ == "__main__":
        with open("pysum_source.py", "w") as file:
            my_bytes = cloudpickle.dumps(pysum, protocol=pickle.HIGHEST_PROTOCOL).hex()
            file.write(my_bytes)

  2. 注册Python UDF。注册Python UDF有以下两种方式:

    • 方式一:使用ibis-fabric sdk显示注册(推荐)。

      ibis-fabric显式注册是用户指定ibis的backend为DataArtsFabric,通过register/register_from_file接口来实现,调用即注册。以上述步骤1的例子为例,在与udf_main.py同级目录下新增register.py,详细流程及代码片段如下:

      1. 使用ibis-fabric创建会话。
        import huawei-ibis-fabric as fabric
        import ibis   # 导入ibis依赖
        con = ibis.fabric.connect(...)  # 创建会话
      2. (可选方案1)使用register接口注册,其中fn=pysum,是必填参数,表示UDF的主函数;其他参数均可选,请根据业务需要选填,例如该例中,imports表示UDF所依赖的文件。pacakges表示UDF所依赖的三方库。
        from udf_main import pysum
        udf = con.udf.python.register(pysum, imports=["other/util.py"], packages=["numpy"])
      3. (可选方案2)使用register_from_file接口注册,其中file_path="udf_main.py",表示主函数所在的文件路径;func_name="pysum",表示主函数名称。file_path和func_name这两个参数必填,其他参数请根据业务需要选填。
        udf = con.udf.python.register_from_file("udf_main.py", "pysum", imports=["other/util.py"], packages=["numpy"])

      您可以按需选择可选方案1或可选方案2,ibis-fabric会自动根据注册接口的入参将UDF所依赖的文件全部打包到一个压缩包中,并上传至OBS桶里。压缩包文件结构如下:

      pysum.zip/
      ├── pysum_source.py  # 该文件的内容,是使用cloudpickle将UDF主函数体序列化后并编码为十六进制。
      └── other/
              └── util.py  # 该文件是UDF所依赖的文件,打包时会保留其与主函数所在文件的相对目录结构。
    • 方式二:使用任意DataArtsFabric的客户端工具连接DataArtsFabric服务,下发UDF的DDL注册。
      该方式是直接使用DataArtsFabric SQL提供的UDF语法,通过编写DDL的形式注册,使用时请遵守具体的使用规范
      1. 参照步骤1中的示例,参考代码归档包的组织结构,将主函数序列化后,将pysum_source.py和依赖的相关文件一起打包,压缩包结构和上述方式一中的压缩包一致。
      2. 参考UDF语法格式,编写UDF的DDL。对于函数的入参和返回值类型参考数据类型映射,根据函数本身来定义。
        CREATE FUNCTION "your_schema".pysum(arg1 bigint, arg2 bigint)
        RETURNS bigint
        LANGUAGE PYTHON
        RUNTIME_VERSION = '3.11'
        HANDLER = 'pysum'
        imports = ('https://mini-kernel.obs.cn-north-7.ulanqab.huawei.com:5443/calculate_0/calculate_0.zip')
        packages=('numpy');

  3. 查看Python UDF。

    DataArtsFabric SQL提供show functions查看特定schema下的所有函数,且提供了describe function function_name方法查看特定函数的详细信息,具体的使用方式请参考SHOWDESCRIBE

  4. 调用Python UDF。

    使用DataArtsFabric任意客户端,下发查询即可,例如:

    select "your_schema".pysum(3,4);

  5. 查看Python UDF性能指标及调优。

    DataArtsFabric SQL支持了对Python UDF运行时的性能监控,以及提供了接口,可配置UDF运行时的资源规格和并发度,具体使用方式请参考Python UDF性能调优