配置Spark SQL开启Adaptive Execution特性
配置场景
Spark SQL Adaptive Execution特性用于使Spark SQL在运行过程中,根据中间结果优化后续执行流程,提高整体执行效率。当前已实现的特性如下:
- 自动设置shuffle partition数
在启用Adaptive Execution特性前,Spark SQL根据spark.sql.shuffle.partitions配置指定shuffle时的partition个数。此种方法在一个应用中执行多种SQL查询时缺乏灵活性,无法保证所有场景下的性能最优。开启Adaptive Execution后,Spark SQL将自动为每个shuffle过程动态设置partition个数,而不是使用通用配置,使每次shuffle过程自动使用最合理的partition数。
配置参数
登录FusionInsight Manager系统,选择“集群 > 服务 > Spark > 配置”,单击“全部配置”,搜索以下参数。
参数 |
说明 |
默认值 |
---|---|---|
spark.sql.adaptive.enabled |
配置是否启用自适应执行功能。 注意:AQE特性与DPP(动态分区裁剪)特性同时开启时,SparkSQL任务执行中会优先执行DPP特性,从而使得AQE特性不生效。 |
false |
spark.sql.optimizer.dynamicPartitionPruning.enabled |
动态分区裁剪功能的开关。 |
true |
spark.sql.adaptive.coalescePartitions.enabled |
如果配置为true并且“spark.sql.adaptive.enabled”为true,Spark将根据目标大小(由spark.sql.adaptive.advisoryPartitionSizeInBytes指定)合并连续的随机播放分区,以避免执行过多的小任务。 |
true |
spark.sql.adaptive.coalescePartitions.initialPartitionNum |
合并之前的shuffle分区的初始数量,默认等于spark.sql.shuffle.partitions。只有当spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled都为true时,该配置才有效。创建时可选,初始分区数必须为正数。 |
200 |
spark.sql.adaptive.coalescePartitions.minPartitionNum |
合并后的最小shuffle分区数。如果不设置,默认为Spark集群的默认并行度。只有当spark.sql.adaptive.enabled 和spark.sql.adaptive.coalescePartitions.enable都为true时,该配置才有效。创建时可选,最小分区数必须为正数。 |
1 |
spark.sql.adaptive.shuffle.targetPostShuffleInputSize |
shuffle后单个分区的目标大小,从Spark3.0开始不再支持。 |
64MB |
spark.sql.adaptive.advisoryPartitionSizeInBytes |
自适应优化时(spark.sql.adaptive.enabled为true时)shuffle分区的咨询大小(单位:字节),在Spark聚合小shuffle分区或拆分倾斜的shuffle分区时生效。 |
64MB |
spark.sql.adaptive.fetchShuffleBlocksInBatch |
是否批量取连续的shuffle块。对于同一个map任务,批量读取连续的shuffle块可以减少IO,提高性能,而不是逐个读取块。注意,只有当spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled都为true时,单次读取请求中存在多个连续块。这个特性还依赖于一个可重定位的序列化器,使用的级联支持编解码器和新版本的shuffle提取协议。 |
true |
spark.sql.adaptive.localShuffleReader.enabled |
当“true”且spark.sql.adaptive.enabled为“true”时,Spark在不需要进行shuffle分区时,会尝试使用本地shuffle reader读取shuffle数据,例如:将sort-merge join转换为broadcast-hash join后。 |
true |
spark.sql.adaptive.skewJoin.enabled |
当此配置为true且spark.sql.adaptive.enabled设置为true时,启用运行时自动处理join运算中的数据倾斜功能 |
true |
spark.sql.adaptive.skewJoin.skewedPartitionFactor |
此配置为一个倍数因子,用于判定分区是否为数据倾斜分区。单个分区被判定为数据倾斜分区的条件为:当一个分区的数据大小超过除此分区外其他所有分区大小的中值与该配置的乘积,并且大小超过spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes配置值时,此分区被判定为数据倾斜分区 |
5 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes |
分区大小(单位:字节)大于该阈值且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor与分区中值的乘积,则认为该分区存在倾斜。理想情况下,此配置应大于spark.sql.adaptive.advisoryPartitionSizeInBytes. |
256MB |
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin |
两表进行join操作的时候,当非空分区比率低于此配置时,无论其大小如何,都不会被视为自适应执行中广播哈希连接的生成端。只有当spark.sql.adaptive.enabled为true时,此配置才有效。 |
0.2 |