更新时间:2025-12-18 GMT+08:00
UDAF
UDAF整体开发流程可以参考Scalar UDF,需要注意以下几点:
- Handler指定的必须是Python Class,且Python Class中必须包括accumulate、aggregate_state、merge、finish 4个方法。具体的方法含义见表1 注册UDAF时的特殊实例方法。
- UDAF要求用户实现的accumulate、aggregate_state、merge方法是无副作用的(side-effect free),即这些方法的返回结果只取决于输入参数和聚合状态本身,而不应依赖于方法被调用的次数、顺序或外部环境。
- UDAF的finish方法不论是单CN模式,还是多DN模式执行,对于每个分组(由GROUP BY从句指定)都只会在全局调用1次。
- UDAF要求用户在定义非STRICT时,能够在accumulate方法中正确处理NULL值输入,即Python中的None值。
- UDAF不支持作为窗口函数(window function)使用,即不支持在OVER从句中使用UDAF。
- UDAF不支持开启并发和向量化计算。
示例
- 创建UDAF:
CREATE AGGREGATE FUNCTION pysum(x INT) RETURNS INT STRICT LANGUAGE PYTHON RUNTIME_VERSION='3.11.2' HANDLER = 'CalculateSum' AS $$ import typing class CalculateSum: def __init__(self) -> None: self.sum = 0 def accumulate(self, x: int) -> None: self.sum += x def finish(self) -> int: return self.sum @property def aggregate_state(self) -> typing.Dict[str, typing.Any]: return { "sum": self.sum, } def merge(self, other_state: typing.Dict[str, typing.Any]) -> None: self.sum += other_state.get("sum") $$; - 执行UDAF:
SELECT pysum(x) FROM UNNEST(ARRAY[1, 2, NULL, 3]) AS x; pysum ------- 6 (1 row)
父主题: UDF开发(Python)