Updated on 2022-11-04 GMT+08:00

Suggestions

Persist the RDD that will be frequently used

The default RDD storage level is StorageLevel.NONE, which means that the RDD is not stored on disks or in memory. If an RDD is frequently used, persist the RDD as follows:

Call cache(), persist(), or persist(newLevel: StorageLevel) of spark.RDD to persist the RDD. The cache() and persist() functions set the RDD storage level to StorageLevel.MEMORY_ONLY. The persist(newLevel: StorageLevel) function allows you to set other storage level for the RDD. However, before calling this function, ensure that the RDD storage level is StorageLevel.NONE or the same as the newLevel. That is, once the RDD storage level is set to a value other than StorageLevel.NONE, the storage level cannot be changed.

To unpersist an RDD, call unpersist(blocking: Boolean = true). The function can remove the RDD from the persistence list, and set the storage level of RDD to StorageLevel.NONE.

Carefully select the shuffle operator

This type of operator features wide dependency. That is, a partition of the parent RDD affects multiple partitions of the child RDD. The elements in an RDD are <key, value> pairs. During the execution process, the partitions of the RDD will be sequenced again. This operation is called shuffle.

Network transmission between nodes is involved in the shuffle operators. Therefore, for an RDD with large data volume, you are advised to extract information as much as possible to minimize the size of each piece of data and then call the shuffle operators.

The following methods are often used:

  • combineByKey() : RDD[(K, V)] => RDD[(K, C)]. This method is used to convert all the keys that have the same value in RDD[(K, V)] to a value with type of C.
  • groupByKey() and reduceByKey() are two types of implementation of combineByKey. If groupByKey and reduceByKey cannot meet requirements in complex data aggregation, you can use customized aggregation functions as the parameters of combineByKey.
  • distinct(): RDD[T] => RDD[T]. This method is used to remove repeated elements. The code is as follows:

    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

    This process is time-consuming, especially when the data volume is high. Therefore, it is not recommended for the RDD generated from large files.

  • Join (): (RDD[(K, V)], RDD[(K, W)]) => RDD[(K, (V, W))]. This method is used to combine two RDDs through key.
  • If a key in RDD[(K, V)] has X values and the same key in RDD[(K, W)] has Y values, a total of (X * Y) data records will be generated in RDD[(K, (V, W))].

If multiple methods are called consecutively in a statement, each method occupies a line and is aligned with a period (.).

In this way, the code readability is enhanced and the code execution process is clear.

Sample code:

val data: RDD[String] = sc.textFile(inputPath)
                             .map(line => (line.split(",")(0), line))
                             .groupByKey()
                             .collect()
                             .sortBy(pair => pair._1)
                             .flatMap(pair => pair._2);

Do not directly use catch (Exception ex) to capture exceptions.

A more reasonable exception handling branch can be designed.

Define method functions and implement method design accurately (rather than approximately). A function implements only one function, even if simple functions are implemented by compilation.

Although it seems unnecessary to edit a function that can be completed by using only one or two lines, the method can be used to make the function clear, improve the readability of the program, and facilitate maintenance and testing. Examples are listed as follows:

Not recommended:

// In this example, the map method implements data mapping.
val data:RDD[(String, String)] = sc.textFile(input)
                                    .map(record => 
                                    {
                                      val elems = record.split(",");
                                      (elems(0) + "," + elems(1), elems(2));
                                    });  

Recommended:

// Extract the operations in the map method in the preceding example, which increases the readability of the program and facilitates maintenance and test.
val data:RDD[(String, String)] = sc.textFile(input)                                                      .map(record =&gt; deSomething(record));   

def deSomething(record: String): (String, String) =
{
  val elems = x.split(",");
  return (elems(0) + "," + elems(1), elems(2));
}

Insertion Optimization of Spark SQL Dynamic Partition: Distributeby

In the following SQL statement, p1 and p2 are the partition fields of the target table. The keyword distribute by is used to reduce the number of small files.

insert overwrite table target partition(p1,p2)
select * from source
distribute by p1, p2 

When the Spark program writes data to the Hive table in dynamic partitioning mode, a large number of small files are generated. As a result, it takes a long time to move the files to the Hive table directory. This is because data in multiple Hive partitions randomly falls into multiple Spark tasks during shuffle, in this case, the relationship between tasks and Hive partition data is many-to-many. That is, each task contains some data of multiple partitions, and each task contains little data of each partition. As a result, the task writes multiple partition files, and each partition file is small.

To reduce the number of small files, data needs to be shuffled based on partition fields. Data in each partition should be centralized in a task. In Spark SQL, the distribute by keyword is used to implement this function.

If data skew occurs after the distribute by keyword is used, that is, some partitions have more data than others. As a result, it takes a long time to submitting Spark jobs. Add a random number to the end of distribute by. For example:

insert overwrite table target partition(p1,p2)
select * from source
distribute by p1, p2, cast(rand() * N as int)

The value of N can be a tradeoff between the number of files and the data skew.

When dynamic partitions are used to write data in Spark SQL, distribute by and sort by must be used at the same time to effectively reduce the number of small files.

Insertion Optimization of Spark SQL Dynamic Partition: sortby

In the following SQL statement, p1 and p2 are the partition fields of the target table. The keyword sort by is used to reduce the number of small files.

insert overwrite table target partition(p1,p2)
select * from source
distribute by p1, p2
sort by p1,p2

After the dynamic partition shuffle is optimized, data of each Hive partition is centralized in a Spark task. However, the number of Hive partitions is much greater than that of tasks. Therefore, a task contains data of multiple Hive partitions. That is, the relationship between a task and Hive partitions is one-to-many.

Each task writes the data contained in the task to the file in the row sequence. The partition where the file is located is determined by the partition field value in the row data. When writing the first row of data, the task creates a new file. Then, the task checks whether the partition field value of this row of data is the same as that of the previous row of data. If they are different, the task creates a new file and writes the data of this row to the file, otherwise, the data is written to the file where the previous data is located. Therefore, when the task writes dynamic partition data, if the partition field values in two adjacent rows are the same, the data is written into the same file. Assume that a task has N rows of data. In the worst case, the partition field values of all adjacent data are different. In this case, the task writes N files, and each file contains only one row of data.

To centralize data in the same partition of a task and reduce the number of files written by the task, data needs to be sorted by partition field. Assume that a task contains data of M partitions. After the data is sorted, data of the same partition in a task is adjacent. Finally, the task writes only M files. The sort by keyword is added to Spark SQL to implement the sorting function.

When dynamic partitions are used to write data in Spark SQL, distribute by and sort by must be used at the same time to effectively reduce the number of small files.

Commit V2 Algorithm Recommended to Spark

Set spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version to 2 when submitting a Spark job using the Commit v2 algorithm. For example, add the —conf configuration item and set spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version to 2 when submitting a job on DataArts Studio.

Spark uses the Commit V1 algorithm by default. The principle of this algorithm is as follows: After a task is executed, data is written to a temporary directory. After all tasks are executed, driver moves the temporary output file of each task to the final directory in serial mode. If a large number of small files are generated, it takes a long time for the driver to move files in serial mode. As a result, the commit process is time-consuming.

The Commit algorithm is changed to V2. After each task is successfully executed, the temporary file is moved to the final directory. That is, the serial moving operation in the driver is optimized to the parallel file moving operation in the task. The disadvantage of the V2 algorithm is that the files in the final directory are visible to external systems during Spark job execution. If other programs read the data in the final directory, the data processed by other programs is inconsistent.

However, when Spark is used to write data to Hive tables, the final directory of Spark jobs is also a temporary directory. Data in the temporary directory is imported to Hive tables through the LOAD operation. Therefore, the directory of Hive tables is the final directory, and external jobs cannot read the temporary directory, therefore, the Commit V2 algorithm is recommended for the scenario where Spark writes data to Hive.

Deleting Partition Policies in Batches

When deleting partitions in batches, for example, deleting all partition data of a month, you can use either of the following methods:

  • List all partitions and delete them in batches. Assume that there are 100 cities. To delete all partitions of a month, you need to list 3,100 partitions and delete them in batches.
    alter table target drop partition(city=c1,date=p1), partition(city=c2,date=p2), ...
  • Only the time partitions of one month are listed, and then the time partitions are deleted in batches. In this mode, only 31 time partitions need to be listed.
    alter table target drop partition(date=p1), partition(date=p2)

You can use either of the two methods to delete all partitions generated within one month. However, the second method has higher performance than the first one. This is because the system checks whether partitions exist before deleting the partitions. In the first method, the query API is invoked for 3,100 times. In the second method, the query API is invoked only for 31 times.