在PySpark程序中创建自定义UDF、UDAF和UDTF
本章节指导用户通过PySpark程序中创建自定义UDF、UDAF和UDTF。当前版本Spark仅支持Python 3.7、3.8、3.9和3.10版本,使用其他Python版本可能会导致兼容性问题。
约束与限制
- 已参考在PySpark程序中使用Conda打包Python环境及第三方库章节准备Python环境。
- 在PySpark中,通过Python创建的UDF、UDAF、UDTF通常只能注册为临时UDF、UDAF、UDTF,只在当前SparkSession中有效。
自定义UDF使用指导
pyspark.sql.functions.udf(f: Union[Callable[[…], Any], DataTypeOrString, None] = None, returnType: DataTypeOrString = StringType()) → Union[UserDefinedFunctionLike, Callable[[Callable[[…], Any]], UserDefinedFunctionLike]]
参数 |
说明 |
---|---|
f |
python函数。 |
returnType |
用户定义函数的返回类型。该值可以是pyspark.sql.types.DataType对象,也可以是ddl格式的类型字符串。 |
- 用户定义函数默认被认为是确定性的。由于优化,可以消除重复调用,甚至可能调用该函数比查询中出现的次数更多。如果你的函数不是确定性的,可以调用用户定义函数的asNondeterministic。例如:
from pyspark.sql.types import IntegerType import random random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
- 用户定义函数不支持条件表达式或布尔表达式中的短路,最终只能在内部执行。如果函数在特殊行上失败,则解决方法是将条件合并到函数中。
- 用户定义函数在调用方不接受关键字参数。
from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import IntegerType sparkSession = SparkSession.builder.getOrCreate() slen = udf(lambda s: len(s), IntegerType()) @udf def to_upper(s): if s is not None: return s.upper() @udf(returnType=IntegerType()) def add_one(x): if x is not None: return x + 1 df = sparkSession.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age")) df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show() sparkSession.stop()
+----------+--------------+------------+ |slen(name)|to_upper(name)|add_one(age)| +----------+--------------+------------+ | 8| JOHN DOE| 22| +----------+--------------+------------+
自定义UDAF使用指导
当前版本不支持直接在PySpark中创建UDAF,可以通过pyspark.sql.functions.pandas_udf间接实现,pyspark.sql.functions.pandas_udf介绍如下:
pyspark.sql.functions.pandas_udf(f=None, returnType=None, functionType=None)
参数 |
说明 |
---|---|
f |
用户定义函数。如果用作独立函数,则为Python函数。 |
returnType |
用户定义函数的返回类型。该值可以是pyspark.sql.types.DataType对象,也可以是ddl格式的类型字符串。 |
functionType |
pyspark.sql.functions.PandasUDFType中的枚举值。默认值:SCALAR。此参数用于兼容性。建议使用Python类型提示。 |
from pyspark.sql import SparkSession from pyspark.sql.functions import pandas_udf, PandasUDFType from pyspark.sql.types import DoubleType import pandas as pd sparkSession = SparkSession.builder.appName("PandasUDAFExample").getOrCreate() @pandas_udf(DoubleType()) def pandas_mean_udaf(v: pd.Series) -> float: return v.mean() data = [ (1, 2.0), (1, 4.0), (2, 5.0), (2, 6.0) ] df = sparkSession.createDataFrame(data, ["id", "value"]) result_df = df.groupBy("id").agg(pandas_mean_udaf(df["value"]).alias("mean_value")) result_df.show() sparkSession.stop()
+---+----------+ | id|mean_value| +---+----------+ | 1| 3.0| | 2| 5.5| +---+----------+
自定义UDTF使用指导
- 利用UDF返回列表,再使用DataFrame的explode函数将数组拆分成多行。
- 使用RDD转换中的flatMap来实现一行生成多行数据的效果。
以一行数据转换成多行数据场景为例:
- 利用UDF返回Array,再使用explode拆分。
用Python函数编写一个UDF,该函数将输入数据(或一行中的某个字段)转化为列表(或数组)。然后使用DataFrame内置的explode函数将数组拆分成多行,从而达到UDTF的效果。
示例:from pyspark.sql import SparkSession from pyspark.sql.functions import udf, explode from pyspark.sql.types import ArrayType, StringType sparkSession = SparkSession.builder.appName("SimulateUDTF").getOrCreate() data = [("a,b,c",), ("d,e",)] df = spark.createDataFrame(data, ["csv_col"]) def split_string(s): if s: return s.split(",") return [] split_udf = udf(split_string, ArrayType(StringType())) df_with_array = df.withColumn("split_col", split_udf("csv_col")) df_exploded = df_with_array.withColumn("word", explode("split_col")) df_exploded.show() sparkSession.stop()
运行结果:+-------+---------+----+ |csv_col|split_col|word| +-------+---------+----+ | a,b,c|[a, b, c]| a| | a,b,c|[a, b, c]| b| | a,b,c|[a, b, c]| c| | d,e| [d, e]| d| | d,e| [d, e]| e| +-------+---------+----+
- 利用RDD的flatMap实现。
对于更灵活的情况,您可以直接使用DataFrame的RDD接口,通过flatMap操作来实现转换。
样例(以将输入行拆分为多行为例):
from pyspark.sql import SparkSession sparkSession = SparkSession.builder.appName("SimulateUDTFWithFlatMap").getOrCreate() data = [("a,b,c",), ("d,e",)] df = spark.createDataFrame(data, ["csv_col"]) rdd = df.rdd.flatMap(lambda row: [(row.csv_col, token) for token in row.csv_col.split(",")]) new_df = rdd.toDF(["csv_col", "word"]) new_df.show() sparkSession.stop()
运行结果:
+-------+----+ |csv_col|word| +-------+----+ | a,b,c| a| | a,b,c| b| | a,b,c| c| | d,e| d| | d,e| e| +-------+----+