优化数据倾斜场景下的Spark SQL性能
配置场景
在Spark SQL多表Join的场景下,会存在关联键严重倾斜的情况,导致Hash分桶后,部分桶中的数据远高于其它分桶。最终导致部分Task过重,跑得很慢;其它Task过轻,跑得很快。一方面,数据量大Task运行慢,使得计算性能低;另一方面,数据量少的Task在运行完成后,导致很多CPU空闲,造成CPU资源浪费。
通过如下配置项可开启自动进行数据倾斜处理功能,通过将Hash分桶后数据量很大的、且超过数据倾斜阈值的分桶拆散,变成多个task处理一个桶的数据机制,提高CPU资源利用率,提高系统性能。
未产生倾斜的数据,将采用原有方式进行分桶并运行。
使用约束:
配置描述
在Spark Driver端的“spark-defaults.conf”配置文件中添加如下表格中的参数。
参数 |
描述 |
默认值 |
---|---|---|
spark.sql.adaptive.enabled |
自适应执行特性的总开关。 注意:AQE特性与DPP(动态分区裁剪)特性同时开启时,SparkSQL任务执行中会优先执行DPP特性,从而使得AQE特性不生效。集群中DPP特性是默认开启的,因此我们开启AQE特性的同时,需要将DPP特性关闭。 |
false |
spark.sql.optimizer.dynamicPartitionPruning.enabled |
动态分区裁剪功能的开关。 |
true |
spark.sql.adaptive.skewJoin.enabled |
当此配置为true且spark.sql.adaptive.enabled设置为true时,启用运行时自动处理join运算中的数据倾斜功能。 |
true |
spark.sql.adaptive.skewJoin.skewedPartitionFactor |
此配置为一个倍数因子,用于判定分区是否为数据倾斜分区。单个分区被判定为数据倾斜分区的条件为:当一个分区的数据大小超过除此分区外其他所有分区大小的中值与该配置的乘积,并且大小超过spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes配置值时,此分区被判定为数据倾斜分区 |
5 |
spark.sql.adaptive.skewjoin.skewedPartitionThresholdInBytes |
分区大小(单位:字节)大于该阈值且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor与分区中值的乘积,则认为该分区存在倾斜。理想情况下,此配置应大于spark.sql.adaptive.advisoryPartitionSizeInBytes. |
256MB |
spark.sql.adaptive.shuffle.targetPostShuffleInputSize |
每个task处理的shuffle数据的最小数据量。单位:Byte。 |
67108864 |