在PySpark程序中创建自定义UDF、UDAF和UDTF
本章节指导用户通过PySpark程序中创建自定义UDF、UDAF和UDTF。当前版本Spark仅支持Python 3.7、3.8、3.9和3.10版本,使用其他Python版本可能会导致兼容性问题。
约束与限制
- 已参考在PySpark程序中使用Conda打包Python环境及第三方库章节准备Python环境。
- 在PySpark中,通过Python创建的UDF、UDAF、UDTF通常只能注册为临时UDF、UDAF、UDTF,只在当前SparkSession中有效。
自定义UDF使用指导
pyspark.sql.functions.udf(f: Union[Callable[[…], Any], DataTypeOrString, None] = None, returnType: DataTypeOrString = StringType()) → Union[UserDefinedFunctionLike, Callable[[Callable[[…], Any]], UserDefinedFunctionLike]]
|
参数 |
说明 |
|---|---|
|
f |
python函数。 |
|
returnType |
用户定义函数的返回类型。该值可以是pyspark.sql.types.DataType对象,也可以是ddl格式的类型字符串。 |
- 用户定义函数默认被认为是确定性的。由于优化,可以消除重复调用,甚至可能调用该函数比查询中出现的次数更多。如果你的函数不是确定性的,可以调用用户定义函数的asNondeterministic。例如:
from pyspark.sql.types import IntegerType import random random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
- 用户定义函数不支持条件表达式或布尔表达式中的短路,最终只能在内部执行。如果函数在特殊行上失败,则解决方法是将条件合并到函数中。
- 用户定义函数在调用方不接受关键字参数。
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
sparkSession = SparkSession.builder.getOrCreate()
slen = udf(lambda s: len(s), IntegerType())
@udf
def to_upper(s):
if s is not None:
return s.upper()
@udf(returnType=IntegerType())
def add_one(x):
if x is not None:
return x + 1
df = sparkSession.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
sparkSession.stop()
+----------+--------------+------------+ |slen(name)|to_upper(name)|add_one(age)| +----------+--------------+------------+ | 8| JOHN DOE| 22| +----------+--------------+------------+
自定义UDAF使用指导
当前版本不支持直接在PySpark中创建UDAF,可以通过pyspark.sql.functions.pandas_udf间接实现,pyspark.sql.functions.pandas_udf介绍如下:
pyspark.sql.functions.pandas_udf(f=None, returnType=None, functionType=None)
|
参数 |
说明 |
|---|---|
|
f |
用户定义函数。如果用作独立函数,则为Python函数。 |
|
returnType |
用户定义函数的返回类型。该值可以是pyspark.sql.types.DataType对象,也可以是ddl格式的类型字符串。 |
|
functionType |
pyspark.sql.functions.PandasUDFType中的枚举值。默认值:SCALAR。此参数用于兼容性。建议使用Python类型提示。 |
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType
import pandas as pd
sparkSession = SparkSession.builder.appName("PandasUDAFExample").getOrCreate()
@pandas_udf(DoubleType())
def pandas_mean_udaf(v: pd.Series) -> float:
return v.mean()
data = [
(1, 2.0),
(1, 4.0),
(2, 5.0),
(2, 6.0)
]
df = sparkSession.createDataFrame(data, ["id", "value"])
result_df = df.groupBy("id").agg(pandas_mean_udaf(df["value"]).alias("mean_value"))
result_df.show()
sparkSession.stop()
+---+----------+ | id|mean_value| +---+----------+ | 1| 3.0| | 2| 5.5| +---+----------+
自定义UDTF使用指导
- 利用UDF返回列表,再使用DataFrame的explode函数将数组拆分成多行。
- 使用RDD转换中的flatMap来实现一行生成多行数据的效果。
以一行数据转换成多行数据场景为例:
- 利用UDF返回Array,再使用explode拆分。
用Python函数编写一个UDF,该函数将输入数据(或一行中的某个字段)转化为列表(或数组)。然后使用DataFrame内置的explode函数将数组拆分成多行,从而达到UDTF的效果。
示例:from pyspark.sql import SparkSession from pyspark.sql.functions import udf, explode from pyspark.sql.types import ArrayType, StringType sparkSession = SparkSession.builder.appName("SimulateUDTF").getOrCreate() data = [("a,b,c",), ("d,e",)] df = spark.createDataFrame(data, ["csv_col"]) def split_string(s): if s: return s.split(",") return [] split_udf = udf(split_string, ArrayType(StringType())) df_with_array = df.withColumn("split_col", split_udf("csv_col")) df_exploded = df_with_array.withColumn("word", explode("split_col")) df_exploded.show() sparkSession.stop()运行结果:+-------+---------+----+ |csv_col|split_col|word| +-------+---------+----+ | a,b,c|[a, b, c]| a| | a,b,c|[a, b, c]| b| | a,b,c|[a, b, c]| c| | d,e| [d, e]| d| | d,e| [d, e]| e| +-------+---------+----+
- 利用RDD的flatMap实现。
对于更灵活的情况,您可以直接使用DataFrame的RDD接口,通过flatMap操作来实现转换。
样例(以将输入行拆分为多行为例):
from pyspark.sql import SparkSession sparkSession = SparkSession.builder.appName("SimulateUDTFWithFlatMap").getOrCreate() data = [("a,b,c",), ("d,e",)] df = spark.createDataFrame(data, ["csv_col"]) rdd = df.rdd.flatMap(lambda row: [(row.csv_col, token) for token in row.csv_col.split(",")]) new_df = rdd.toDF(["csv_col", "word"]) new_df.show() sparkSession.stop()运行结果:
+-------+----+ |csv_col|word| +-------+----+ | a,b,c| a| | a,b,c| b| | a,b,c| c| | d,e| d| | d,e| e| +-------+----+