文档首页 > > 开发指南> Spark应用开发> 开发规范> 规则

规则

分享
更新时间: 2019/10/30 GMT+08:00

Spark应用中,需引入Spark的类

  • 对于Java开发语言,正确示例:
//创建SparkContext时所需引入的类。
import org.apache.spark.api.java.JavaSparkContext
//RDD操作时引入的类。
import org.apache.spark.api.java.JavaRDD
//创建SparkConf时引入的类。
import org.apache.spark.SparkConf
  • 对于Scala开发语言,正确示例:
//创建SparkContext时所需引入的类。
import org.apache.spark.SparkContext
//RDD操作时引入的类。
import org.apache.spark.SparkContext._
//创建SparkConf时引入的类。
import org.apache.spark.SparkConf

自己抛出的异常必须要填写详细的描述信息

说明:便于问题定位。

正确示例:

//抛出异常时,写出详细描述信息。
throw new  IOException("Writing data error! Data: " + data.toString());

错误示例:

throw new  IOException("Writing data error! ");

Java与Scala函数有区别,在编写应用时,需要弄清楚每个函数的功能

RDD是不可改变的,也就是说,RDD的元素对象是不能更改的,因此,在用Java和Scala编写需要弄清楚每个函数的功能。下面举个例子。

场景:现有用户位置数据,按照时间排序生成用户轨迹。在Scala中,按时间排序的代码如下。

/* 函数实现的功能是得到某个用户的位置轨迹。

* 参数trajectory:由两部分组成-用户名和位置点(时间,经度,维度)

private def getTimesOfOneUser(trajectory: (String, Seq[(String, Float, Float)]), zone: Zone, arrive: Boolean): Int =
{
// 先将用户位置点按时间排序
val sorted: Seq[(String, Float, Float)] = trajectory._2.sortBy(x => x._1);
…
}

若用java实现上述功能,则需要将对trajectory._2重新生成对象,而不能直接对trajectory._2进行排序操作。原因是,java不支持Collections.sort(trajectory._2)这个操作,是由于该操作改变了trajectory._2这个对象本身,这违背了RDD元素不可更改这条规则;而Scala代码之所以能够正常运行,是因为sortBy( )这个函数生成了一个新的对象,它并不对trajectory._2直接操作。下面分别列出java实现的正确示例和错误示例。

正确示例:

//将用户的位置点从新生成一个对象。
List<Tuple3< String, Float, Float >> list = new ArrayList<Tuple3< String, Float, Float >>( trajectory._2);
//对新对象进行排序。
Collections.sort(list);

错误示例:

//直接对用户位置点按照时间排序。
Collections.sort(trajectory._2);

集群模式下,应注意Driver和worker节点之间的参数传递

在Spark编程时,总是有一些代码逻辑中需要根据输入参数来判断,这种时候往往会想到这种做法,将参数设置为全局变量,先给定一个空值(null),在main函数中,实例化SparkContext对象之前对这个变量赋值。然而,在集群模式下,执行程序的jar包会被发送到每个Worker上执行。如果只在main函数的节点上全局变量改变了,而未传给执行任务的函数中,将会报空指针异常。

正确示例:

object Test
{
private var testArg: String = null;
def main(args: Array[String])
{
testArg = …;
val sc: SparkContext = new SparkContext(…);

sc.textFile(…)
.map(x => testFun(x, testArg));
}

private def testFun(line: String, testArg: String): String =
{
testArg.split(…);
return …;
}
}

错误示例:

//定义对象。
object Test
{
// 定义全局变量,赋为空值(null);在main函数中,实例化SparkContext对象之前对这个变量赋值。
private var testArg: String = null;
//main函数
def main(args: Array[String])
{

testArg = …;
val sc: SparkContext = new SparkContext(…);

sc.textFile(…)
.map(x => testFun(x));
}

private def testFun(line: String): String =
{
testArg.split(...);
return …;
}
}

运行错误示例,在Spark的local模式下能正常运行,而在集群模式情况下,会在红色加粗代码处报错,提示空指针异常,这是由于在集群模式下,执行程序的jar包会被发送到每个Worker上执行,当执行到testFun函数时,需要从内存中取出testArg的值,但是testArg的值只在启动main函数的节点改变了,其他节点无法获取这些变化,因此它们从内存中取出的就是初始化这个变量时的值null,这就是空指针异常的原因。

应用程序结束之前必须调用SparkContext.stop

利用spark做二次开发时,当应用程序结束之前必须调用SparkContext.stop()。

利用Java语言开发时,应用程序结束之前必须调用JavaSparkContext.stop().

利用Scala语言开发时,应用程序结束之前必须调用SparkContext.stop().。

以Scala语言开发应用程序为例,分别介绍下正确示例与错误示例。

正确示例:

//提交spark作业
val sc = new SparkContext(conf)

//具体的任务
...

//应用程序结束
sc.stop()

错误示例:

//提交spark作业
val sc = new SparkContext(conf)

//具体的任务
...

如果不添加SparkContext.stop,界面会显示失败。如图1,同样的任务,前一个程序是没有添加SparkContext.stop,后一个程序添加了SparkContext.stop()。

图1 添加SparkContext.stop()和不添加的区别

多线程安全登录方式

如果有多线程进行login的操作,当应用程序第一次登录成功后,所有线程再次登录时应该使用relogin的方式。

login的代码样例:

private Boolean login(Configuration conf){
boolean flag = false;
UserGroupInformation.setConfiguration(conf);

try {
UserGroupInformation.loginUserFromKeytab(conf.get(PRINCIPAL), conf.get(KEYTAB));
System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased());
flag = true;
} catch (IOException e) {
e.printStackTrace();
}
return flag;

}

relogin的代码样例:

public Boolean relogin(){
boolean flag = false;
try {

UserGroupInformation.getLoginUser().reloginFromKeytab();
System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased());
flag = true;
} catch (IOException e) {
e.printStackTrace();
}
return flag;
}

自建用户执行sparksql命令时,需赋予用户相应写权限

用户可通过系统提供的spark用户来进行sparksql操作,也可通过在MRS Manager界面新建用户来执行sparksql操作。

在通过自建用户执行sparksql相应写操作时,可通过为用户选择supergroup组,赋予用户SystemAdministrator角色等方式赋予用户相应写操作的权限;如果新建用户为hadoop组用户且没有赋予特定角色,在一些涉及到写入操作的场景下可能会因不具备相应操作权限引起异常。

分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

智能客服提问云社区提问