更新时间:2025-06-11 GMT+08:00

配置Spark SQL开启Adaptive Execution特性

操作场景

Spark SQL Adaptive Execution特性用于使Spark SQL在运行过程中,根据中间结果优化后续执行流程,提高整体执行效率。当前已实现的特性如下:

  1. 自动设置shuffle partition数。

    在启用Adaptive Execution特性前,Spark SQL根据spark.sql.shuffle.partitions配置指定shuffle时的partition个数。此种方法在一个应用中执行多种SQL查询时缺乏灵活性,无法保证所有场景下的性能更优。开启Adaptive Execution后,Spark SQL将自动为每个shuffle过程动态设置partition个数,而不是使用通用配置,使每次shuffle过程自动使用最合理的partition数。

  1. 动态调整执行计划。

    在启用Adaptive Execution特性前,Spark SQL根据RBO和CBO的优化结果创建执行计划,此种方法忽略了数据在运行过程中的结果集变化。比如基于某个大表创建的视图,与其他大表join时,即便视图的结果集很小,也无法将执行计划调整为BroadcastJoin。启用Adaptive Execution特性后,Spark SQL能够在运行过程中根据前面stage的运行结果动态调整后续的执行计划,从而获得更好的执行性能。

  1. 自动处理数据倾斜。

    在执行SQL语句时,如果存在数据倾斜,可能导致单个executor内存溢出、任务执行缓慢等问题。启动Adaptive Execution特性后,Spark SQL能自动处理数据倾斜场景,对倾斜的分区,启动多个task进行处理,每个task读取部分shuffle输出文件,再对这部分任务的Join结果进行Union操作,以达到消除数据倾斜的效果。

配置参数

  1. 登录FusionInsight Manager系统。

    详细操作请参考访问集群Manager

  2. 选择“集群 > 服务 > Spark2x/Spark > 配置”,单击“全部配置”,搜索并调整以下参数。

    参数

    说明

    取值示例

    spark.sql.adaptive.enabled

    配置是否启用自适应执行功能。

    • true:默认值,启用自适应执行功能。如果要开启,请同时关闭自动分区裁剪功能,对应参数:spark.sql.optimizer.dynamicPartitionPruning.enabled。

      注意:AQE特性与DPP(动态分区裁剪)特性同时开启时,SparkSQL任务执行中会优先执行DPP特性,从而使得AQE特性不生效。

    • false:关闭自适应执行功能。

    true

    spark.sql.optimizer.dynamicPartitionPruning.enabled

    用于控制是否启用动态分区裁剪(Dynamic Partition Pruning, DPP)优化。这个参数在处理涉及分区表的查询时非常有用,可以帮助提高查询性能。

    • true:默认值,启用动态分区裁剪优化。Spark会在执行查询时动态地裁剪不需要的分区,从而减少I/O操作和提高查询性能。
    • false:不启用动态分区裁剪优化。

    true

    spark.sql.adaptive.coalescePartitions.enabled

    是否启用自适应查询优化(Adaptive Query Execution, AQE)中的合并分区功能。

    • true:如果配置为true并且“spark.sql.adaptive.enabled”为true,Spark将根据目标大小(由spark.sql.adaptive.advisoryPartitionSizeInBytes指定)合并连续的随机播放分区,以避免执行过多的小任务。
    • false:不启用此优化。

    true

    spark.sql.adaptive.coalescePartitions.initialPartitionNum

    合并之前的shuffle分区的初始数量,默认等于spark.sql.shuffle.partitions。只有当spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled都为true时,该配置才有效。创建时可选,初始分区数必须为正数。

    200

    spark.sql.adaptive.coalescePartitions.minPartitionNum

    合并后的最小shuffle分区数。如果不设置,默认为Spark集群的默认并行度。只有当spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enable都为true时,该配置才有效。创建时可选,最小分区数必须为正数。

    1

    spark.sql.adaptive.shuffle.targetPostShuffleInputSize

    shuffle后单个分区的目标大小,从Spark3.0开始不再支持。

    64MB

    spark.sql.adaptive.advisoryPartitionSizeInBytes

    自适应优化时(spark.sql.adaptive.enabled为true时)shuffle分区的咨询大小(单位:字节),在Spark聚合小shuffle分区或拆分倾斜的shuffle分区时生效。

    64MB

    spark.sql.adaptive.fetchShuffleBlocksInBatch

    是否批量取连续的shuffle块。对于同一个map任务,批量读取连续的shuffle块可以减少IO,提高性能,而不是逐个读取块。

    • true:默认值,即默认启用批处理模式。Spark会在执行Shuffle操作时尝试使用批处理模式来减少数据传输的开销,从而提高查询性能。
    • false:不启用此功能。

    注意:只有当spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled都为true时,单次读取请求中存在多个连续块。这个特性还依赖于一个可重定位的序列化器,使用的级联支持编解码器和新版本的shuffle提取协议。

    true

    spark.sql.adaptive.localShuffleReader.enabled

    是否启用自适应查询优化(Adaptive Query Execution, AQE)中的本地Shuffle读取器。

    • true:设置为true,且spark.sql.adaptive.enabled为true时,Spark在不需要进行shuffle分区时,会尝试使用本地shuffle reader读取shuffle数据,例如:将sort-merge join转换为broadcast-hash join后。
    • false:不启用此功能。

    true

    spark.sql.adaptive.skewJoin.enabled

    是否启用运行时自动处理join运算中的数据倾斜功能。

    • true:默认值为true,即默认启用此优化。Spark会在执行连接操作时自动检测数据倾斜,并采取措施来减少倾斜对性能的影响。通过减少倾斜的影响,可以提高资源利用效率,减少内存和磁盘I/O开销。
    • false:不启用此功能。

    true

    spark.sql.adaptive.skewJoin.skewedPartitionFactor

    此配置为一个倍数因子,用于判定分区是否为数据倾斜分区。单个分区被判定为数据倾斜分区的条件为:当一个分区的数据大小超过除此分区外其他所有分区大小的中值与该配置的乘积,并且大小超过spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes配置值时,此分区被判定为数据倾斜分区。

    5

    spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

    分区大小(单位:字节)大于该阈值且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor与分区中值的乘积,则认为该分区存在倾斜。理想情况下,此配置应大于spark.sql.adaptive.advisoryPartitionSizeInBytes。

    256MB

    spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin

    两表进行join操作的时候,当非空分区比率低于此配置时,无论其大小如何,都不会被视为自适应执行中广播哈希连接的生成端。只有当spark.sql.adaptive.enabled为true时,此配置才有效。

    0.2

  3. 修改参数配置后,单击“保存”,根据界面提示操作后,等待配置保存成功。
  4. Spark服务端配置更新后,如果“配置状态”为“配置过期”,则需重启组件以使配置生效。

    图1 修改Spark配置

    在Spark服务概览页面,选择“更多 > 重启服务/滚动重启服务”,验证管理员密码后,等待服务重启成功。

    如果使用Spark客户端提交任务,修改了集群的参数后,需要重新下载客户端才能使配置生效,请参考使用MRS客户端

    组件重启期间将无法对外提供服务,可能会影响集群的上层业务正常运行,请在业务空闲期或确认操作无影响后再执行本操作。