更新时间:2024-11-06 GMT+08:00

Spark应用开发建议

RDD多次使用时,建议将RDD持久化

RDD在默认情况下的存储级别是StorageLevel.NONE,即既不存磁盘也不放在内存中,如果某个RDD需要多次使用,可以考虑将该RDD持久化,方法如下:

调用spark.RDD中的cache()、persist()、persist(newLevel:StorageLevel)函数均可将RDD持久化,cache()和persist()都是将RDD的存储级别设置为StorageLevel.MEMORY_ONLY,persist(newLevel:StorageLevel)可以为RDD设置其他存储级别,但是要求调用该方法之前RDD的存储级别为StorageLevel.NONE或者与newLevel相同,也就是说,RDD的存储级别一旦设置为StorageLevel.NONE之外的级别,则无法改变。

如果想要将RDD去持久化,那么可以调用unpersist(blocking:Boolean = true),该函数功能如下:

  1. 将该RDD从持久化列表中移除,RDD对应的数据进入可回收状态;
  2. 将RDD的存储级别重新设置为StorageLevel.NONE。

慎重选择shuffle过程的算子

该类算子称为宽依赖算子,其特点是父RDD的一个partition影响子RDD的多个partition,RDD中的元素一般都是<key, value>对。执行过程中都会涉及到RDD的partition重排,这个操作称为shuffle。

由于shuffle类算子存在节点之间的网络传输,因此对于数据量很大的RDD,应该尽量提取需要使用的信息,减小其单条数据的大小,然后再调用shuffle类算子。

常用的有如下几种:

  • combineByKey() : RDD[(K, V)] => RDD[(K, C)],是将RDD[(K, V)]中key相同的数据的所有value转化成为一个类型为C的值。
  • groupByKey() 和reduceByKey()是combineByKey的两种具体实现,对于数据聚合比较复杂而groupByKey和reduceByKey不能满足使用需求的场景,可以使用自己定义的聚合函数作为combineByKey的参数来实现。
  • distinct(): RDD[T] => RDD[T],作用是去除重复元素的算子。其处理过程代码如下:
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

    这个过程比较耗时,尤其是数据量很大时,建议不要直接对大文件生成的RDD使用。

  • join() : (RDD[(K, V)], RDD[(K, W)]) => RDD[(K, (V, W))],作用是将两个RDD通过key做连接。

    如果RDD[(K, V)]中某个key有X个value,而RDD[(K, W)]中相同key有Y个value,那么最终在RDD[(K, (V, W))]中会生成X*Y条记录。

在业务情况允许的情况下使用高性能算子

  1. 使用reduceByKey/aggregateByKey替代groupByKey

    所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。 map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

  2. 使用mapPartitions替代普通map。

    mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。 但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重!

  3. 使用filter之后进行coalesce操作。

    通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即 可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。

  4. 使用repartitionAndSortWithinPartitions替代repartition与sort类操作。

    repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在 repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions 算子。因为该算子 可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。

  5. 使用foreachPartitions替代foreach。

    原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数 据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写 MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个 partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。

RDD共享变量

在应用开发中,一个函数被传递给Spark操作(例如map和reduce),在一个远程集群上运行,它实际上操作的是这个函数用到的所有变量的独立复制。这些变量会被复制到每一台机器。通常看来,在任务之间中,读写共享变量显然不够高效。Spark为两种常见的使用模式,提供了两种有限的共享变量:广播变量、累加器。

在对性能要求比较高的场景下,可以使用Kryo优化序列化性能

Spark提供了两种序列化实现:

org.apache.spark.serializer.KryoSerializer:性能好,兼容性差

org.apache.spark.serializer.JavaSerializer:性能一般,兼容性好

使用:conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

为什么不默认使用Kryo序列化?

Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介 绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

Spark Streaming性能优化建议

  1. 设置合理的批处理时间(batchDuration)。
  2. 设置合理的数据接收并行度。
    • 设置多个Receiver接收数据。
    • 设置合理的Receiver阻塞时间。
  1. 设置合理的数据处理并行度。
  2. 使用Kryo系列化。
  3. 内存调优。
    • 设置持久化级别减少GC开销。
    • 使用并发的标记-清理GC算法减少GC暂停时间。

运行pyspark建议

运行pyspark应用时,不能使用集群自带的python环境,需要用户自行安装python环境,并将python相关依赖包打包上传到HDFS。