Spark应用开发常用概念
基本概念
- RDD
即弹性分布数据集(Resilient Distributed Dataset),是Spark的核心概念。指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。
RDD的生成:
- 从HDFS输入创建,或从与Hadoop兼容的其他存储系统中输入创建。
- 从父RDD转换得到新RDD。
- 从数据集合转换而来,通过编码实现。
RDD的存储:
- 用户可以选择不同的存储级别缓存RDD以便重用(RDD有11种存储级别)。
- 当前RDD默认是存储于内存,但当内存不足时,RDD会溢出到磁盘中。
- Dependency(RDD的依赖)
RDD的依赖分别为:窄依赖和宽依赖。
图1 RDD的依赖
- 窄依赖:指父RDD的每一个分区最多被一个子RDD的分区所用。
- 宽依赖:指子RDD的分区依赖于父RDD的所有分区。
窄依赖对优化很有利。逻辑上,每个RDD的算子都是一个fork/join(此join是指同步多个并行任务的barrier):把计算fork到每个分区,算完后join,然后fork/join下一个RDD的算子。如果直接翻译到物理实现,是很不经济的:一是每一个RDD(即使是中间结果)都需要物化到内存或存储中,费时费空间;二是join作为全局的barrier,是很昂贵的,会被最慢的那个节点拖死。如果子RDD的分区到父RDD的分区是窄依赖,就可以实施经典的fusion优化,把两个fork/join合为一个;如果连续的变换算子序列都是窄依赖,就可以把很多个fork/join并为一个,不但减少了大量的全局barrier,而且无需物化很多中间结果RDD,这将极大地提升性能。Spark把这个叫做流水线(pipeline)优化。
- Transformation和Action(RDD的操作)
对RDD的操作包含Transformation(返回值还是一个RDD)和Action(返回值不是一个RDD)两种。RDD的操作流程如图2所示。其中Transformation操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
RDD看起来与Scala集合类型没有太大差别,但数据和运行模型大相迥异。
val file = sc.textFile("hdfs://...") val errors = file.filter(_.contains("ERROR")) errors.cache() errors.count()
- textFile算子从HDFS读取日志文件,返回file(作为RDD)。
- filter算子筛出带“ERROR”的行,赋给errors(新RDD)。filter算子是一个Transformation操作。
- cache算子缓存下来以备未来使用。
- count算子返回errors的行数。count算子是一个Action操作。
Transformation操作可以分为如下几种类型:- 视RDD的元素为简单元素。
输入输出一对多,且结果RDD的分区结构不变,如flatMap(map后由一个元素变为一个包含多个元素的序列,然后展平为一个个的元素)。
输入输出一对一,但结果RDD的分区结构发生了变化,如union(两个RDD合为一个,分区数变为两个RDD分区数之和)、coalesce(分区减少)。
从输入中选择部分元素的算子,如filter、distinct(去除重复元素)、subtract(本RDD有、其他RDD无的元素留下来)和sample(采样)。
- 视RDD的元素为Key-Value对。
对单个RDD做一对一运算,如mapValues(保持源RDD的分区方式,这与map不同);
对单个RDD重排,如sort、partitionBy(实现一致性的分区划分,这个对数据本地性优化很重要);
对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey;
对两个RDD基于key进行join和重组,如join、cogroup。
后三种操作都涉及重排,称为shuffle类操作。
Action操作可以分为如下几种:
- 生成标量,如count(返回RDD中元素的个数)、reduce、fold/aggregate(返回几个标量)、take(返回前几个元素)。
- 生成Scala集合类型,如collect(把RDD中的所有元素导入Scala集合类型)、lookup(查找对应key的所有值)。
- 写入存储,如与前文textFile对应的saveAsTextFile。
- 还有一个检查点算子checkpoint。当Lineage特别长时(这在图计算中时常发生),出错时重新执行整个序列要很长时间,可以主动调用checkpoint把当前数据写入稳定存储,作为检查点。
- Shuffle
Shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时,每一条输出结果需要按key哈希,并且分发到对应的Reducer上去,这个过程就是shuffle。由于shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。
下图清晰地描述了MapReduce算法的整个流程。
图3 算法流程
概念上shuffle就是一个沟通数据连接的桥梁,实际上shuffle这一部分是如何实现的呢,下面就以Spark为例讲一下shuffle在Spark中的实现。
Shuffle操作将一个Spark的Job分成多个Stage,前面的stages会包括一个或多个ShuffleMapTasks,最后一个stage会包括一个或多个ResultTask。
- Spark Application的结构
Spark Application的结构可分为两部分:初始化SparkContext和主体程序。
- Spark shell命令
Spark基本shell命令,支持提交Spark应用。命令为:
./bin/spark-submit \ --class <main-class> \ --master <master-url> \ ... # other options <application-jar> \ [application-arguments]
参数解释:
--class:Spark应用的类名。
--master:Spark用于所连接的master,如yarn-client,yarn-cluster等。
application-jar:Spark应用的jar包的路径。
application-arguments:提交Spark应用的所需要的参数(可以为空)。
- Spark JobHistory Server
用于监控正在运行的或者历史的Spark作业在Spark框架各个阶段的细节以及提供日志显示,帮助用户更细粒度地开发、配置和调优作业。
Spark SQL常用概念
DataFrame
DataFrame是一个由多个列组成的结构化的分布式数据集合,等同于关系数据库中的一张表,或者是R/Python中的Data Frame。DataFrame是Spark SQL中的最基本的概念,可以通过多种方式创建,例如结构化的数据集、Hive表、外部数据库或者RDD。
Spark SQL的程序入口是SQLContext类(或其子类),创建SQLContext时需要一个SparkContext对象作为其构造参数。SQLContext其中一个子类是HiveContext,相较于其父类,HiveContext添加了HiveQL的parser、UDF以及读取存量Hive数据的功能等。但注意,HiveContext并不依赖运行时的Hive,只是依赖Hive的类库。
由SQLContext及其子类可以方便的创建SparkSQL中的基本数据集DataFrame,DataFrame向上提供多种多样的编程接口,向下兼容多种不同的数据源,例如Parquet、JSON、Hive数据、Database、HBase等,这些数据源都可以使用统一的语法来读取。
Spark Streaming常用概念
Dstream
DStream(又称Discretized Stream)是Spark Streaming提供的抽象概念。
DStream表示一个连续的数据流,是从数据源获取或者通过输入流转换生成的数据流。从本质上说,一个DStream表示一系列连续的RDD。RDD一个只读的、可分区的分布式数据集。
DStream中的每个RDD包含了一个区间的数据。如图4所示。
应用到DStream上的所有算子会被转译成下层RDD的算子操作,如图5所示。这些下层的RDD转换会通过Spark引擎进行计算。DStream算子隐藏大部分的操作细节,并且提供了方便的High-level API给开发者使用。