文档首页 > > 开发指南> Spark应用开发> 开发规范> 建议

建议

分享
更新时间: 2019/04/30 GMT+08:00

RDD多次使用时,建议将RDD持久化

RDD在默认情况下的存储级别是StorageLevel.NONE,即既不存磁盘也不放在内存中,如果某个RDD需要多次使用,可以考虑将该RDD持久化,方法如下:

调用spark.RDD中的cache()、persist()、persist(newLevel: StorageLevel)函数均可将RDD持久化,cache()和persist()都是将RDD的存储级别设置为StorageLevel.MEMORY_ONLY,persist(newLevel: StorageLevel)可以为RDD设置其他存储级别,但是要求调用该方法之前RDD的存储级别为StorageLevel.NONE或者与newLevel相同,也就是说,RDD的存储级别一旦设置为StorageLevel.NONE之外的级别,则无法改变。

如果想要将RDD去持久化,那么可以调用unpersist(blocking: Boolean = true),将该RDD从持久化列表中移除,并将RDD的存储级别重新设置为StorageLevel.NONE。

调用有shuffle过程的算子时,尽量提取需要使用的信息

该类算子称为宽依赖算子,其特点是父RDD的一个partition影响子RDD得多个partition,RDD中的元素一般都是<key, value>对。执行过程中都会涉及到RDD的partition重排,这个操作称为shuffle

由于shuffle类算子存在节点之间的网络传输,因此对于数据量很大的RDD,应该尽量提取需要使用的信息,减小其单条数据的大小,然后再调用shuffle类算子。

常用的有如下几种:

  • combineByKey() : RDD[(K, V)] => RDD[(K, C)],是将RDD[(K, V)]中key相同的数据的所有value转化成为一个类型为C的值。
  • groupByKey() 和reduceByKey()是combineByKey的两种具体实现,对于数据聚合比较复杂而groupByKey和reduceByKey不能满足使用需求的场景,可以使用自己定义的聚合函数作为combineByKey的参数来实现。
  • distinct(): RDD[T] => RDD[T],作用是去除重复元素的算子。其处理过程代码如下:

    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

    这个过程比较耗时,尤其是数据量很大时,建议不要直接对大文件生成的RDD使用。

  • join() : (RDD[(K, V)], RDD[(K, W)]) => RDD[(K, (V, W))],作用是将两个RDD通过key做连接。
  • 如果RDD[(K, V)]中某个key有X个value,而RDD[(K, W)]中相同key有Y个value,那么最终在RDD[(K, (V, W))]中会生成X*Y条记录。

在一条语句中如果连续调用多个方法,每个方法占用一行,以符号“.”对齐

这样做,可以增强代码可读性,明确代码执行流程。

代码示例:

val data: RDD[String] = sc.textFile(inputPath)
                             .map(line => (line.split(",")(0), line))
                             .groupByKey()
                             .collect()
                             .sortBy(pair => pair._1)
                             .flatMap(pair => pair._2);

异常捕获尽量不要直接catch (Exception ex),应该把异常细分处理

可以设计更合理异常处理分支。

明确方法功能,精确(而不是近似)地实现方法设计,一个函数仅完成一件功能,即使简单功能也编写方法实现

虽然为仅用一两行就可完成的功能去编方法好象没有必要,但用方法可使功能明确化,增加程序可读性,亦可方便维护、测试。下面举个例子说明。

不推荐的做法:

//该示例中map方法中,实现了对数据映射操作。
val data:RDD[(String, String)] = sc.textFile(input)
                                    .map(record => 
                                    {
                                      val elems = record.split(",");
                                      (elems(0) + "," + elems(1), elems(2));
                                    });  

推荐的做法:

//将上述示例中的map方法中的操作提取出来,增加了程序可读性,便于维护和测试。
val data:RDD[(String, String)] = sc.textFile(input)                                                      .map(record => deSomething(record));   

def deSomething(record: String): (String, String) =
{
  val elems = x.split(",");
  return (elems(0) + "," + elems(1), elems(2));
}
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

智能客服提问云社区提问