Spark Application Development Suggestions
Persist the RDD that will be frequently used
The default RDD storage level is StorageLevel.NONE, which means that the RDD is not stored on disk or in memory. If an RDD is frequently used, persist it as follows:
Call cache(), persist(), or persist(newLevel: StorageLevel) of spark.RDD to persist the RDD. The cache() and persist() functions set the RDD storage level to StorageLevel.MEMORY_ONLY, while persist(newLevel: StorageLevel) lets you specify another storage level. Before calling persist(newLevel: StorageLevel), ensure that the current storage level of the RDD is StorageLevel.NONE or the same as newLevel; that is, once the storage level is set to a value other than StorageLevel.NONE, it cannot be changed. A sketch covering both persist and unpersist follows the list below.
To unpersist an RDD, call unpersist(blocking: Boolean = true). This function:
1. Removes the RDD from the persistence list, so that the corresponding RDD data becomes recyclable.
2. Resets the storage level of the RDD to StorageLevel.NONE.
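A minimal sketch of the persist and unpersist calls described above; the input path and filter condition are placeholders:
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object PersistExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("PersistExample").getOrCreate()
    val sc = spark.sparkContext

    // Placeholder input path; the RDD below is reused twice, so persist it once.
    val errors = sc.textFile("hdfs:///tmp/input/logs").filter(_.contains("ERROR"))

    // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY);
    // persist(newLevel) may only be called while the storage level is still NONE.
    errors.persist(StorageLevel.MEMORY_AND_DISK)

    val total = errors.count()
    val hosts = errors.map(line => (line.split(" ")(0), 1)).reduceByKey(_ + _).count()

    // Remove the RDD from the persistence list and reset its storage level to NONE.
    errors.unpersist()

    println(s"error lines: $total, distinct hosts: $hosts")
    spark.stop()
  }
}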
Carefully select the shuffle operator
Shuffle operators have wide dependencies; that is, a single partition of the parent RDD feeds multiple partitions of the child RDD. The elements of such an RDD are <key, value> pairs. During execution, the data is redistributed across partitions, and this operation is called a shuffle.
Shuffle operators involve network transmission between nodes. Therefore, for an RDD with a large data volume, you are advised to extract only the information you need, minimizing the size of each record, before calling a shuffle operator.
The following methods are often used:
- combineByKey(): RDD[(K, V)] => RDD[(K, C)]
This method combines all values that share the same key in RDD[(K, V)] into a single value of type C.
- groupByKey() and reduceByKey() are two implementations of combineByKey. If groupByKey and reduceByKey cannot meet the requirements of complex data aggregation, you can pass custom aggregation functions to combineByKey.
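For example, a per-key average can be computed with combineByKey, where the combined type C is a (sum, count) pair; an existing SparkContext sc and the sample data below are assumed:
val scores = sc.parallelize(Seq(("math", 80), ("math", 90), ("english", 70), ("english", 60)))

val sumCount = scores.combineByKey(
  (v: Int) => (v, 1),                                           // createCombiner: first value seen for a key
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        // mergeValue: fold another value into the combiner
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)  // mergeCombiners: merge partial results across partitions
)
val averages = sumCount.mapValues { case (sum, count) => sum.toDouble / count }
averages.collect().foreach(println)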
- distinct(): RDD[T] => RDD[T]
This method is used to remove duplicate elements. It is implemented as follows:
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
This process is time-consuming, especially when the data volume is large. Therefore, it is not recommended for RDDs generated from large files.
Use high-performance operators if the service permits
- Using reduceByKey/aggregateByKey to replace groupByKey
Map-side pre-aggregation means that each node locally aggregates records with the same key, similar to the local combiner in MapReduce. After pre-aggregation, each key appears at most once per node, so when a node pulls the results of the previous stage for the same key, the amount of data to fetch is significantly reduced, lowering disk I/O and network transmission costs. Generally speaking, you are advised to replace groupByKey with reduceByKey or aggregateByKey where possible, because these operators pre-aggregate records with the same key on each node using user-defined functions. The groupByKey operator does not support pre-aggregation and delivers lower performance, because all records are shuffled across the nodes.
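A small word-count sketch illustrating the difference; an existing SparkContext sc is assumed:
val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a")).map(word => (word, 1))

// groupByKey shuffles every (word, 1) record across the network before counting.
val countsWithGroup = words.groupByKey().mapValues(_.sum)

// reduceByKey pre-aggregates on the map side, so far less data is shuffled.
val countsWithReduce = words.reduceByKey(_ + _)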
- Using mapPartitions to replace ordinary map operators
During a function invocation, a mapPartitions operator processes all the data in a partition rather than a single record, and therefore delivers higher performance than an ordinary map operator. However, mapPartitions may occasionally cause out-of-memory (OOM) errors: because a whole partition is handled in one invocation, its objects cannot be reclaimed during garbage collection until the invocation finishes, and memory may run out. Therefore, exercise caution when using mapPartitions.
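A sketch of the pattern: per-partition setup work is done once, and an iterator is returned so the whole partition is not materialized in memory; an existing SparkContext sc is assumed:
val numbers = sc.parallelize(1 to 1000000, numSlices = 8)

val formatted = numbers.mapPartitions { iter =>
  // Created once per partition instead of once per record.
  val formatter = java.text.NumberFormat.getIntegerInstance
  iter.map(n => formatter.format(n))
}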
- Performing the coalesce operation after filtering
After filtering out a large portion of the data (for example, more than 30%) with the filter operator, you are advised to manually decrease the number of partitions by using coalesce so that the remaining data is compacted into fewer partitions. After filtering, each partition holds only a little data, and continuing the computation with the original number of tasks wastes resources; the more tasks there are, the higher the scheduling overhead. Compacting the data into fewer partitions with coalesce lets all the data be handled by fewer tasks, which can also improve performance in some scenarios.
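A sketch under the assumption that the filter discards most records; the input path and partition counts are placeholders:
val records = sc.textFile("hdfs:///tmp/input/records", minPartitions = 200)

// Most records are dropped, so compact the survivors into far fewer partitions.
// coalesce without shuffle only merges existing partitions, which is cheap.
val filtered = records.filter(_.contains("ERROR")).coalesce(20)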
- Using repartitionAndSortWithinPartitions to replace repartition and sort
repartitionAndSortWithinPartitions is recommended by the Spark official website for sorting after repartitioning. This operator repartitions and sorts within the same shuffle, delivering higher performance than calling repartition followed by a separate sort.
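A sketch, assuming a pair RDD whose keys have an ordering:
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b"), (1, "aa")))

// One shuffle that both repartitions and sorts by key within each partition,
// instead of repartition(4) followed by a separate sort.
val repartitionedAndSorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(4))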
- Using foreachPartitions to replace foreach
Similar to "Using mapPartitions to replace ordinary map operators", this mechanism handles all the data in a partition during a function invocation instead of one piece of data. In practice, foreachPartitions is proved to be helpful in improving performance. For example, the foreach function can be used to write all the data in RDD into MySQL. Ordinary foreach operators, write data piece by piece, and a database connection is established for each function invocation. Frequent connection establishments and destructions cause low performance. foreachPartitions, however, processes all the data in a partition at a time. Only one database connection is required for each partition. Batch insertion delivers higher performance.
RDD Shared Variables
In application development, when a function is passed to a Spark operation (such as map or reduce) and runs on a remote cluster, it actually operates on independent copies of all the variables it uses; these variables are copied to each machine. Reading and writing shared variables across tasks in this way is inefficient. Spark therefore provides two commonly used types of shared variable: broadcast variables and accumulators.
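A minimal sketch of both kinds of shared variable; the lookup table and codes are illustrative, and an existing SparkContext sc is assumed:
// Broadcast variable: a read-only lookup table shipped to each executor once.
val countryNames = sc.broadcast(Map("CN" -> "China", "US" -> "United States"))

// Accumulator: tasks only add to it; the driver reads the result.
val unknownCodes = sc.longAccumulator("unknownCodes")

val codes = sc.parallelize(Seq("CN", "US", "FR"))
val names = codes.map { code =>
  countryNames.value.getOrElse(code, { unknownCodes.add(1); "unknown" })
}
names.collect().foreach(println)
println(s"codes without a mapping: ${unknownCodes.value}")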
Kryo can be used to optimize serialization performance in performance-demanding scenarios.
Spark offers two serializers:
- org.apache.spark.serializer.KryoSerializer: high performance but low compatibility
- org.apache.spark.serializer.JavaSerializer: average performance and high compatibility
Method: conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
The following are reasons why Spark does not use Kryo-based serialization by default:
Spark uses Java serialization by default, that is, it uses the ObjectOutputStream and ObjectInputStream APIs to serialize and deserialize data. Spark can also use the Kryo serialization library, which delivers higher performance than the Java serialization library; according to official statistics, Kryo-based serialization is about 10 times more efficient than Java-based serialization. However, Kryo-based serialization requires all user-defined types to be serialized to be registered, which is a burden for developers.
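A sketch of enabling Kryo and registering user-defined classes; the classes below are hypothetical:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

case class UserRecord(id: Long, name: String)      // hypothetical user-defined types
case class PageView(url: String, durationMs: Long)

val conf = new SparkConf()
  .setAppName("KryoExample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Register the types so Kryo does not have to write full class names with every object.
  .registerKryoClasses(Array(classOf[UserRecord], classOf[PageView]))

val spark = SparkSession.builder().config(conf).getOrCreate()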
Suggestions on Optimizing Spark Streaming Performance
- Set an appropriate batch processing duration (batchDuration).
- Set concurrent data receiving appropriately.
- Set multiple receivers to receive data.
- Set an appropriate receiver congestion duration.
- Set concurrent data processing appropriately.
- Use Kryo-based serialization.
- Optimize memory.
- Set the persistence level to reduce GC costs.
- Use the Concurrent Mark Sweep (CMS) GC algorithm to shorten GC pauses.
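A sketch showing where several of these settings go; the batch interval, receiver count, host, port, and GC options are illustrative starting points, not tuned values:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("StreamingTuningSketch")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")   // Kryo-based serialization
  .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")        // CMS GC to shorten pauses

// Batch duration: keep it large enough that processing finishes within each batch.
val ssc = new StreamingContext(conf, Seconds(5))

// Multiple receivers, unioned into one stream; a serialized persistence level reduces GC cost.
val streams = (1 to 3).map(_ => ssc.socketTextStream("stream-host", 9999, StorageLevel.MEMORY_AND_DISK_SER))
val unioned = ssc.union(streams)

unioned.count().print()
// ssc.start(); ssc.awaitTermination()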
Suggestions for Running PySpark
To run a PySpark application, you must install the Python environment and upload the necessary Python dependency packages to HDFS. The Python environment provided by the cluster cannot be used.