Spark Java API接口介绍
由于Spark开源版本升级,为避免出现API兼容性或可靠性问题,建议用户使用配套版本的开源API。
Spark Core常用接口
Spark主要使用到如下这几个类:
- JavaSparkContext:是Spark的对外接口,负责向调用该类的Java应用提供Spark的各种功能,如连接Spark集群,创建RDD,累积量和广播量等。它的作用相当于一个容器。
- SparkConf:Spark应用配置类,如设置应用名称,执行模式,executor内存等。
- JavaRDD:用于在java应用中定义JavaRDD的类,功能类似于scala中的RDD(Resilient Distributed Dataset)类。
- JavaPairRDD:表示key-value形式的JavaRDD类。提供的方法有groupByKey,reduceByKey等。
- Broadcast:广播变量类。广播变量允许保留一个只读的变量,缓存在每一台机器上,而非每个任务保存一份拷贝。
- StorageLevel:数据存储级别。有内存(MEMORY_ONLY),磁盘(DISK_ONLY),内存+磁盘(MEMORY_AND_DISK)等。
JavaRDD支持两种类型的操作:Transformation和Action,这两种类型的常用方法如表1和表2。
方法 |
说明 |
---|---|
<R> JavaRDD<R> map(Function<T,R> f) |
对RDD中的每个element使用Function。 |
JavaRDD<T> filter(Function<T,Boolean> f) |
对RDD中所有元素调用Function,返回为true的元素。 |
<U> JavaRDD<U> flatMap(FlatMapFunction<T,U> f) |
先对RDD所有元素调用Function,然后将结果扁平化。 |
JavaRDD<T> sample(boolean withReplacement, double fraction, long seed) |
抽样。 |
JavaRDD<T> distinct(int numPartitions) |
去除重复元素。 |
JavaPairRDD<K,Iterable<V>> groupByKey(int numPartitions) |
返回(K,Seq[V]),将key相同的value组成一个集合。 |
JavaPairRDD<K,V> reduceByKey(Function2<V,V,V> func, int numPartitions) |
对key相同的value调用Function。 |
JavaPairRDD<K,V> sortByKey(boolean ascending, int numPartitions) |
按照key来进行排序,ascending为true时是升序否则为降序。 |
JavaPairRDD<K,scala.Tuple2<V,W>> join(JavaPairRDD<K,W> other) |
当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks为并发的任务数。 |
JavaPairRDD<K,scala.Tuple2<Iterable<V>,Iterable<W>>> cogroup(JavaPairRDD<K,W> other, int numPartitions) |
当有两个KV的dataset(K,V)和(K,W),返回的是<K,scala.Tuple2<Iterable<V>,Iterable<W>>>的dataset,numTasks为并发的任务数。 |
JavaPairRDD<T,U> cartesian(JavaRDDLike<U,?> other) |
返回该RDD与其它RDD的笛卡尔积。 |
方法 |
说明 |
---|---|
T reduce(Function2<T,T,T> f) |
对RDD中的元素调用Function2。 |
java.util.List<T> collect() |
返回包含RDD中所有元素的一个数组。 |
long count() |
返回的是dataset中的element的个数。 |
T first() |
返回的是dataset中的第一个元素。 |
java.util.List<T> take(int num) |
返回前n个elements。 |
java.util.List<T> takeSample(boolean withReplacement, int num, long seed) |
对dataset随机抽样,返回有num个元素组成的数组。withReplacement表示是否使用replacement。 |
void saveAsTextFile(String path, Class<? extends org.apache.hadoop.io.compress.CompressionCodec> codec) |
把dataset写到一个text file、hdfs、或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中。 |
java.util.Map<K,Object> countByKey() |
对每个key出现的次数做统计。 |
void foreach(VoidFunction<T> f) |
在数据集的每一个元素上,运行函数func。 |
java.util.Map<T,Long> countByValue() |
对RDD中每个元素出现的次数进行统计。 |
Spark Streaming常用接口
Spark Streaming中常见的类有:
- JavaStreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。
- JavaDStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。
- JavaPairDStream:KV DStream的接口,提供reduceByKey和join等操作。
- JavaReceiverInputDStream<T>:定义任何从网络接受数据的输入流。
Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。
方法 |
说明 |
---|---|
JavaReceiverInputDStream<java.lang.String> socketStream(java.lang.String hostname,int port) |
创建一个输入流,通过TCP socket从对应的hostname和端口接受数据。接受的字节被解析为UTF8格式。默认的存储级别为Memory+Disk。 |
JavaDStream<java.lang.String> textFileStream(java.lang.String directory) |
入参directory为HDFS目录,该方法创建一个输入流检测可兼容Hadoop文件系统的新文件,并且读取为文本文件。 |
void start() |
启动Streaming计算。 |
void awaitTermination() |
当前进程等待终止,如Ctrl+C等。 |
void stop() |
终止Streaming计算。 |
<T> JavaDStream<T> transform(java.util.List<JavaDStream<?>> dstreams,Function2<java.util.List<JavaRDD<?>>,Time,JavaRDD<T>> transformFunc) |
对每个RDD进行function操作,得到一个新的DStream。这个函数中JavaRDDs的顺序和list中对应的DStreams保持一致。 |
<T> JavaDStream<T> union(JavaDStream<T> first,java.util.List<JavaDStream<T>> rest) |
从多个具备相同类型和滑动时间的DStream中创建统一的DStream。 |
方法 |
说明 |
---|---|
JAVADStreamKafkaWriter.writeToKafka() |
支持将DStream中的数据批量写入到Kafka。 |
JAVADStreamKafkaWriter.writeToKafkaBySingle() |
支持将DStream中的数据逐条写入到Kafka。 |
Spark SQL常用接口
Spark SQL中重要的类有:
- SQLContext:是Spark SQL功能和DataFrame的主入口。
- DataFrame:是一个以命名列方式组织的分布式数据集
- DataFrameReader:从外部存储系统加载DataFrame的接口。
- DataFrameStatFunctions:实现DataFrame的统计功能。
- UserDefinedFunction:用户自定义的函数。
常见的Actions方法有:
方法 |
说明 |
---|---|
Row[] collect() |
返回一个数组,包含DataFrame的所有列。 |
long count() |
返回DataFrame的行数。 |
DataFrame describe(java.lang.String... cols) |
计算统计信息,包含计数,平均值,标准差,最小值和最大值。 |
Row first() |
返回第一行。 |
Row[] head(int n) |
返回前n行。 |
void show() |
用表格形式显示DataFrame的前20行。 |
Row[] take(int n) |
返回DataFrame中的前n行。 |
方法 |
说明 |
---|---|
void explain(boolean extended) |
打印出SQL语句的逻辑计划和物理计划。 |
void printSchema() |
打印schema信息到控制台。 |
registerTempTable |
将DataFrame注册为一张临时表,其周期和SQLContext绑定在一起。 |
DataFrame toDF(java.lang.String... colNames) |
返回一个列重命名的DataFrame。 |
DataFrame sort(java.lang.String sortCol,java.lang.String... sortCols) |
根据不同的列,按照升序或者降序排序。 |
GroupedData rollup(Column... cols) |
对当前的DataFrame特定列进行多维度的回滚操作。 |