Spark Python API接口介绍
由于Spark开源版本升级,为避免出现API兼容性或可靠性问题,建议用户使用配套版本的API。
Spark Core常用接口
Spark主要使用到如下这几个类:
- pyspark.SparkContext:是Spark的对外接口。负责向调用该类的python应用提供Spark的各种功能,如连接Spark集群、创建RDD、广播变量等。
- pyspark.SparkConf:Spark应用配置类。如设置应用名称,执行模式,executor内存等。
- pyspark.RDD(Resilient Distributed Dataset):用于在Spark应用程序中定义RDD的类,该类提供数据集的操作方法,如map,filter。
- pyspark.Broadcast:广播变量类。广播变量允许保留一个只读的变量,缓存在每一台机器上,而非每个任务保存一份复制。
- pyspark.StorageLevel: 数据存储级别。有内存(MEMORY_ONLY),磁盘(DISK_ONLY),内存+磁盘(MEMORY_AND_DISK)等。
- pyspark.sql.SQLContext:是SparkSQL功能的主入口。可用于创建DataFrame,注册DataFrame为一张表,表上执行SQL等。
- pyspark.sql.DataFrame:分布式数据集。DataFrame等效于SparkSQL中的关系表,可被SQLContext中的方法创建。
- pyspark.sql.DataFrameNaFunctions:DataFrame中处理数据缺失的函数。
- pyspark.sql.DataFrameStatFunctions:DataFrame中统计功能的函数,可以计算列之间的方差,样本协方差等。
方法 |
说明 |
---|---|
map(f, preservesPartitioning=False) |
对调用map的RDD数据集中的每个element都使用Func,生成新的RDD。 |
filter(f) |
对RDD中所有元素调用Func,生成将满足条件数据集以RDD形式返回。 |
flatMap(f, preservesPartitioning=False) |
先对RDD所有元素调用Func,然后将结果扁平化,生成新的RDD。 |
sample(withReplacement, fraction, seed=None) |
抽样,返回RDD一个子集。 |
union(rdds) |
返回一个新的RDD,包含源RDD和给定RDD的元素的集合。 |
distinct([numPartitions: Int]): RDD[T] |
去除重复元素,生成新的RDD。 |
groupByKey(): RDD[(K, Iterable[V])] |
返回(K,Iterable[V]),将key相同的value组成一个集合。 |
reduceByKey(func, numPartitions=None) |
对key相同的value调用Func。 |
sortByKey(ascending=True, numPartitions=None, keyfunc=function <lambda>) |
按照key来进行排序,是升序还是降序,ascending是boolean类型。 |
join(other, numPartitions) |
当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numPartitions为并发的任务数。 |
cogroup(other, numPartitions) |
将当有两个key-value对的dataset(K,V)和(K,W),返回的是(K, (Iterable[V], Iterable[W]))的dataset,numPartitions为并发的任务数。 |
cartesian(other) |
返回该RDD与其它RDD的笛卡尔积。 |
API |
说明 |
---|---|
reduce(f) |
对RDD中的元素调用Func。 |
collect() |
返回包含RDD中所有元素的一个数组。 |
count() |
返回的是dataset中的element的个数。 |
first() |
返回的是dataset中的第一个元素。 |
take(num) |
返回前num个elements。 |
takeSample(withReplacement, num, seed) |
takeSample(withReplacement,num,seed)对dataset随机抽样,返回由num个元素组成的数组。withReplacement表示是否使用replacement。 |
saveAsTextFile(path, compressionCodecClass) |
把dataset写到一个text file、HDFS或者HDFS支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中。 |
saveAsSequenceFile(path, compressionCodecClass=None) |
只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统。 |
countByKey() |
对每个key出现的次数做统计。 |
foreach(func) |
在数据集的每一个元素上,运行函数。 |
countByValue() |
对RDD中每个不同value出现的次数进行统计。 |
Spark Streaming常用接口
Spark Streaming中常见的类有:
- pyspark.streaming.StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。
- pyspark.streaming.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。
- dsteam.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。
对应的Spark Streaming的JAVA API是JavaStreamingContext,JavaDStream和JavaPairDStream。
Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。
方法 |
说明 |
---|---|
socketTextStream(hostname, port, storageLevel) |
从TCP源主机:端口创建一个输入流。 |
start() |
启动Spark Streaming计算。 |
awaitTermination(timeout) |
当前进程等待终止,如Ctrl+C等。 |
stop(stopSparkContext, stopGraceFully) |
终止Spark Streaming计算,stopSparkContext用于判断是否需要终止相关的SparkContext,StopGracefully用于判断是否需要等待所有接收到的数据处理完成。 |
UpdateStateByKey(func) |
更新DStream的状态。使用此方法,需要定义State和状态更新函数。 |
window(windowLength, slideInterval) |
根据源DStream的窗口批次计算得到一个新的DStream。 |
countByWindow(windowLength, slideInterval) |
返回流中滑动窗口元素的个数。 |
reduceByWindow(func, windowLength, slideInterval) |
当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。 |
join(other,numPartitions) |
实现不同的Spark Streaming之间做合并操作。 |
SparkSQL常用接口
Spark SQL中在Python中重要的类有:
- pyspark.sql.SQLContext:是Spark SQL功能和DataFrame的主入口。
- pyspark.sql.DataFrame:是一个以命名列方式组织的分布式数据集。
- pyspark.sql.HiveContext:获取存储在Hive中数据的主入口。
- pyspark.sql.DataFrameStatFunctions:统计功能中一些函数。
- pyspark.sql.functions:DataFrame中内嵌的函数。
- pyspark.sql.Window:sql中提供窗口功能。
方法 |
说明 |
---|---|
collect() |
返回一个数组,包含DataFrame的所有列。 |
count() |
返回DataFrame中的行数。 |
describe() |
计算统计信息,包含计数,平均值,标准差,最小值和最大值。 |
first() |
返回第一行。 |
head(n) |
返回前n行。 |
show() |
用表格形式显示DataFrame。 |
take(num) |
返回DataFrame中的前num行。 |
方法 |
说明 |
---|---|
explain() |
打印出SQL语句的逻辑计划和物理计划。 |
printSchema() |
打印schema信息到控制台。 |
registerTempTable(name) |
将DataFrame注册为一张临时表,命名为name,其周期和SQLContext绑定在一起。 |
toDF() |
返回一个列重命名的DataFrame。 |