更新时间:2025-05-27 GMT+08:00

在PySpark程序中创建自定义UDF、UDAF和UDTF

本章节指导用户通过PySpark程序中创建自定义UDF、UDAF和UDTF。当前版本Spark仅支持Python 3.7、3.8、3.9和3.10版本,使用其他Python版本可能会导致兼容性问题。

约束与限制

自定义UDF使用指导

可通过pyspark.sql.functions.udf函数创建自定义udf,pyspark.sql.functions.udf介绍如下:
pyspark.sql.functions.udf(f: Union[Callable[[…], Any], DataTypeOrString, None] = None, returnType: DataTypeOrString = StringType()) → Union[UserDefinedFunctionLike, Callable[[Callable[[…], Any]], UserDefinedFunctionLike]]
表1 参数说明

参数

说明

f

python函数。

returnType

用户定义函数的返回类型。该值可以是pyspark.sql.types.DataType对象,也可以是ddl格式的类型字符串。

  • 用户定义函数默认被认为是确定性的。由于优化,可以消除重复调用,甚至可能调用该函数比查询中出现的次数更多。如果你的函数不是确定性的,可以调用用户定义函数的asNondeterministic。例如:
    from pyspark.sql.types import IntegerType import random 
    random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
  • 用户定义函数不支持条件表达式或布尔表达式中的短路,最终只能在内部执行。如果函数在特殊行上失败,则解决方法是将条件合并到函数中。
  • 用户定义函数在调用方不接受关键字参数。
示例:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
sparkSession = SparkSession.builder.getOrCreate()
slen = udf(lambda s: len(s), IntegerType())
@udf
def to_upper(s):
    if s is not None:
        return s.upper()
@udf(returnType=IntegerType())
def add_one(x):
    if x is not None:
        return x + 1

df = sparkSession.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
sparkSession.stop()
运行结果:
+----------+--------------+------------+                                        
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
|         8|      JOHN DOE|          22|
+----------+--------------+------------+

自定义UDAF使用指导

当前版本不支持直接在PySpark中创建UDAF,可以通过pyspark.sql.functions.pandas_udf间接实现,pyspark.sql.functions.pandas_udf介绍如下:

pyspark.sql.functions.pandas_udf(f=None, returnType=None, functionType=None)

表2 参数说明

参数

说明

f

用户定义函数。如果用作独立函数,则为Python函数。

returnType

用户定义函数的返回类型。该值可以是pyspark.sql.types.DataType对象,也可以是ddl格式的类型字符串。

functionType

pyspark.sql.functions.PandasUDFType中的枚举值。默认值:SCALAR。此参数用于兼容性。建议使用Python类型提示。

示例:
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType
import pandas as pd

sparkSession = SparkSession.builder.appName("PandasUDAFExample").getOrCreate()

@pandas_udf(DoubleType())
def pandas_mean_udaf(v: pd.Series) -> float:
    return v.mean()

data = [
    (1, 2.0),
    (1, 4.0),
    (2, 5.0),
    (2, 6.0)
]
df = sparkSession.createDataFrame(data, ["id", "value"])

result_df = df.groupBy("id").agg(pandas_mean_udaf(df["value"]).alias("mean_value"))
result_df.show()
sparkSession.stop()
运行结果:
+---+----------+                                                                
| id|mean_value|
+---+----------+
|  1|       3.0|
|  2|       5.5|
+---+----------+

自定义UDTF使用指导

在Spark 3.3.1中,PySpark目前没有直接提供类似Scala那样的UDTF(用户自定义表生成函数)的API。如果你需要实现UDTF效果(即一行数据转换成多行数据),通常可以使用如下替代方案:
  • 利用UDF返回列表,再使用DataFrame的explode函数将数组拆分成多行。
  • 使用RDD转换中的flatMap来实现一行生成多行数据的效果。

以一行数据转换成多行数据场景为例:

  1. 利用UDF返回Array,再使用explode拆分。

    用Python函数编写一个UDF,该函数将输入数据(或一行中的某个字段)转化为列表(或数组)。然后使用DataFrame内置的explode函数将数组拆分成多行,从而达到UDTF的效果。

    示例:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf, explode
    from pyspark.sql.types import ArrayType, StringType
    
    sparkSession = SparkSession.builder.appName("SimulateUDTF").getOrCreate()
    
    data = [("a,b,c",), ("d,e",)]
    df = spark.createDataFrame(data, ["csv_col"])
    
    def split_string(s):
        if s:
            return s.split(",")
        return []
    
    split_udf = udf(split_string, ArrayType(StringType()))
    
    df_with_array = df.withColumn("split_col", split_udf("csv_col"))
    df_exploded = df_with_array.withColumn("word", explode("split_col"))
    
    df_exploded.show()
    sparkSession.stop()
    运行结果:
    +-------+---------+----+
    |csv_col|split_col|word|
    +-------+---------+----+
    |  a,b,c|[a, b, c]|   a|
    |  a,b,c|[a, b, c]|   b|
    |  a,b,c|[a, b, c]|   c|
    |    d,e|   [d, e]|   d|
    |    d,e|   [d, e]|   e|
    +-------+---------+----+
  2. 利用RDD的flatMap实现。

    对于更灵活的情况,您可以直接使用DataFrame的RDD接口,通过flatMap操作来实现转换。

    样例(以将输入行拆分为多行为例):

    from pyspark.sql import SparkSession
    
    sparkSession = SparkSession.builder.appName("SimulateUDTFWithFlatMap").getOrCreate()
    
    data = [("a,b,c",), ("d,e",)]
    df = spark.createDataFrame(data, ["csv_col"])
    
    rdd = df.rdd.flatMap(lambda row: [(row.csv_col, token) for token in row.csv_col.split(",")])
    new_df = rdd.toDF(["csv_col", "word"])
    new_df.show()
    
    sparkSession.stop()

    运行结果:

    +-------+----+
    |csv_col|word|
    +-------+----+
    |  a,b,c|   a|
    |  a,b,c|   b|
    |  a,b,c|   c|
    |    d,e|   d|
    |    d,e|   e|
    +-------+----+