
Rules

Import the required Spark classes in Spark applications.

  • Example in Java:
// Class imported when SparkContext is created
import org.apache.spark.api.java.JavaSparkContext;
// Class imported for the RDD operation
import org.apache.spark.api.java.JavaRDD;
// Class imported when SparkConf is created
import org.apache.spark.SparkConf;
  • Example in Scala:
// Class imported when SparkContext is created
import org.apache.spark.SparkContext
// Class imported for the RDD operation
import org.apache.spark.SparkContext._
// Class imported when SparkConf is created
import org.apache.spark.SparkConf

Provide detailed descriptions for the exceptions that you throw.

Note: This facilitates fault location.

Correct example:

// Provide detailed description when throwing an exception.
throw new IOException("Writing data error! Data: " + data.toString());

Incorrect example:

throw new IOException("Writing data error!");

Pay attention to parameter passing between the Driver and Worker nodes in a distributed cluster.

When programming with Spark, part of the code logic often depends on an input parameter. Typically, the parameter is declared as a global variable with a null initial value, and the actual value is assigned in the main function before the SparkContext object is instantiated. However, in distributed cluster mode, the JAR file of the executable program is sent to every Worker. If the global variable is assigned only on the node that runs the main function and its value is not passed to the functions that execute the tasks, a null pointer error is reported.

Correct example:

object Test
{
  private var testArg: String = null;

  def main(args: Array[String])
  {
    testArg = ...;
    val sc: SparkContext = new SparkContext(...);

    // Pass testArg explicitly so that every Worker receives its value.
    sc.textFile(...)
      .map(x => testFun(x, testArg));
  }

  private def testFun(line: String, testArg: String): String =
  {
    testArg.split(...);
    return ...;
  }
}

Incorrect example:

// Define an object.
object Test
{
  // Define a global variable and set it to null. Its actual value is assigned in the main function before the SparkContext object is instantiated.
  private var testArg: String = null;

  // main function
  def main(args: Array[String])
  {
    testArg = ...;
    val sc: SparkContext = new SparkContext(...);

    // testArg is not passed to testFun, so the Workers only see its initial null value.
    sc.textFile(...)
      .map(x => testFun(x));
  }

  private def testFun(line: String): String =
  {
    testArg.split(...);
    return ...;
  }
}

No error is reported in Spark local mode, but a null pointer error occurs in distributed cluster mode. In cluster mode, the JAR file of the executable program is sent to every Worker for execution. When the testFun function runs, it reads the value of testArg from memory. That value, however, was changed only on the node that runs the main function, and the other nodes are unaware of the change. The value read from memory is therefore still null, which causes the null pointer error.

SparkContext.stop() must be added before an application program stops.

When Spark is used in secondary development, SparkContext.stop() must be added before an application program stops.

When Java is used in application development, JavaSparkContext.stop() must be added before an application program stops.

When Scala is used in application development, SparkContext.stop() must be added before an application program stops.

The following uses Scala as an example to show correct and incorrect usage.

Correct example:

// Submit a Spark job.
val sc = new SparkContext(conf)

// Specific task
...

// The application stops.
sc.stop()

Incorrect example:

// Submit a Spark job.
val sc = new SparkContext(conf)

// Specific task
...

// Missing: sc.stop() is never called before the application exits.

If SparkContext.stop() is not added, the page displays failure information. For the same task, as shown in Figure 1, the first program does not add SparkContext.stop(), while the second program does.

Figure 1 Difference when SparkContext.stop() is added
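
When Java is used, JavaSparkContext.stop() is called at the same point. The following is a minimal, self-contained sketch; the class name, application name, and placeholder task are illustrative assumptions rather than part of the product sample:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class StopExample {
    public static void main(String[] args) {
        // Submit a Spark job.
        SparkConf conf = new SparkConf().setAppName("StopExample");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Specific task (placeholder logic).
        jsc.parallelize(Arrays.asList(1, 2, 3)).count();

        // The application stops: stop() is the last Spark call before exiting.
        jsc.stop();
    }
}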

Multithread security login mode

If multiple threads perform login operations, all threads must use the relogin mode for subsequent logins after the application's first successful login.

Login sample code:

private Boolean login(Configuration conf) {
    boolean flag = false;
    UserGroupInformation.setConfiguration(conf);

    try {
        UserGroupInformation.loginUserFromKeytab(conf.get(PRINCIPAL), conf.get(KEYTAB));
        System.out.println("UserGroupInformation.isLoginKeytabBased(): " + UserGroupInformation.isLoginKeytabBased());
        flag = true;
    } catch (IOException e) {
        e.printStackTrace();
    }
    return flag;
}

Relogin sample code:

public Boolean relogin() {
    boolean flag = false;
    try {
        UserGroupInformation.getLoginUser().reloginFromKeytab();
        System.out.println("UserGroupInformation.isLoginKeytabBased(): " + UserGroupInformation.isLoginKeytabBased());
        flag = true;
    } catch (IOException e) {
        e.printStackTrace();
    }
    return flag;
}
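
The following is a minimal sketch of how the login and relogin operations might be coordinated across threads. The LoginManager class, the loggedIn flag, and the "principal" and "keytab" configuration keys are assumptions for illustration, not part of the sample code above:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

// Hypothetical helper: the first thread to call ensureLogin() performs the full
// keytab login; every later call, from any thread, performs a relogin instead.
public class LoginManager {
    // Placeholder configuration keys (assumptions).
    private static final String PRINCIPAL = "principal";
    private static final String KEYTAB = "keytab";

    private static volatile boolean loggedIn = false;

    public static synchronized void ensureLogin(Configuration conf) throws IOException {
        if (!loggedIn) {
            // First successful login of the application.
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(conf.get(PRINCIPAL), conf.get(KEYTAB));
            loggedIn = true;
        } else {
            // Subsequent logins of all threads: use the relogin mode.
            UserGroupInformation.getLoginUser().reloginFromKeytab();
        }
    }
}

Each worker thread can then call LoginManager.ensureLogin(conf) before it accesses the cluster.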

If a user that you created is used to run the sparksql command, the required write permission must be assigned to the user.

You can perform sparksql operations either as the Spark user provided by the system or as a user created on MRS Manager.

When a sparksql write operation is performed by a user that you created, you can grant the required write permission by adding the user to the supergroup group and assigning the SystemAdministrator role to the user. If a new user belongs only to the Hadoop group and has no role assigned, exceptions may occur in some write scenarios.