更新时间:2025-12-18 GMT+08:00

UDAF

UDAF整体开发流程可以参考Scalar UDF,需要注意以下几点:

  • Handler指定的必须是Python Class,且Python Class中必须包括accumulate、aggregate_state、merge、finish 4个方法。具体的方法含义见表1 注册UDAF时的特殊实例方法
  • UDAF要求用户实现的accumulate、aggregate_state、merge方法是无副作用的(side-effect free),即这些方法的返回结果只取决于输入参数和聚合状态本身,而不应依赖于方法被调用的次数、顺序或外部环境。
  • UDAF的finish方法不论是单CN模式,还是多DN模式执行,对于每个分组(由GROUP BY从句指定)都只会在全局调用1次。
  • UDAF要求用户在定义非STRICT时,能够在accumulate方法中正确处理NULL值输入,即Python中的None值。
  • UDAF不支持作为窗口函数(window function)使用,即不支持在OVER从句中使用UDAF。
  • UDAF不支持开启并发和向量化计算。

示例

  1. 创建UDAF:
    CREATE AGGREGATE FUNCTION pysum(x INT)
    RETURNS INT
    STRICT
    LANGUAGE PYTHON
    RUNTIME_VERSION='3.11.2'
    HANDLER = 'CalculateSum'
    AS $$
    import typing
    class CalculateSum:
        def __init__(self) -> None:
            self.sum = 0
    
        def accumulate(self, x: int) -> None:
            self.sum += x
    
        def finish(self) -> int:
            return self.sum
    
        @property
        def aggregate_state(self) -> typing.Dict[str, typing.Any]:
            return {
                "sum": self.sum,
            }
    
        def merge(self, other_state: typing.Dict[str, typing.Any]) -> None:
            self.sum += other_state.get("sum")
    $$;
  2. 执行UDAF:
    SELECT pysum(x) FROM UNNEST(ARRAY[1, 2, NULL, 3]) AS x;
     pysum 
    -------
         6
    (1 row)