Spark Scala API接口介绍
由于Spark开源版本升级,为避免出现API兼容性或可靠性问题,建议用户使用配套版本的API。
Spark Core常用接口
Spark主要使用到如下这几个类:
- SparkContext:是Spark的对外接口,负责向调用该类的scala应用提供Spark的各种功能,如连接Spark集群,创建RDD等。
- SparkConf:Spark应用配置类,如设置应用名称,执行模式,executor内存等。
- RDD(Resilient Distributed Dataset):用于在Spark应用程序中定义RDD的类,该类提供数据集的操作方法,如map,filter。
- PairRDDFunctions:为key-value对的RDD数据提供运算操作,如groupByKey。
- Broadcast:广播变量类。广播变量允许保留一个只读的变量,缓存在每一台机器上,而非每个任务保存一份复制。
- StorageLevel:数据存储级别。有内存(MEMORY_ONLY),磁盘(DISK_ONLY),内存+磁盘(MEMORY_AND_DISK)等。
方法 |
说明 |
---|---|
map[U](f: (T) => U): RDD[U] |
对调用map的RDD数据集中的每个element都使用f方法,生成新的RDD。 |
filter(f: (T) => Boolean): RDD[T] |
对RDD中所有元素调用f方法,生成将满足条件数据集以RDD形式返回。 |
flatMap[U](f: (T) => TraversableOnce[U])(implicit arg0: ClassTag[U]): RDD[U] |
先对RDD所有元素调用f方法,然后将结果扁平化,生成新的RDD。 |
sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] |
抽样,返回RDD一个子集。 |
union(other: RDD[T]): RDD[T] |
返回一个新的RDD,包含源RDD和给定RDD的元素的集合。 |
distinct([numPartitions: Int]): RDD[T] |
去除重复元素,生成新的RDD。 |
groupByKey(): RDD[(K, Iterable[V])] |
返回(K,Iterable[V]),将key相同的value组成一个集合。 |
reduceByKey(func: (V, V) => V[, numPartitions: Int]): RDD[(K, V)] |
对key相同的value调用func。 |
sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] |
按照key来进行排序,是升序还是降序,ascending是boolean类型。 |
join[W](other: RDD[(K, W)][, numPartitions: Int]): RDD[(K, (V, W))] |
当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numPartitions为并发的任务数。 |
cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] |
将当有两个key-value对的dataset(K,V)和(K,W),返回的是(K, (Iterable[V], Iterable[W]))的dataset,numPartitions为并发的任务数。 |
cartesian[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)] |
返回该RDD与其它RDD的笛卡尔积。 |
API |
说明 |
---|---|
reduce(f: (T, T) => T): |
对RDD中的元素调用f。 |
collect(): Array[T] |
返回包含RDD中所有元素的一个数组。 |
count(): Long |
返回的是dataset中的element的个数。 |
first(): T |
返回的是dataset中的第一个元素。 |
take(num: Int): Array[T] |
返回前n个elements。 |
takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T] |
takeSample(withReplacement,num,seed)对dataset随机抽样,返回由num个元素组成的数组。withReplacement表示是否使用replacement。 |
saveAsTextFile(path: String): Unit |
把dataset写到一个text file、HDFS或者HDFS支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中。 |
saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit |
只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统。 |
countByKey(): Map[K, Long] |
对每个key出现的次数做统计。 |
foreach(func: (T) => Unit): Unit |
在数据集的每一个元素上,运行函数func。 |
countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] |
对RDD中每个元素出现的次数进行统计。 |
API |
说明 |
---|---|
isSparkContextDown:AtomicBoolean |
该接口可判断sparkContext是否已完全stop,初始值为false。 若接口值为true,则代表sparkContext已完全stop。 若接口值为false,则代表sparkContext没有完成stop。 例如:用户根据 sc.isSparkContextDown.get() == true 可判断sparkContext已完全stop。 |
Spark Streaming常用接口
Spark Streaming中常见的类有:
- StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。
- dstream.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。
- dstream.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。
对应的Spark Streaming的JAVA API是JavaStreamingContext,JavaDStream和JavaPairDStream。
Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。
方法 |
说明 |
---|---|
socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String] |
从TCP源主机:端口创建一个输入流。 |
start():Unit |
启动Spark Streaming计算。 |
awaitTermination(timeout: long):Unit |
当前进程等待终止,如Ctrl+C等。 |
stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit |
终止Spark Streaming计算。 |
transform[T](dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) ? RDD[T])(implicit arg0: ClassTag[T]): DStream[T] |
对每一个RDD应用function操作得到一个新的DStream。 |
UpdateStateByKey(func) |
更新DStream的状态。使用此方法,需要定义状态和状态更新函数。 |
window(windowLength, slideInterval) |
根据源DStream的窗口批次计算得到一个新的DStream。 |
countByWindow(windowLength, slideInterval) |
返回流中滑动窗口元素的个数。 |
reduceByWindow(func, windowLength, slideInterval) |
当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。 |
join(otherStream, [numTasks]) |
实现不同的Spark Streaming之间做合并操作。 |
DStreamKafkaWriter.writeToKafka() |
支持将DStream中的数据批量写入到Kafka。 |
DStreamKafkaWriter.writeToKafkaBySingle() |
支持将DStream中的数据逐条写入到Kafka。 |
方法 |
说明 |
---|---|
DStreamKafkaWriter.writeToKafka() |
支持将DStream中的数据批量写入到Kafka。 |
DStreamKafkaWriter.writeToKafkaBySingle() |
支持将DStream中的数据逐条写入到Kafka。 |
SparkSQL常用接口
Spark SQL中常用的类有:
- SQLContext:是Spark SQL功能和DataFrame的主入口。
- DataFrame:是一个以命名列方式组织的分布式数据集。
- HiveContext:获取存储在Hive中数据的主入口。
方法 |
说明 |
---|---|
collect(): Array[Row] |
返回一个数组,包含DataFrame的所有列。 |
count(): Long |
返回DataFrame中的行数。 |
describe(cols: String*): DataFrame |
计算统计信息,包含计数,平均值,标准差,最小值和最大值。 |
first(): Row |
返回第一行。 |
Head(n:Int): Row |
返回前n行。 |
show(numRows: Int, truncate: Boolean): Unit |
用表格形式显示DataFrame。 |
take(n:Int): Array[Row] |
返回DataFrame中的前n行。 |
方法 |
说明 |
---|---|
explain(): Unit |
打印出SQL语句的逻辑计划和物理计划。 |
printSchema(): Unit |
打印schema信息到控制台。 |
registerTempTable(tableName: String): Unit |
将DataFrame注册为一张临时表,其周期和SQLContext绑定在一起。 |
toDF(colNames: String*): DataFrame |
返回一个列重命名的DataFrame。 |