Spark应用开发规则
Spark应用中,需引入Spark的类
- 对于Java开发语言,正确示例:
// 创建SparkContext时所需引入的类。 import org.apache.spark.api.java.JavaSparkContext // RDD操作时引入的类。 import org.apache.spark.api.java.JavaRDD // 创建SparkConf时引入的类。 import org.apache.spark.SparkConf
- 对于Scala开发语言,正确示例:
// 创建SparkContext时所需引入的类。 import org.apache.spark.SparkContext // RDD操作时引入的类。 import org.apache.spark.SparkContext._ // 创建SparkConf时引入的类。 import org.apache.spark.SparkConf
分布式模式下,应注意Driver和Executor之间的参数传递
在Spark编程时,总是有一些代码逻辑中需要根据输入参数来判断,这种时候往往会使用这种方式,将参数设置为全局变量,先给定一个空值(null),在main函数中,实例化SparkContext对象之前对这个变量赋值。然而,在分布式模式下,执行程序的jar包会被发送到每个Executor上执行。而该变量只在main函数的节点改变了,并未传给执行任务的函数中,因此Executor将会报空指针异常。
正确示例:
object Test { private var testArg: String = null; def main(args: Array[String]) { testArg = …; val sc: SparkContext = new SparkContext(…); sc.textFile(…) .map(x => testFun(x, testArg)); } private def testFun(line: String, testArg: String): String = { testArg.split(…); return …; } }
错误示例:
//定义对象。 object Test { // 定义全局变量,赋为空值(null);在main函数中,实例化SparkContext对象之前对这个变量赋值。 private var testArg: String = null; // main函数 def main(args: Array[String]) { testArg = …; val sc: SparkContext = new SparkContext(…); sc.textFile(…) .map(x => testFun(x)); } private def testFun(line: String): String = { testArg.split(...); return …; } }
运行错误示例,在Spark的local模式下能正常运行,而在分布式模式情况下,会在蓝色代码处报错,提示空指针异常,这是由于在分布式模式下,执行程序的jar包会被发送到每个Executor上执行,当执行到testFun函数时,需要从内存中取出testArg的值,但是testArg的值只在启动main函数的节点改变了,其他节点无法获取这些变化,因此它们从内存中取出的就是初始化这个变量时的值null,这就是空指针异常的原因。
应用程序结束之前必须调用SparkContext.stop
利用spark做二次开发时,当应用程序结束之前必须调用SparkContext.stop()。
利用Java语言开发时,应用程序结束之前必须调用JavaSparkContext.stop()。
利用Scala语言开发时,应用程序结束之前必须调用SparkContext.stop()。
以Scala语言开发应用程序为例,分别介绍下正确示例与错误示例。
正确示例:
//提交spark作业 val sc = new SparkContext(conf) //具体的任务 ... //应用程序结束 sc.stop()
错误示例:
//提交spark作业 val sc = new SparkContext(conf) //具体的任务 ...
如果不添加SparkContext.stop,YARN界面会显示失败。如图1,同样的任务,前一个程序是没有添加SparkContext.stop,后一个程序添加了SparkContext.stop()。
合理规划AM资源占比
任务数量较多且每个任务占用的资源较少时,可能会出现集群资源足够,提交的任务成功但是无法启动,此时可以提高AM的最大资源占比。