Updated on 2022-11-18 GMT+08:00

Rules

Import the Spark classes in Spark applications

  • Example in Java:
    //Class imported when SparkContext is created.
    import org.apache.spark.api.java.JavaSparkContext;
    //Class imported for the RDD operation.
    import org.apache.spark.api.java.JavaRDD;
    //Class imported when SparkConf is created.
    import org.apache.spark.SparkConf;
  • Example in Scala:
    //Class imported when SparkContext is created. 
    import org.apache.spark.SparkContext
    //Class imported for the RDD operation.
    import org.apache.spark.SparkContext._
    //Class imported when SparkConf is created.
    import org.apache.spark.SparkConf

Understand each Java and Scala function

An RDD is immutable, that is, the objects in the RDD must not be modified. Therefore, you must know whether each Java or Scala function you call modifies its argument in place or returns a new object.

For example, sort user position information by time. If Scala is used, the code for sorting information by time is as follows:

/* The function is used to obtain the position track of a user.
 * The parameter trajectory consists of the user name and the positions (time, longitude, latitude).
 */
private def getTimesOfOneUser(trajectory: (String, Seq[(String, Float, Float)]), zone: Zone, arrive: Boolean): Int =
{
    // Sort the user positions by time.
    val sorted: Seq[(String, Float, Float)] = trajectory._2.sortBy(x => x._1);
    ...
}

If Java is used, you must create a new object from trajectory._2 and sort that object instead of sorting trajectory._2 directly. The reason is that Collections.sort(trajectory._2) sorts in place and changes the trajectory._2 object, which violates the rule that the RDD object cannot be changed. The Scala code can be executed successfully because the sortBy() function creates a new object. The following gives examples of the code in Java:

Correct:

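//Create a new object that holds a copy of the user positions.
List<Tuple3<String, Float, Float>> list = new ArrayList<Tuple3<String, Float, Float>>(trajectory._2);
//Sort the new object by time. Tuple3 is not Comparable, so a comparator (java.util.Comparator) is supplied.
Collections.sort(list, Comparator.comparing(t -> t._1()));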

Incorrect:

//Sort user positions.
Collections.sort(trajectory._2);
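
The copy-then-sort rule can be tried without a Spark cluster. The following is a minimal sketch and not code from the product: the Position class and the sortedByTime method are hypothetical names introduced for illustration, and a plain Java list stands in for trajectory._2. It copies the input list, sorts the copy by time with an explicit comparator, and leaves the input list unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class SortCopyExample {
    // Hypothetical stand-in for one position record (time, longitude, latitude).
    static final class Position {
        final String time;
        final float longitude;
        final float latitude;

        Position(String time, float longitude, float latitude) {
            this.time = time;
            this.longitude = longitude;
            this.latitude = latitude;
        }
    }

    // Returns a new list sorted by time; the input list is never modified.
    static List<Position> sortedByTime(List<Position> positions) {
        List<Position> copy = new ArrayList<>(positions);
        copy.sort(Comparator.comparing((Position p) -> p.time));
        return copy;
    }

    public static void main(String[] args) {
        // An unmodifiable list makes any accidental in-place sort fail fast.
        List<Position> original = Collections.unmodifiableList(Arrays.asList(
            new Position("09:30", 114.05f, 22.54f),
            new Position("08:15", 113.26f, 23.13f)));

        List<Position> sorted = sortedByTime(original);
        System.out.println(sorted.get(0).time);   // earliest record of the sorted copy
        System.out.println(original.get(0).time); // first record of the untouched input
    }
}
```

Wrapping the input in an unmodifiable list is a design choice for the sketch only: it turns an accidental in-place sort into an immediate exception instead of a silent change to shared data.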

Pay attention to the parameter transfer between the Driver and Executor nodes in distributed cluster mode

When Spark is used for programming, certain code logic depends on the parameters entered. Generally, such a parameter is declared as a global variable and initialized to null, and the actual value is assigned in the main function before the SparkContext object is instantiated. However, in the distributed cluster mode, the JAR package of the executable program is sent to each Executor. If the global variable is changed only on the node that runs the main function and the new value is not passed to the functions that execute tasks, a null pointer exception is reported.

Correct:

object Test
{
  private var testArg: String = null;
  def main(args: Array[String])
  {
    testArg = ...;
    val sc: SparkContext = new SparkContext(...);

    sc.textFile(...)
    .map(x => testFun(x, testArg));
  }

  private def testFun(line: String, testArg: String): String =
  {
    testArg.split(...);
    return ...;
  }
}

Incorrect:

//Define an object.
object Test
{
  // Define a global variable and set it to null. Assign a value to this variable before the SparkContext object is instantiated using the main function.
  private var testArg: String = null;
  //main function
  def main(args: Array[String])
  {
    testArg = ...;
    val sc: SparkContext = new SparkContext(...);

    sc.textFile(...)
    .map(x => testFun(x));
  }

  private def testFun(line: String): String =
  {
    testArg.split(...);
    return ...;
  }
}

No error is reported in the local mode of Spark. However, in the distributed cluster mode, a null pointer exception is reported. In the cluster mode, the JAR package of the executable program is sent to each Executor for running. When the testFun function is executed, it reads the value of testArg from the memory of the process it runs in. The value of testArg, however, is changed only on the node where the main function runs, and the other nodes are unaware of the change. Therefore, the value read from the memory is null, which causes the null pointer exception.
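
The difference between the two versions can be reproduced in plain Java without a cluster. The following is a rough single-JVM simulation and not Spark's actual mechanism: all names (ClosureCaptureExample, globalArg, SerFunction, runOnSimulatedExecutor) are hypothetical, Java serialization stands in for sending a task function to an Executor, and resetting the static field to null stands in for an Executor process in which the main function never ran. It shows that a value passed as a parameter travels with the serialized function, while a value stored in a global variable does not.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

class ClosureCaptureExample {
    // Plays the role of the global variable testArg.
    static String globalArg = null;

    // A function type that can be serialized before it is sent elsewhere.
    interface SerFunction extends Function<String, String>, Serializable {}

    // Reads the static field when the function runs; the value is not serialized.
    static SerFunction usingGlobal() {
        return line -> line + ":" + globalArg.toUpperCase();
    }

    // Captures the value passed in; the value is serialized with the function.
    static SerFunction usingParameter(String arg) {
        return line -> line + ":" + arg.toUpperCase();
    }

    static byte[] serialize(SerFunction f) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(f);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    static SerFunction deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (SerFunction) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Simulation: the "driver" sets the value and serializes the function,
    // then the "executor" runs the function in a process where main() never ran.
    static String runOnSimulatedExecutor(boolean passAsParameter, String value) {
        globalArg = value;
        byte[] shipped = serialize(passAsParameter ? usingParameter(globalArg) : usingGlobal());
        globalArg = null; // simulated Executor state: the global variable was never assigned
        try {
            return deserialize(shipped).apply("line");
        } catch (NullPointerException e) {
            return "NullPointerException";
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnSimulatedExecutor(true, "abc"));  // prints line:ABC
        System.out.println(runOnSimulatedExecutor(false, "abc")); // prints NullPointerException
    }
}
```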

SparkContext.stop must be added before an application program stops

When you develop applications based on Spark, the stop method of the SparkContext must be called before the application program stops.

When Java is used in application development, call JavaSparkContext.stop() before the application program stops.

When Scala is used in application development, call SparkContext.stop() before the application program stops.

The following uses Scala to show correct and incorrect examples.

Correct:

//Submit a spark job.
val sc = new SparkContext(conf)

//Specific task
...

//The application program stops.
sc.stop()

Incorrect:

//Submit a spark job.
val sc = new SparkContext(conf)

//Specific task
...

If you do not add SparkContext.stop(), the YARN page displays failure information for the application. Figure 1 shows the same task run by two programs: the first program does not add SparkContext.stop(), while the second program adds SparkContext.stop().

Figure 1 Difference when SparkContext.stop() is added

Appropriately plan the proportion of resources for AM

When there are many tasks and each task occupies few resources, the tasks may fail to start even if the cluster resources are sufficient and the tasks are submitted successfully. To address this issue, you can increase the value of Max AM Resource Percent.

Figure 2 Modify Max AM Resource Percent
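
The effect of this setting can be estimated with simple arithmetic. Each application on YARN needs an ApplicationMaster (AM) container before its tasks can start, and Max AM Resource Percent caps the share of resources that AMs may occupy. The following is a simplified sketch only: the class and method names are hypothetical, the 100 GB queue size and 1 GB per AM are assumed figures, and a real scheduler applies further limits (for example, per queue and per user) that are ignored here.

```java
class AmResourceExample {
    // Simplified estimate of how many AMs can run at the same time.
    // amPercent is a whole-number percentage (10 means 10%) so the arithmetic stays exact.
    static long maxConcurrentAms(long queueMemoryMb, int amPercent, long memoryPerAmMb) {
        long amBudgetMb = queueMemoryMb * amPercent / 100;
        return amBudgetMb / memoryPerAmMb;
    }

    public static void main(String[] args) {
        long queueMemoryMb = 100L * 1024; // assumed: 100 GB available to the queue
        long memoryPerAmMb = 1024;        // assumed: 1 GB requested by each AM

        System.out.println(maxConcurrentAms(queueMemoryMb, 10, memoryPerAmMb)); // prints 10
        System.out.println(maxConcurrentAms(queueMemoryMb, 50, memoryPerAmMb)); // prints 50
    }
}
```

Under these assumptions, a 10% limit lets only 10 applications start even though 90 GB of the queue remains free for tasks, which matches the symptom described above. Raising the limit to 50% allows 50 applications to start.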