Improving Spark SQL Calculation Performance Under Data Skew
Scenarios
When multiple tables are joined in Spark SQL, the join keys may be skewed, so that some hash buckets hold far more data than others. The tasks that process those buckets are overloaded and run slowly, while tasks with little data finish quickly and leave their CPUs idle, which wastes CPU resources.
If automatic data skew handling is enabled, the data in a bucket that exceeds the bucketing threshold is split, and multiple tasks process the data of that one bucket. This improves CPU usage and overall system performance. Data that is not skewed is bucketed and processed in the original way.
Notes and Constraints
- Only the join between two tables is supported.
- FULL OUTER JOIN does not support data skew optimization.
For example, in the following SQL statement, skew in table a or table b cannot trigger the optimization.
SELECT aid FROM a FULL OUTER JOIN b ON aid = bid;
- LEFT OUTER JOIN does not support data skew optimization for the right table.
For example, in the following SQL statement, skew in table b cannot trigger the optimization.
SELECT aid FROM a LEFT OUTER JOIN b ON aid = bid;
- RIGHT OUTER JOIN does not support data skew optimization for the left table.
For example, in the following SQL statement, skew in table a cannot trigger the optimization.
SELECT aid FROM a RIGHT OUTER JOIN b ON aid = bid;
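Read the other way round, the constraints above suggest that skew on the preserved side of an outer join can still be handled. The sketch below reuses tables a and b and columns aid and bid from the examples above; the comments are an inference from the list, not a statement the list makes explicitly.

```sql
-- Inferred: skew in table a (the left table) can trigger the optimization;
-- skew in table b cannot.
SELECT aid FROM a LEFT OUTER JOIN b ON aid = bid;

-- Inferred: skew in table b (the right table) can trigger the optimization;
-- skew in table a cannot.
SELECT aid FROM a RIGHT OUTER JOIN b ON aid = bid;
```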
Configuration Description
- Install the Spark client.
For details, see Installing a Client.
- Log in to the Spark client node as the client installation user.
Add the following parameters to the {Client installation directory}/Spark/spark/conf/spark-defaults.conf file on the Spark client.
Table 1 Parameter description

| Parameter | Description | Example Value |
| --- | --- | --- |
| spark.sql.adaptive.enabled | Whether Adaptive Query Execution (AQE) is enabled. With AQE enabled, Spark re-optimizes the query plan during execution, which can significantly improve Spark SQL query performance, especially when data is skewed or statistics are inaccurate. Note: If AQE and Dynamic Partition Pruning (DPP) are enabled at the same time, DPP takes precedence over AQE during Spark SQL task execution, so AQE does not take effect. DPP is enabled in the cluster by default, so you need to disable it when enabling AQE. | false |
| spark.sql.optimizer.dynamicPartitionPruning.enabled | Whether Dynamic Partition Pruning (DPP) is enabled. DPP reduces the amount of data scanned during query execution by filtering partitions dynamically at runtime. Enabling DPP can significantly improve query performance. | true |
| spark.sql.adaptive.skewJoin.enabled | Whether Spark automatically handles data skew in sort-merge join operations. When enabled, Spark detects and mitigates skewed partitions during joins, which helps prevent "straggler" tasks that take much longer than others because they process a disproportionately large amount of data. The function takes effect only when both this parameter and spark.sql.adaptive.enabled are set to true. | true |
| spark.sql.adaptive.skewJoin.skewedPartitionFactor | Multiplier used to determine whether a partition is skewed. A partition is considered skewed if its size exceeds both this value multiplied by the median partition size and the value of spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes. | 5 |
| spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | Size threshold, in bytes. A partition is considered skewed if its size exceeds both this threshold and the product of spark.sql.adaptive.skewJoin.skewedPartitionFactor and the median partition size. Ideally, set this value greater than that of spark.sql.adaptive.advisoryPartitionSizeInBytes. | 256 MB |
| spark.sql.adaptive.shuffle.targetPostShuffleInputSize | Minimum amount of shuffle data processed by each task, in bytes. | 67108864 |
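As a rough sketch, the parameters in Table 1 might appear in spark-defaults.conf as follows when the skew optimization is switched on. The values are illustrative assumptions, not recommended settings: AQE is set to true and DPP to false to follow the note on spark.sql.adaptive.enabled, and the threshold is written as 256MB without a space on the assumption that Spark's size parser expects that form.

```
# Illustrative values only; adjust them for your cluster.
# Enable AQE and disable DPP so that AQE takes effect (see the note in Table 1).
spark.sql.adaptive.enabled                                    true
spark.sql.optimizer.dynamicPartitionPruning.enabled           false
# Enable automatic skew handling for joins (requires AQE).
spark.sql.adaptive.skewJoin.enabled                           true
spark.sql.adaptive.skewJoin.skewedPartitionFactor             5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes   256MB
spark.sql.adaptive.shuffle.targetPostShuffleInputSize         67108864
```

With a factor of 5 and a threshold of 256 MB, a 1 GB partition would count as skewed when the median partition size is 100 MB, because 1 GB exceeds both 5 × 100 MB = 500 MB and 256 MB. A 400 MB partition would not, because it stays below 500 MB.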