更新时间:2022-07-19 GMT+08:00
分享

规则

Spark应用中,需引入Spark的类

  • 对于Java开发语言,正确示例:
//创建SparkContext时所需引入的类。
import org.apache.spark.api.java.JavaSparkContext
//RDD操作时引入的类。
import org.apache.spark.api.java.JavaRDD
//创建SparkConf时引入的类。
import org.apache.spark.SparkConf
  • 对于Scala开发语言,正确示例:
//创建SparkContext时所需引入的类。
import org.apache.spark.SparkContext
//RDD操作时引入的类。
import org.apache.spark.SparkContext._
//创建SparkConf时引入的类。
import org.apache.spark.SparkConf

自己抛出的异常必须要填写详细的描述信息

说明:便于问题定位。

正确示例:

//抛出异常时,写出详细描述信息。
throw new  IOException("Writing data error! Data: " + data.toString());

错误示例:

throw new  IOException("Writing data error! ");

集群模式下,应注意Driver和worker节点之间的参数传递

在Spark编程时,总是有一些代码逻辑中需要根据输入参数来判断,这种时候往往会想到这种做法,将参数设置为全局变量,先给定一个空值(null),在main函数中,实例化SparkContext对象之前对这个变量赋值。然而,在集群模式下,执行程序的jar包会被发送到每个Worker上执行。如果只在main函数的节点上全局变量改变了,而未传给执行任务的函数中,将会报空指针异常。

正确示例:

object Test
{
private var testArg: String = null;
def main(args: Array[String])
{
testArg = …;
val sc: SparkContext = new SparkContext(…);

sc.textFile(…)
.map(x => testFun(x, testArg));
}

private def testFun(line: String, testArg: String): String =
{
testArg.split(…);
return …;
}
}

错误示例:

//定义对象。
object Test
{
// 定义全局变量,赋为空值(null);在main函数中,实例化SparkContext对象之前对这个变量赋值。
private var testArg: String = null;
//main函数
def main(args: Array[String])
{

testArg = …;
val sc: SparkContext = new SparkContext(…);

sc.textFile(…)
.map(x => testFun(x));
}

private def testFun(line: String): String =
{
testArg.split(...);
return …;
}
}

运行错误示例,在Spark的local模式下能正常运行,而在集群模式情况下,会在红色加粗代码处报错,提示空指针异常,这是由于在集群模式下,执行程序的jar包会被发送到每个Worker上执行,当执行到testFun函数时,需要从内存中取出testArg的值,但是testArg的值只在启动main函数的节点改变了,其他节点无法获取这些变化,因此它们从内存中取出的就是初始化这个变量时的值null,这就是空指针异常的原因。

应用程序结束之前必须调用SparkContext.stop

利用spark做二次开发时,当应用程序结束之前必须调用SparkContext.stop()。

利用Java语言开发时,应用程序结束之前必须调用JavaSparkContext.stop().

利用Scala语言开发时,应用程序结束之前必须调用SparkContext.stop().。

以Scala语言开发应用程序为例,分别介绍下正确示例与错误示例。

正确示例:

//提交spark作业
val sc = new SparkContext(conf)

//具体的任务
...

//应用程序结束
sc.stop()

错误示例:

//提交spark作业
val sc = new SparkContext(conf)

//具体的任务
...

如果不添加SparkContext.stop,界面会显示失败。如图1,同样的任务,前一个程序是没有添加SparkContext.stop,后一个程序添加了SparkContext.stop()。

图1 添加SparkContext.stop()和不添加的区别

多线程安全登录方式

如果有多线程进行login的操作,当应用程序第一次登录成功后,所有线程再次登录时应该使用relogin的方式。

login的代码样例:

private Boolean login(Configuration conf){
boolean flag = false;
UserGroupInformation.setConfiguration(conf);

try {
UserGroupInformation.loginUserFromKeytab(conf.get(PRINCIPAL), conf.get(KEYTAB));
System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased());
flag = true;
} catch (IOException e) {
e.printStackTrace();
}
return flag;

}

relogin的代码样例:

public Boolean relogin(){
boolean flag = false;
try {

UserGroupInformation.getLoginUser().reloginFromKeytab();
System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased());
flag = true;
} catch (IOException e) {
e.printStackTrace();
}
return flag;
}

自建用户执行sparksql命令时,需赋予用户相应写权限

用户可通过系统提供的spark用户来进行sparksql操作,也可通过在MRS Manager界面新建用户来执行sparksql操作。

在通过自建用户执行sparksql相应写操作时,可通过为用户选择supergroup组,赋予用户SystemAdministrator角色等方式赋予用户相应写操作的权限;如果新建用户为hadoop组用户且没有赋予特定角色,在一些涉及到写入操作的场景下可能会因不具备相应操作权限引起异常。

分享:

    相关文档

    相关产品