Experience Summary
Using mapPartitions to Calculate Data by Partition
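If processing each record carries a high fixed overhead, avoid paying that overhead once per record. The following code, for example, opens and closes a database connection for every record:
rdd.map { x => val conn = getDBConn; conn.write(x.toString); conn.close() }
Use mapPartitions instead so that the connection is opened once per partition:
rdd.mapPartitions { records => val conn = getDBConn; for (item <- records) conn.write(item.toString); conn.close(); Iterator.empty }
Because mapPartitions is a lazy transformation that must return an iterator, the writes run only when an action is invoked on the resulting RDD.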
The mapPartitions function is useful well beyond this case. For example, to find the top N values in a large data set, use mapPartitions to calculate the top N values within each partition. If N is small, the candidates from all partitions can then be merged and sorted cheaply. This is more efficient than sorting the full data set to obtain the top N.
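The top N approach described above can be sketched as follows. This is a minimal illustration, not a tuned implementation: the function name topN is hypothetical, and each partition is sorted in memory for brevity.

```scala
import org.apache.spark.rdd.RDD

// Two-stage top N: reduce every partition to at most n candidates,
// then sort only those candidates on the driver.
def topN(rdd: RDD[Int], n: Int): Array[Int] = {
  val descending = Ordering[Int].reverse
  val candidates = rdd.mapPartitions { records =>
    // Runs once per partition; keeps the n largest values of that partition.
    records.toArray.sorted(descending).take(n).iterator
  }
  // At most n * numPartitions values reach the driver.
  candidates.collect().sorted(descending).take(n)
}
```

For example, topN(sc.parallelize(1 to 100, 4), 3) sorts at most 12 candidates on the driver instead of all 100 values. Spark's built-in RDD.top(n) follows a similar per-partition idea and is usually the simpler choice in practice.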
Using coalesce to Adjust the Number of Partitions
Use coalesce to adjust the number of partitions (also called slices) of an RDD. The function behaves in two different ways, depending on the shuffle argument:
coalesce(numPartitions: Int, shuffle: Boolean = false)
When shuffle is set to true, the function is the same as repartition(numPartitions: Int), and partitions are recreated through a shuffle. When shuffle is set to false, no shuffle takes place: several partitions of the parent resilient distributed dataset (RDD) are merged and calculated in the same task. In this case, if the value of numPartitions is greater than the number of partitions of the parent RDD, the number of partitions does not change.
Consider using coalesce in the following scenarios:
- If the previous operations filter out a large amount of data, use coalesce to reduce the number of empty or nearly empty tasks. In this case, call coalesce(numPartitions, false) with a numPartitions value that is less than the number of partitions of the parent RDD.
- Use coalesce when the number of input partitions is too large for the job to run.
- Use coalesce when the program hangs in the shuffle operation because of a large number of tasks or limited Linux resources. In this case, use coalesce(numPartitions, true) to recreate partitions.
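The first scenario can be sketched as follows. The function name filterAndCompact, the filter predicate, and the partition counts are illustrative assumptions only.

```scala
import org.apache.spark.rdd.RDD

// After a selective filter most partitions are nearly empty, so merge them
// without a shuffle to avoid scheduling many tiny tasks.
def filterAndCompact(rdd: RDD[Int], targetPartitions: Int): RDD[Int] = {
  val filtered = rdd.filter(_ % 100 == 0) // keeps roughly 1% of the records
  filtered.coalesce(targetPartitions, shuffle = false)
}
```

Called on an RDD with 100 partitions and targetPartitions = 4, the result has 4 partitions. Asking for more partitions than the parent has, with shuffle = false, leaves the partition count unchanged.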
Configuring localDir
During the shuffle procedure of Spark, data needs to be written to local disks. Shuffle is often the performance bottleneck of a Spark job, and the bottleneck of shuffle is usually disk I/O. To improve I/O performance, configure multiple disks so that data can be written concurrently. If multiple disks are mounted to a node, configure one Spark localDir for each disk. This distributes shuffle files across multiple disks and improves disk I/O efficiency. Configuring multiple directories on the same disk does not improve performance.
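As a minimal sketch, the local directories can be passed through the spark.local.dir property, which accepts a comma-separated list. The mount points below are hypothetical, one per physical disk. Note that a cluster manager may override this property (on YARN, for instance, the NodeManager local directories are used instead).

```scala
import org.apache.spark.SparkConf

// Hypothetical mount points: one directory on each physical disk.
val localDirs = Seq("/data1/spark/local", "/data2/spark/local", "/data3/spark/local")

val conf = new SparkConf()
  .setAppName("local-dir-example")
  .set("spark.local.dir", localDirs.mkString(",")) // comma-separated list
```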
Using the Collect Operation for Small Data
Do not use the collect operation on a large volume of data.
When the collect operation is performed, the data held by the executors is sent to the driver. If the driver does not have sufficient memory, an OutOfMemory error occurs on the driver. Therefore, if the data volume is unknown, perform the saveAsTextFile operation to write the data into HDFS. Perform the collect operation only if the data volume is known and the driver has sufficient memory.
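One way to apply this rule is sketched below. The function collectOrSave, the threshold maxDriverRecords, and the output path are hypothetical. The record count is only a rough proxy for memory use, and count() itself runs an extra job over the data.

```scala
import org.apache.spark.rdd.RDD

// Collects the data only when it is known to be small; otherwise the
// executors write it out directly and nothing is sent to the driver.
def collectOrSave(rdd: RDD[String], outputPath: String, maxDriverRecords: Long): Option[Array[String]] = {
  if (rdd.count() <= maxDriverRecords) {
    Some(rdd.collect())            // small, known volume: safe for the driver
  } else {
    rdd.saveAsTextFile(outputPath) // for example, a directory on HDFS
    None
  }
}
```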
Using reduceByKey
The reduceByKey operator performs local aggregation on the Map side, which reduces the amount of data transferred during the shuffle. The groupByKey operator, however, does not perform aggregation on the Map side. Therefore, use reduceByKey where possible and avoid patterns such as groupByKey().map(x=>(x._1,x._2.size)).
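As a sketch, the per-key count produced by the groupByKey pattern above can be written with reduceByKey as follows. The function name countPerKey is hypothetical.

```scala
import org.apache.spark.rdd.RDD

// Counts records per key. Each value is replaced by 1 and the ones are summed,
// so partial counts are combined on the Map side before the shuffle.
def countPerKey(pairs: RDD[(String, Int)]): RDD[(String, Int)] =
  pairs.mapValues(_ => 1).reduceByKey(_ + _)
```

Only one partial count per key and per map task crosses the network, instead of every individual value.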
Broadcasting Map Instead of Arrays
If each record requires a lookup in data that is broadcast from the driver side, broadcast the data as a set or map instead of an array or another sequential structure such as an Iterator. A lookup in a hash-based set or map takes approximately O(1) time, whereas searching an array or Iterator takes O(n) time.
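A minimal sketch of this pattern follows. The function keepAllowed and its parameters are hypothetical names chosen for illustration.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Keeps only the records that appear in a lookup table sent from the driver.
def keepAllowed(sc: SparkContext, records: RDD[String], allowed: Seq[String]): RDD[String] = {
  // Convert to a Set before broadcasting so each lookup is a hash probe
  // rather than a linear scan over the sequence.
  val allowedSet = sc.broadcast(allowed.toSet)
  records.filter(r => allowedSet.value.contains(r))
}
```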
Avoiding Data Skew
If data skew occurs (some keys or partitions hold far more data than others), the execution time of tasks varies widely even though no garbage collection (GC) is performed. To reduce skew:
- Redefine keys. Use keys of smaller granularity so that each task processes less data.
- Modify the degree of parallelism (DOP).
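Both measures can be sketched together for a per-key sum. This is one common way to obtain finer-grained keys (appending a random suffix, often called salting) and is an assumption, not something the text above prescribes. The names saltedSum, salts, and numPartitions are hypothetical.

```scala
import scala.util.Random
import org.apache.spark.rdd.RDD

// Sums values per key in two steps so that a single hot key is spread
// over up to `salts` tasks instead of landing in one.
def saltedSum(pairs: RDD[(String, Long)], salts: Int, numPartitions: Int): RDD[(String, Long)] = {
  pairs
    .map { case (k, v) => ((k, Random.nextInt(salts)), v) } // finer-grained key
    .reduceByKey(_ + _, numPartitions)                      // partial sums, explicit DOP
    .map { case ((k, _), v) => (k, v) }                     // drop the suffix
    .reduceByKey(_ + _, numPartitions)                      // final sum per original key
}
```

The second reduceByKey handles at most `salts` partial results per key, so it stays cheap even for the hot key. This only works for aggregations that can be combined in stages, such as sums and counts.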
Optimizing the Data Structure
- Store data by column. In this way, only the required columns are scanned when data is read.
- When using hash shuffle, set spark.shuffle.consolidateFiles to true to merge the intermediate files of the shuffle. This reduces the number of shuffle files and file I/O operations and improves performance. The number of final files is the number of reduce tasks.