更新时间:2023-04-12 GMT+08:00
分享

建议

RDD多次使用时,建议将RDD持久化

RDD在默认情况下的存储级别是StorageLevel.NONE,即既不存磁盘也不放在内存中,如果某个RDD需要多次使用,可以考虑将该RDD持久化,方法如下:

调用spark.RDD中的cache()、persist()、persist(newLevel: StorageLevel)函数均可将RDD持久化,cache()和persist()都是将RDD的存储级别设置为StorageLevel.MEMORY_ONLY,persist(newLevel: StorageLevel)可以为RDD设置其他存储级别,但是要求调用该方法之前RDD的存储级别为StorageLevel.NONE或者与newLevel相同,也就是说,RDD的存储级别一旦设置为StorageLevel.NONE之外的级别,则无法改变。

如果想要将RDD去持久化,那么可以调用unpersist(blocking: Boolean = true),将该RDD从持久化列表中移除,并将RDD的存储级别重新设置为StorageLevel.NONE。

调用有shuffle过程的算子时,尽量提取需要使用的信息

该类算子称为宽依赖算子,其特点是父RDD的一个partition影响子RDD得多个partition,RDD中的元素一般都是<key, value>对。执行过程中都会涉及到RDD的partition重排,这个操作称为shuffle

由于shuffle类算子存在节点之间的网络传输,因此对于数据量很大的RDD,应该尽量提取需要使用的信息,减小其单条数据的大小,然后再调用shuffle类算子。

常用的有如下几种:

  • combineByKey() : RDD[(K, V)] => RDD[(K, C)],是将RDD[(K, V)]中key相同的数据的所有value转化成为一个类型为C的值。
  • groupByKey() 和reduceByKey()是combineByKey的两种具体实现,对于数据聚合比较复杂而groupByKey和reduceByKey不能满足使用需求的场景,可以使用自己定义的聚合函数作为combineByKey的参数来实现。
  • distinct(): RDD[T] => RDD[T],作用是去除重复元素的算子。其处理过程代码如下:

    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

    这个过程比较耗时,尤其是数据量很大时,建议不要直接对大文件生成的RDD使用。

  • join() : (RDD[(K, V)], RDD[(K, W)]) => RDD[(K, (V, W))],作用是将两个RDD通过key做连接。
  • 如果RDD[(K, V)]中某个key有X个value,而RDD[(K, W)]中相同key有Y个value,那么最终在RDD[(K, (V, W))]中会生成X*Y条记录。

在一条语句中如果连续调用多个方法,每个方法占用一行,以符号“.”对齐

这样做,可以增强代码可读性,明确代码执行流程。

代码示例:

val data: RDD[String] = sc.textFile(inputPath)
                             .map(line => (line.split(",")(0), line))
                             .groupByKey()
                             .collect()
                             .sortBy(pair => pair._1)
                             .flatMap(pair => pair._2);

异常捕获尽量不要直接catch (Exception ex),应该把异常细分处理

可以设计更合理异常处理分支。

明确方法功能,精确(而不是近似)地实现方法设计,一个函数仅完成一件功能,即使简单功能也编写方法实现

虽然为仅用一两行就可完成的功能去编方法好象没有必要,但用方法可使功能明确化,增加程序可读性,亦可方便维护、测试。下面举个例子说明。

不推荐的做法:

//该示例中map方法中,实现了对数据映射操作。
val data:RDD[(String, String)] = sc.textFile(input)
                                    .map(record => 
                                    {
                                      val elems = record.split(",");
                                      (elems(0) + "," + elems(1), elems(2));
                                    });  

推荐的做法:

//将上述示例中的map方法中的操作提取出来,增加了程序可读性,便于维护和测试。
val data:RDD[(String, String)] = sc.textFile(input)                                                      .map(record => deSomething(record));   

def deSomething(record: String): (String, String) =
{
  val elems = x.split(",");
  return (elems(0) + "," + elems(1), elems(2));
}

Spark SQL动态分区插入优化之Distributeby

如下SQL中,p1和p2是target表的分区字段,使用distribute by关键字来减少小文件的产生。

insert overwrite table target partition(p1,p2)
select * from source
distribute by p1, p2 

Spark程序以动态分区的方式写入Hive表时,会出现了大量的小文件,导致最后移动文件到hive表目录非常耗时,这是因为在Shuffle时Hive多个分区的数据随机落到Spark的多个Task中,此时Task与Hive分区数据的关系是多对多,即每个Task会包含多个分区的部分数据,每个Task中包含的每个分区的数据都很少,最终会导致Task写多个分区文件,每个分区文件都比较小。

为了减少小文件的数量,需要将数据按照分区字段进行Shuffle,将各个分区的数据尽量各自集中在一个Task,在Spark SQL中就是通过distribute by关键字来完成这个功能的。

当使用distribute by关键字在后出现了数据倾斜,即有的分区数据多,有的分区数据少,也会导致spark 作业整体耗时长。需要在distribute by后面增加随机数,例如:

insert overwrite table target partition(p1,p2)
select * from source
distribute by p1, p2, cast(rand() * N as int)

N值可以在文件数量和倾斜度之间做权衡。

在Spark SQL中使用动态分区写入数据,需要同时使用distribute bysort by才能有效减少小文件数量。

Spark SQL动态分区插入优化之sortby

如下SQL中,p1和p2是target表的分区字段,使用sort by关键字来减少小文件的产生。

insert overwrite table target partition(p1,p2)
select * from source
distribute by p1, p2
sort by p1,p2

经过动态分区Shuffle优化之后,每一个Hive分区的数据都会集中在一个Spark Task中,但是由于Hive分区的数量远远大于Task数量,所以一个Task中会包含多个Hive分区的数据,即Task与Hive分区的关系是一对多。

每一个Task会将其包含的数据按照行顺序写入文件,文件所在的分区由该行数据中的分区字段值决定。Task在写入第一行数据时会创建一个新文件,随后写的每行数据都会判断该行数据的分区字段值与上一行数据的分区字段值是否相同,如果不相同就会新建一个文件并将该行数据写入,否则将该行数据写入上一条数据所在的文件。因此在Task写动态分区数据时,相邻两行数据如果分区字段值相同,就会写入同一个文件,否则就会写入不同的文件。假设Task有N行数据,在最坏情况下,所有相邻数据的分区字段值都不相同,那么Task将会写N个文件,每个文件只有一行数据。

为了将一个Task中相同分区的数据集中在一起,减少Task写的文件数量,需要将数据按照分区字段进行排序。假设一个Task中包含M个分区数据,排序之后,一个Task中相同分区的数据就会相邻,最终一个Task只会写M个文件。在Spark SQL中增加sort by关键词可完成排序功能。

在Spark SQL中使用动态分区写入数据,需要同时使用distribute bysort by才能有效减少小文件数量。

Spark建议使用Commit V2算法

在Spark提交作业时配置参数spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2,使用commit v2算法,例如在DataArts Studio提交作业增加—conf配置项,值为spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2。

Spark默认采用Commit V1算法,该算法原理为:Task执行完毕后将数据写入了临时目录,Driver等待所有Task执行完毕后,串行的将每个Task的临时输出文件移动到最终的目录中。如果输出的小文件过多,Driver串行移动文件耗时会过长,最终导致Commit过程比较耗时。

将Commit算法修改为V2,使得每个Task在执行成功后将临时文件移动到最终目录,相当于Driver中串行的移动操作优化为Task并行的移动文件操作。V2算法的缺点在于Spark作业执行过程中,最终目录的文件是对外可见的,如果此时有其他程序读取了最终目录的数据,那么其他程序处理的数据出现不一致问题。

然而使用Spark写Hive表,Spark作业的最终目录也是一个临时目录,通过load操作将临时目录数据导入hive表,所以Hive表的目录才是真正的最终目录,外部作业是无法读取到中间临时生成目录,因此针对Spark写Hive场景推荐使用Commit V2算法。

批量删除分区策略

当批量删除分区时,例如删除一个月所有的分区数据,可以使用的方式有两种:

  • 把所有的分区列出来,然后批量删除。假设city的数量有100个,那么删除一个月的所有分区需要列出3100个分区,然后批量删除。
    alter table target drop partition(city=c1,date=p1), partition(city=c2,date=p2),…
  • 只列出一个月的时间分区,然后批量删除。这种方式只需要列出31个时间分区。
    alter table target drop partition(date=p1), partition(date=p2)

这两种方式都可以删除一个月的所有分区,但是第二种删除分区的方式性能高于第一种。这是因为在删除分区前会查询分区是否存在,第一种方式会调用3100次查询API,而第二种方式只需要查询31次。

分享:

    相关文档

    相关产品