更新时间:2023-04-10 GMT+08:00

配置Spark SQL开启Adaptive Execution特性

配置场景

Spark SQL Adaptive Execution特性用于使Spark SQL在运行过程中,根据中间结果优化后续执行流程,提高整体执行效率。当前已实现的特性如下:

  1. 自动设置shuffle partition数

    在启用Adaptive Execution特性前,Spark SQL根据spark.sql.shuffle.partitions配置指定shuffle时的partition个数。此种方法在一个应用中执行多种SQL查询时缺乏灵活性,无法保证所有场景下的性能合适。开启Adaptive Execution后,Spark SQL将自动为每个shuffle过程动态设置partition个数,而不是使用通用配置,使每次shuffle过程自动使用最合理的partition数。

  1. 动态调整执行计划

    在启用Adaptive Execution特性前,Spark SQL根据RBO和CBO的优化结果创建执行计划,此种方法忽略了数据在运行过程中的结果集变化。比如基于某个大表创建的视图,与其他大表join时,即便视图的结果集很小,也无法将执行计划调整为BroadcastJoin。启用Adaptive Execution特性后,Spark SQL能够在运行过程中根据前面stage的运行结果动态调整后续的执行计划,从而获得更好的执行性能。

  1. 自动处理数据倾斜

    在执行SQL语句时,若存在数据倾斜,可能导致单个executor内存溢出、任务执行缓慢等问题。启动Adaptive Execution特性后,Spark SQL能自动处理数据倾斜场景,对倾斜的分区,启动多个task进行处理,每个task读取若干个shuffle输出文件,再对这部分任务的Join结果进行Union操作,以达到消除数据倾斜的效果

配置参数

登录FusionInsight Manager系统,选择“集群 > 服务 > Spark2x > 配置”,单击“全部配置”,搜索以下参数。

参数

说明

默认值

spark.sql.adaptive.enabled

配置是否启用自适应执行功能。

注意:AQE特性与DPP(动态分区裁剪)特性同时开启时,SparkSQL任务执行中会优先执行DPP特性,从而使得AQE特性不生效。

false

spark.sql.optimizer.dynamicPartitionPruning.enabled

动态分区裁剪功能的开关。

true

spark.sql.adaptive.coalescePartitions.enabled

如果配置为true并且“spark.sql.adaptive.enabled”为true,Spark将根据目标大小(由spark.sql.adaptive.advisoryPartitionSizeInBytes指定)合并连续的随机播放分区,以避免执行过多的小任务。

true

spark.sql.adaptive.coalescePartitions.initialPartitionNum

合并之前的shuffle分区的初始数量,默认等于spark.sql.shuffle.partitions。只有当spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled都为true时,该配置才有效。创建时可选,初始分区数必须为正数。

200

spark.sql.adaptive.coalescePartitions.minPartitionNum

合并后的最小shuffle分区数。如果不设置,默认为Spark集群的默认并行度。只有当spark.sql.adaptive.enabled 和spark.sql.adaptive.coalescePartitions.enable都为true时,该配置才有效。创建时可选,最小分区数必须为正数。

1

spark.sql.adaptive.shuffle.targetPostShuffleInputSize

shuffle后单个分区的目标大小,从Spark3.0开始不再支持。

64MB

spark.sql.adaptive.advisoryPartitionSizeInBytes

自适应优化时(spark.sql.adaptive.enabled为true时)shuffle分区的咨询大小(单位:字节),在Spark聚合小shuffle分区或拆分倾斜的shuffle分区时生效。

64MB

spark.sql.adaptive.fetchShuffleBlocksInBatch

是否批量取连续的shuffle块。对于同一个map任务,批量读取连续的shuffle块可以减少IO,提高性能,而不是逐个读取块。注意,只有当spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled都为true时,单次读取请求中存在多个连续块。这个特性还依赖于一个可重定位的序列化器,使用的级联支持编解码器和新版本的shuffle提取协议。

true

spark.sql.adaptive.localShuffleReader.enabled

当“true”且spark.sql.adaptive.enabled为“true”时,Spark在不需要进行shuffle分区时,会尝试使用本地shuffle reader读取shuffle数据,例如:将sort-merge join转换为broadcast-hash join后。

true

spark.sql.adaptive.skewJoin.enabled

当此配置为true且spark.sql.adaptive.enabled设置为true时,启用运行时自动处理join运算中的数据倾斜功能

true

spark.sql.adaptive.skewJoin.skewedPartitionFactor

此配置为一个倍数因子,用于判定分区是否为数据倾斜分区。单个分区被判定为数据倾斜分区的条件为:当一个分区的数据大小超过除此分区外其他所有分区大小的中值与该配置的乘积,并且大小超过spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes配置值时,此分区被判定为数据倾斜分区

5

spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

分区大小(单位:字节)大于该阈值且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor与分区中值的乘积,则认为该分区存在倾斜。理想情况下,此配置应大于spark.sql.adaptive.advisoryPartitionSizeInBytes.

256MB

spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin

两表进行join操作的时候,当非空分区比率低于此配置时,无论其大小如何,都不会被视为自适应执行中广播哈希连接的生成端。只有当spark.sql.adaptive.enabled为true时,此配置才有效。

0.2