更新时间:2025-06-10 GMT+08:00

优化数据倾斜场景下的Spark SQL性能

操作场景

在Spark SQL多表Join的场景下,会存在关联键严重倾斜的情况,导致Hash分桶后,部分桶中的数据远高于其他分桶。最终导致部分Task过重,运行很慢;其他Task过轻,运行很快。一方面,数据量大Task运行慢,使得计算性能低;另一方面,数据量少的Task在运行完成后,导致很多CPU空闲,造成CPU资源浪费。

通过如下配置项可开启自动进行数据倾斜处理功能,通过将Hash分桶后数据量很大的、且超过数据倾斜阈值的分桶拆散,变成多个task处理一个桶的数据机制,提高CPU资源利用率,提高系统性能。未产生倾斜的数据,将采用原有方式进行分桶并运行。

约束与限制

  • 只支持两表Join的场景。
  • 不支持FULL OUTER JOIN的数据倾斜处理。

    示例:执行下面SQL语句,a表倾斜或b表倾斜都无法触发该优化。

    select aid FROM a FULL OUTER JOIN b ON aid=bid;
  • 不支持LEFT OUTER JOIN的右表倾斜处理。

    示例:执行下面SQL语句,b表倾斜无法触发该优化。

    select aid FROM a LEFT OUTER JOIN b ON aid=bid;
  • 不支持RIGHT OUTER JOIN的左表倾斜处理。

    示例:执行下面SQL语句,a表倾斜无法触发该优化。

    select aid FROM a RIGHT OUTER JOIN b ON aid=bid;

配置描述

  1. 安装Spark客户端。

    详细操作请参考安装MRS客户端

  2. 使用客户端安装用户登录Spark客户端节点。

    在Spark客户端的“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”配置文件中添加如下表格中的参数。

    表1 参数说明

    参数

    参数说明

    取值示例

    spark.sql.adaptive.enabled

    用于启用或禁用自适应查询执行(Adaptive Query Execution,AQE)功能。该功能允许Spark在运行时动态调整查询计划,以应对数据倾斜、统计信息不准确等问题,从而提升查询性能。

    注意:AQE特性与DPP(动态分区裁剪)特性同时开启时,SparkSQL任务执行中会优先执行DPP特性,从而使得AQE特性不生效。集群中DPP特性是默认开启的,因此开启AQE特性的同时,需要将DPP特性关闭。

    false

    spark.sql.optimizer.dynamicPartitionPruning.enabled

    用于控制动态分区裁剪(Dynamic Partition Pruning,DPP) 的启用状态。该功能允许Spark在查询执行时根据运行时数据动态过滤分区,从而减少数据扫描量,提升查询性能。

    true

    spark.sql.adaptive.skewJoin.enabled

    用于控制自适应倾斜Join优化的启用状态。该功能允许Spark在运行时检测并自动处理数据倾斜问题,避免因个别Task处理大量数据而导致的长尾性能问题。

    当此配置为true且spark.sql.adaptive.enabled设置为true时,启用运行时自动处理join运算中的数据倾斜功能。

    true

    spark.sql.adaptive.skewJoin.skewedPartitionFactor

    此配置为一个倍数因子,用于判定分区是否为数据倾斜分区。单个分区被判定为数据倾斜分区的条件为:当一个分区的数据大小超过除此分区外其他所有分区大小的中值与该配置的乘积,并且大小超过spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes配置值时,此分区被判定为数据倾斜分区。

    5

    spark.sql.adaptive.skewjoin.skewedPartitionThresholdInBytes

    分区大小(单位:字节)大于该阈值且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor与分区中值的乘积,则认为该分区存在倾斜。理想情况下,此配置应大于spark.sql.adaptive.advisoryPartitionSizeInBytes。

    256MB

    spark.sql.adaptive.shuffle.targetPostShuffleInputSize

    每个Task处理的shuffle数据的最小数据量。单位:Byte。

    67108864