更新时间:2024-10-31 GMT+08:00

Spark Scala API接口介绍

由于Spark开源版本升级,为避免出现API兼容性或可靠性问题,建议用户使用配套版本的API。

Spark Core常用接口

Spark主要使用到如下这几个类:

  • SparkContext:是Spark的对外接口,负责向调用该类的scala应用提供Spark的各种功能,如连接Spark集群,创建RDD等。
  • SparkConf:Spark应用配置类,如设置应用名称,执行模式,executor内存等。
  • RDD(Resilient Distributed Dataset):用于在Spark应用程序中定义RDD的类,该类提供数据集的操作方法,如map,filter。
  • PairRDDFunctions:为key-value对的RDD数据提供运算操作,如groupByKey。
  • Broadcast:广播变量类。广播变量允许保留一个只读的变量,缓存在每一台机器上,而非每个任务保存一份复制。
  • StorageLevel:数据存储级别。有内存(MEMORY_ONLY),磁盘(DISK_ONLY),内存+磁盘(MEMORY_AND_DISK)等。
RDD上支持两种类型的操作:Transformation和Action,这两种类型的常用方法如表1表2所示。
表1 Transformation

方法

说明

map[U](f: (T) => U): RDD[U]

对调用map的RDD数据集中的每个element都使用f方法,生成新的RDD。

filter(f: (T) => Boolean): RDD[T]

对RDD中所有元素调用f方法,生成将满足条件数据集以RDD形式返回。

flatMap[U](f: (T) => TraversableOnce[U])(implicit arg0: ClassTag[U]): RDD[U]

先对RDD所有元素调用f方法,然后将结果扁平化,生成新的RDD。

sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

抽样,返回RDD一个子集。

union(other: RDD[T]): RDD[T]

返回一个新的RDD,包含源RDD和给定RDD的元素的集合。

distinct([numPartitions: Int]): RDD[T]

去除重复元素,生成新的RDD。

groupByKey(): RDD[(K, Iterable[V])]

返回(K,Iterable[V]),将key相同的value组成一个集合。

reduceByKey(func: (V, V) => V[, numPartitions: Int]): RDD[(K, V)]

对key相同的value调用func。

sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]

按照key来进行排序,是升序还是降序,ascending是boolean类型。

join[W](other: RDD[(K, W)][, numPartitions: Int]): RDD[(K, (V, W))]

当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numPartitions为并发的任务数。

cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]

将当有两个key-value对的dataset(K,V)和(K,W),返回的是(K, (Iterable[V], Iterable[W]))的dataset,numPartitions为并发的任务数。

cartesian[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

返回该RDD与其它RDD的笛卡尔积。

表2 Action

API

说明

reduce(f: (T, T) => T):

对RDD中的元素调用f。

collect(): Array[T]

返回包含RDD中所有元素的一个数组。

count(): Long

返回的是dataset中的element的个数。

first(): T

返回的是dataset中的第一个元素。

take(num: Int): Array[T]

返回前n个elements。

takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

takeSample(withReplacement,num,seed)对dataset随机抽样,返回由num个元素组成的数组。withReplacement表示是否使用replacement。

saveAsTextFile(path: String): Unit

把dataset写到一个text file、HDFS或者HDFS支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中。

saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit

只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统。

countByKey(): Map[K, Long]

对每个key出现的次数做统计。

foreach(func: (T) => Unit): Unit

在数据集的每一个元素上,运行函数func。

countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]

对RDD中每个元素出现的次数进行统计。

表3 Spark Core新增接口

API

说明

isSparkContextDown:AtomicBoolean

该接口可判断sparkContext是否已完全stop,初始值为false。

若接口值为true,则代表sparkContext已完全stop。

若接口值为false,则代表sparkContext没有完成stop。

例如:用户根据 sc.isSparkContextDown.get() == true 可判断sparkContext已完全stop。

Spark Streaming常用接口

Spark Streaming中常见的类有:

  • StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。
  • dstream.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。
  • dstream.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。

    对应的Spark Streaming的JAVA API是JavaStreamingContext,JavaDStream和JavaPairDStream。

Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。

表4 Spark Streaming方法介绍

方法

说明

socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]

从TCP源主机:端口创建一个输入流。

start():Unit

启动Spark Streaming计算。

awaitTermination(timeout: long):Unit

当前进程等待终止,如Ctrl+C等。

stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit

终止Spark Streaming计算。

transform[T](dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) ? RDD[T])(implicit arg0: ClassTag[T]): DStream[T]

对每一个RDD应用function操作得到一个新的DStream。

UpdateStateByKey(func)

更新DStream的状态。使用此方法,需要定义状态和状态更新函数。

window(windowLength, slideInterval)

根据源DStream的窗口批次计算得到一个新的DStream。

countByWindow(windowLength, slideInterval)

返回流中滑动窗口元素的个数。

reduceByWindow(func, windowLength, slideInterval)

当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。

join(otherStream, [numTasks])

实现不同的Spark Streaming之间做合并操作。

DStreamKafkaWriter.writeToKafka()

支持将DStream中的数据批量写入到Kafka。

DStreamKafkaWriter.writeToKafkaBySingle()

支持将DStream中的数据逐条写入到Kafka。

表5 Spark Streaming增强特性接口

方法

说明

DStreamKafkaWriter.writeToKafka()

支持将DStream中的数据批量写入到Kafka。

DStreamKafkaWriter.writeToKafkaBySingle()

支持将DStream中的数据逐条写入到Kafka。

SparkSQL常用接口

Spark SQL中常用的类有:

  • SQLContext:是Spark SQL功能和DataFrame的主入口。
  • DataFrame:是一个以命名列方式组织的分布式数据集。
  • HiveContext:获取存储在Hive中数据的主入口。
表6 常用的Actions方法

方法

说明

collect(): Array[Row]

返回一个数组,包含DataFrame的所有列。

count(): Long

返回DataFrame中的行数。

describe(cols: String*): DataFrame

计算统计信息,包含计数,平均值,标准差,最小值和最大值。

first(): Row

返回第一行。

Head(n:Int): Row

返回前n行。

show(numRows: Int, truncate: Boolean): Unit

用表格形式显示DataFrame。

take(n:Int): Array[Row]

返回DataFrame中的前n行。

表7 基本的DataFrame Functions

方法

说明

explain(): Unit

打印出SQL语句的逻辑计划和物理计划。

printSchema(): Unit

打印schema信息到控制台。

registerTempTable(tableName: String): Unit

将DataFrame注册为一张临时表,其周期和SQLContext绑定在一起。

toDF(colNames: String*): DataFrame

返回一个列重命名的DataFrame。