更新时间:2023-03-17 GMT+08:00

经验总结

使用mapPartitions,按每个分区计算结果

如果每条记录的开销太大,例:

rdd.map{x=>conn=getDBConn;conn.write(x.toString);conn.close}

则可以使用MapPartitions,按每个分区计算结果,如

rdd.mapPartitions(records => conn.getDBConn;for(item <- records)
write(item.toString); conn.close)

使用mapPartitions可以更灵活地操作数据,例如对一个很大的数据求TopN,当N不是很大时,可以先使用mapPartitions对每个partition求TopN,collect结果到本地之后再做排序取TopN。这样相比直接对全量数据做排序取TopN效率要高很多。

使用coalesce调整分片的数量

coalesce可以调整分片的数量。coalesce函数有两个参数:

coalesce(numPartitions: Int, shuffle: Boolean = false)

当shuffle为true的时候,函数作用与repartition(numPartitions: Int)相同,会将数据通过Shuffle的方式重新分区;当shuffle为false的时候,则只是简单的将父RDD的多个partition合并到同一个task进行计算,shuffle为false时,如果numPartitions大于父RDD的切片数,那么分区不会重新调整。

遇到下列场景,可选择使用coalesce算子:

  • 当之前的操作有很多filter时,使用coalesce减少空运行的任务数量。此时使用coalesce(numPartitions, false),numPartitions小于父RDD切片数。
  • 当输入切片个数太大,导致程序无法正常运行时使用。
  • 当任务数过大时候Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时需要对数据重新进行分区,使用coalesce(numPartitions, true)。

localDir配置

Spark的Shuffle过程需要写本地磁盘,Shuffle是Spark性能的瓶颈,I/O是Shuffle的瓶颈。配置多个磁盘则可以并行的把数据写入磁盘。如果节点中挂载多个磁盘,则在每个磁盘配置一个Spark的localDir,这将有效分散Shuffle文件的存放,提高磁盘I/O的效率。如果只有一个磁盘,配置了多个目录,性能提升效果不明显。

Collect小数据

大数据量不适用collect操作。

collect操作会将Executor的数据发送到Driver端,因此使用collect前需要确保Driver端内存足够,以免Driver进程发生OutOfMemory异常。当不确定数据量大小时,可使用saveAsTextFile等操作把数据写入HDFS中。只有在能够大致确定数据大小且driver内存充足的时候,才能使用collect。

使用reduceByKey

reduceByKey会在Map端做本地聚合,使得Shuffle过程更加平缓,而groupByKey等Shuffle操作不会在Map端做聚合。因此能使用reduceByKey的地方尽量使用该算子,避免出现groupByKey().map(x=>(x._1,x._2.size))这类实现方式。

广播map代替数组

当每条记录需要查表,如果是Driver端用广播方式传递的数据,数据结构优先采用set/map而不是Iterator,因为Set/Map的查询速率接近O(1),而Iterator是O(n)。

数据倾斜

当数据发生倾斜(某一部分数据量特别大),虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。

  • 需要重新设计key,以更小粒度的key使得task大小合理化。
  • 修改并行度。

优化数据结构

  • 把数据按列存放,读取数据时就可以只扫描需要的列。
  • 使用Hash Shuffle时,通过设置spark.shuffle.consolidateFiles为true,来合并shuffle中间文件,减少shuffle文件的数量,减少文件IO操作以提升性能。最终文件数为reduce tasks数目。