更新时间:2024-05-28 GMT+08:00

Spark应用开发规则

Spark应用中,需引入Spark的类

  • 对于Java开发语言,正确示例:
    // 创建SparkContext时所需引入的类。
    import org.apache.spark.api.java.JavaSparkContext
    // RDD操作时引入的类。
    import org.apache.spark.api.java.JavaRDD
    // 创建SparkConf时引入的类。
    import org.apache.spark.SparkConf
  • 对于Scala开发语言,正确示例:
    // 创建SparkContext时所需引入的类。
    import org.apache.spark.SparkContext
    // RDD操作时引入的类。
    import org.apache.spark.SparkContext._
    // 创建SparkConf时引入的类。
    import org.apache.spark.SparkConf

分布式模式下,应注意Driver和Executor之间的参数传递

在Spark编程时,总是有一些代码逻辑中需要根据输入参数来判断,这种时候往往会使用这种方式,将参数设置为全局变量,先给定一个空值(null),在main函数中,实例化SparkContext对象之前对这个变量赋值。然而,在分布式模式下,执行程序的jar包会被发送到每个Executor上执行。而该变量只在main函数的节点改变了,并未传给执行任务的函数中,因此Executor将会报空指针异常。

正确示例:

object Test
{
  private var testArg: String = null;
  def main(args: Array[String])
  {
    testArg = …;
    val sc: SparkContext = new SparkContext(…);

    sc.textFile(…)
    .map(x => testFun(x, testArg));
  }

  private def testFun(line: String, testArg: String): String =
  {
    testArg.split(…);
    return …; 
  }
}

错误示例:

//定义对象。
object Test
{
  // 定义全局变量,赋为空值(null);在main函数中,实例化SparkContext对象之前对这个变量赋值。
  private var testArg: String = null;
  // main函数
  def main(args: Array[String])
  {
    
    testArg = …;
    val sc: SparkContext = new SparkContext(…);

    sc.textFile(…)
      .map(x => testFun(x));
  }

  private def testFun(line: String): String =
  {
    testArg.split(...);
    return …; 
  }
}

运行错误示例,在Spark的local模式下能正常运行,而在分布式模式情况下,会在蓝色代码处报错,提示空指针异常,这是由于在分布式模式下,执行程序的jar包会被发送到每个Executor上执行,当执行到testFun函数时,需要从内存中取出testArg的值,但是testArg的值只在启动main函数的节点改变了,其他节点无法获取这些变化,因此它们从内存中取出的就是初始化这个变量时的值null,这就是空指针异常的原因。

应用程序结束之前必须调用SparkContext.stop

利用spark做二次开发时,当应用程序结束之前必须调用SparkContext.stop()。

利用Java语言开发时,应用程序结束之前必须调用JavaSparkContext.stop()。

利用Scala语言开发时,应用程序结束之前必须调用SparkContext.stop()。

以Scala语言开发应用程序为例,分别介绍下正确示例与错误示例。

正确示例:

//提交spark作业
val sc = new SparkContext(conf)

//具体的任务
...

//应用程序结束
sc.stop()

错误示例:

//提交spark作业
val sc = new SparkContext(conf)

//具体的任务
...

如果不添加SparkContext.stop,YARN界面会显示失败。如图1,同样的任务,前一个程序是没有添加SparkContext.stop,后一个程序添加了SparkContext.stop()。

图1 添加SparkContext.stop()和不添加的区别

合理规划AM资源占比

任务数量较多且每个任务占用的资源较少时,可能会出现集群资源足够,提交的任务成功但是无法启动,此时可以提高AM的最大资源占比。

图2 修改AM最大资源百分比