更新时间:2024-09-18 GMT+08:00

优化数据倾斜场景下的Spark SQL性能

配置场景

在Spark SQL多表Join的场景下,会存在关联键严重倾斜的情况,导致Hash分桶后,部分桶中的数据远高于其他分桶。最终导致部分Task过重,运行很慢;其他Task过轻,运行很快。一方面,数据量大Task运行慢,使得计算性能低;另一方面,数据量少的Task在运行完成后,导致很多CPU空闲,造成CPU资源浪费。

通过如下配置项可开启自动进行数据倾斜处理功能,通过将Hash分桶后数据量很大的、且超过数据倾斜阈值的分桶拆散,变成多个task处理一个桶的数据机制,提高CPU资源利用率,提高系统性能。

未产生倾斜的数据,将采用原有方式进行分桶并运行。

使用约束:

  • 只支持两表Join的场景。
  • 不支持FULL OUTER JOIN的数据倾斜处理。

    示例:执行下面SQL语句,a表倾斜或b表倾斜都无法触发该优化。

    select aid FROM a FULL OUTER JOIN b ON aid=bid;

  • 不支持LEFT OUTER JOIN的右表倾斜处理。

    示例:执行下面SQL语句,b表倾斜无法触发该优化。

    select aid FROM a LEFT OUTER JOIN b ON aid=bid;

  • 不支持RIGHT OUTER JOIN的左表倾斜处理。

    示例:执行下面SQL语句,a表倾斜无法触发该优化。

    select aid FROM a RIGHT OUTER JOIN b ON aid=bid;

配置描述

在Spark Driver端的“spark-defaults.conf”配置文件中添加如下表格中的参数。

表1 参数说明

参数

描述

默认值

spark.sql.adaptive.enabled

自适应执行特性的总开关。

注意:AQE特性与DPP(动态分区裁剪)特性同时开启时,SparkSQL任务执行中会优先执行DPP特性,从而使得AQE特性不生效。集群中DPP特性是默认开启的,因此开启AQE特性的同时,需要将DPP特性关闭。

false

spark.sql.optimizer.dynamicPartitionPruning.enabled

动态分区裁剪功能的开关。

true

spark.sql.adaptive.skewJoin.enabled

当此配置为true且spark.sql.adaptive.enabled设置为true时,启用运行时自动处理join运算中的数据倾斜功能。

true

spark.sql.adaptive.skewJoin.skewedPartitionFactor

此配置为一个倍数因子,用于判定分区是否为数据倾斜分区。单个分区被判定为数据倾斜分区的条件为:当一个分区的数据大小超过除此分区外其他所有分区大小的中值与该配置的乘积,并且大小超过spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes配置值时,此分区被判定为数据倾斜分区。

5

spark.sql.adaptive.skewjoin.skewedPartitionThresholdInBytes

分区大小(单位:字节)大于该阈值且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor与分区中值的乘积,则认为该分区存在倾斜。理想情况下,此配置应大于spark.sql.adaptive.advisoryPartitionSizeInBytes。

256MB

spark.sql.adaptive.shuffle.targetPostShuffleInputSize

每个task处理的shuffle数据的最小数据量。单位:Byte。

67108864