常用参数
概述
本节介绍Spark使用过程中的常用配置项。以特性为基础划分子章节,以便用户快速搜索到相应的配置项。如果用户使用MRS集群,本节介绍的参数大部分已经适配好,用户无需再进行配置。少数需要用户根据实际场景配置的参数,请参见快速配置参数。
配置Stage失败重试次数
Spark任务在遇到FetchFailedException时会触发Stage重试。为了防止Stage无限重试,对Stage重试次数进行限制。重试次数可以根据实际需要进行调整。
在Spark客户端的“spark-defaults.conf”文件中配置如下参数。
| 
        参数  | 
      
        说明  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.stage.maxConsecutiveAttempts  | 
      
        Stage失败重试最大次数。  | 
      
        4  | 
     
配置是否使用笛卡尔积功能
要启动使用笛卡尔积功能,需要在Spark的“spark-defaults.conf”配置文件中进行如下设置。
| 
        参数  | 
      
        说明  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.sql.crossJoin.enabled  | 
      
        是否允许隐性执行笛卡尔积。 
  | 
      
        true  | 
     
 
   - JDBC应用在服务端的“spark-defaults.conf”配置文件中设置该参数。
 - Spark客户端提交的任务在客户端配的“spark-defaults.conf”配置文件中设置该参数。
 
Spark长时间任务安全认证配置
安全模式下,使用Spark CLI(如spark shell、spark sql、spark submit)时,如果使用kinit命令进行安全认证,当执行长时间运行任务时,会因为认证过期导致任务失败。
在客户端的“spark-defaults.conf”配置文件中设置如下参数,配置完成后,重新执行Spark CLI即可。
 
   当参数值为“true”时,需要保证“spark-defaults.conf”和“hive-site.xml”中的Keytab和principal的值相同。
| 
        参数名称  | 
      
        含义  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.kerberos.principal  | 
      
        具有Spark操作权限的principal。请联系MRS集群管理员获取对应principal。  | 
      
        -  | 
     
| 
        spark.kerberos.keytab  | 
      
        具有Spark操作权限的Keytab文件名称和文件路径。请联系MRS集群管理员获取对应Keytab文件。  | 
      
        -  | 
     
| 
        spark.security.bigdata.loginOnce  | 
      
        Principal用户是否只登录一次。true为单次登录;false为多次登录。 单次登录与多次登录的区别在于:Spark社区使用多次Kerberos用户登录多次的方案,但容易出现TGT过期或者Token过期异常导致应用无法长时间运行。DataSight修改了Kerberos登录方式,只允许用户登录一次,可以有效的解决过期问题。限制在于,Hive相关的principal与keytab的配置项必须与Spark配置相同。 
         说明: 
         当参数值为true时,需要保证“spark-defaults.conf”和“hive-site.xml”中的Keytab和principal的值相同。  | 
      
        true  | 
     
Python Spark
Python Spark是Spark除了Scala、Java两种API之外的第三种编程语言。不同于Java和Scala都是在JVM平台上运行,Python Spark不仅会有JVM进程,还会有自身的Python进程。以下配置项只适用于Python Spark场景,而其他配置项也同样可以在Python Spark中生效。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.python.profile  | 
      
        在Python worker中开启profiling。通过sc.show_profiles()展示分析结果。或者在driver退出前展示分析结果。可以通过sc.dump_profiles(path) 将结果转储到磁盘中。如果一些分析结果已经手动展示,那么在Driver退出前,它们将不会再自动展示。 默认使用pyspark.profiler.BasicProfiler,可以在初始化SparkContext时传入指定的profiler来覆盖默认的profiler。  | 
      
        false  | 
     
| 
        spark.python.worker.memory  | 
      
        聚合过程中每个python worker进程所能使用的内存大小,其值格式同指定JVM内存一致,如512m,2g。如果进程在聚集期间所用的内存超过了该值,数据将会被写入磁盘。  | 
      
        512m  | 
     
| 
        spark.python.worker.reuse  | 
      
        是否重用python worker。如是,它将使用固定数量的Python workers,那么下一批提交的task将重用这些Python workers,而不是为每个task重新fork一个Python进程。 该功能在大型广播下非常有用,因为此时对下一批提交的task不需要将数据从JVM再一次传输至Python worker。  | 
      
        true  | 
     
Dynamic Allocation
动态资源调度是On Yarn模式特有的特性,并且必须开启Yarn External Shuffle才能使用这个功能。在使用Spark作为一个常驻的服务时候,动态资源调度将大大的提高资源的利用率。例如JDBCServer服务,大多数时间该进程并不接受JDBC请求,因此将这段空闲时间的资源释放出来,将极大的节约集群的资源。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.dynamicAllocation.enabled  | 
      
        是否使用动态资源调度,用于根据规模调整注册于该应用的executor的数量。注意目前仅在YARN模式下有效。 启用动态资源调度必须将spark.shuffle.service.enabled设置为true。以下配置也与此相关:spark.dynamicAllocation.minExecutors、spark.dynamicAllocation.maxExecutors和spark.dynamicAllocation.initialExecutors。  | 
      |
| 
        spark.dynamicAllocation.minExecutors  | 
      
        最小Executor个数。  | 
      
        0  | 
     
| 
        spark.dynamicAllocation.initialExecutors  | 
      
        初始Executor个数。  | 
      
        spark.dynamicAllocation.minExecutors  | 
     
| 
        spark.dynamicAllocation.maxExecutors  | 
      
        最大executor个数。  | 
      
        2048  | 
     
| 
        spark.dynamicAllocation.schedulerBacklogTimeout  | 
      
        调度第一次超时时间。单位为秒。  | 
      
        1s  | 
     
| 
        spark.dynamicAllocation.sustainedSchedulerBacklogTimeout  | 
      
        调度第二次及之后超时时间。  | 
      
        1s  | 
     
| 
        spark.dynamicAllocation.executorIdleTimeout  | 
      
        普通Executor空闲超时时间。单位为秒。  | 
      
        60  | 
     
| 
        spark.dynamicAllocation.cachedExecutorIdleTimeout  | 
      
        含有cached blocks的Executor空闲超时时间。  | 
      
       
  | 
     
Spark Streaming
Spark Streaming是在Spark批处理平台提供的流式数据的处理能力,以“mini-batch”的方式处理从外部输入的数据。
在Spark客户端的“spark-defaults.conf”文件中配置如下参数。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.streaming.receiver.writeAheadLog.enable  | 
      
        启用预写日志(WAL)功能。所有通过Receiver接收的输入数据将被保存至预写日志,预写日志可以保证Driver程序出错后数据可以恢复。  | 
      
        false  | 
     
| 
        spark.streaming.unpersist  | 
      
        由Spark Streaming产生和保存的RDDs自动从Spark的内存中强制移除。Spark Streaming接收的原始输入数据也将自动清除。设置为false时原始输入数据和存留的RDDs不会自动清除,因此在streaming应用外部依然可以访问,但是这会占用更多的Spark内存。  | 
      
        true  | 
     
Spark Streaming Kafka
Receiver是Spark Streaming一个重要的组成部分,它负责接收外部数据,并将数据封装为Block,提供给Streaming消费。最常见的数据源是Kafka,Spark Streaming对Kafka的集成也是最完善的,不仅有可靠性的保障,而且也支持从Kafka直接作为RDD输入。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.streaming.kafka.maxRatePerPartition  | 
      
        使用Kafka direct stream API时,从每个Kafka分区读取数据的最大速率(每秒记录数量)。  | 
      
        -  | 
     
| 
        spark.streaming.blockInterval  | 
      
        在被存入Spark之前Spark Streaming Receiver接收数据累积成数据块的间隔(毫秒)。推荐最小值为50毫秒。  | 
      
        200ms  | 
     
| 
        spark.streaming.receiver.maxRate  | 
      
        每个Receiver接收数据的最大速率(每秒记录数量)。配置设置为0或者负值将不会对速率设限。  | 
      
        -  | 
     
| 
        spark.streaming.receiver.writeAheadLog.enable  | 
      
        是否使用ReliableKafkaReceiver。该Receiver支持流式数据不丢失。  | 
      
        false  | 
     
Netty/NIO及Hash/Sort配置
Shuffle是大数据处理中最重要的一个性能点,网络是整个Shuffle过程的性能点。目前Spark支持两种Shuffle方式,一种是Hash,另外一种Sort。网络也有两种方式,Netty和NIO。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.shuffle.manager  | 
      
        处理数据的方式。有两种实现方式可用:sort和hash。sort shuffle对内存的使用率更高,是Spark 1.2及后续版本的默认选项。  | 
      
        SORT  | 
     
| 
        spark.shuffle.consolidateFiles  | 
      
        (仅hash方式)若要合并在shuffle过程中创建的中间文件,需要将该值设置为“true”。文件创建的少可以提高文件系统处理性能,降低风险。使用ext4或者xfs文件系统时,建议设置为“true”。由于文件系统限制,在ext3上该设置可能会降低8核以上机器的处理性能。  | 
      
        false  | 
     
| 
        spark.shuffle.sort.bypassMergeThreshold  | 
      
        该参数只适用于spark.shuffle.manager设置为sort时。在不做map端聚合并且reduce任务的partition数小于或等于该值时,避免对数据进行归并排序,防止系统处理不必要的排序引起性能下降。  | 
      
        200  | 
     
| 
        spark.shuffle.io.maxRetries  | 
      
        (仅Netty方式)如果设为非零值,由于IO相关的异常导致的fetch失败会自动重试。该重试逻辑有助于大型shuffle在发生长GC暂停或者网络闪断时保持稳定。  | 
      
        12  | 
     
| 
        spark.shuffle.io.numConnectionsPerPeer  | 
      
        (仅Netty方式)为了减少大型集群的连接创建,主机间的连接会被重新使用。对于拥有较多硬盘和少数主机的集群,此操作可能会导致并发性不足以占用所有磁盘,所以用户可以考虑增加此值。  | 
      
        1  | 
     
| 
        spark.shuffle.io.preferDirectBufs  | 
      
        (仅Netty方式)使用off-heap缓冲区减少shuffle和高速缓存块转移期间的垃圾回收。对于off-heap内存被严格限制的环境,用户可以将其关闭以强制所有来自Netty的申请使用堆内内存。  | 
      
        true  | 
     
| 
        spark.shuffle.io.retryWait  | 
      
        (仅Netty方式)等待fetch重试期间的时间(秒)。重试引起的最大延迟为maxRetries * retryWait,默认是15秒。  | 
      
        5  | 
     
普通Shuffle配置
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.shuffle.spill  | 
      
        若设为“true”,通过将数据溢出至磁盘来限制reduce任务期间内存的使用量。  | 
      
        true  | 
     
| 
        spark.shuffle.spill.compress  | 
      
        是否压缩shuffle期间溢出的数据。使用spark.io.compression.codec指定的算法进行数据压缩。  | 
      
        true  | 
     
| 
        spark.shuffle.file.buffer  | 
      
        每个shuffle文件输出流的内存缓冲区大小(单位:KB)。这些缓冲区可以减少创建中间shuffle文件流过程中产生的磁盘寻道和系统调用次数。也可以通过配置项spark.shuffle.file.buffer.kb设置。  | 
      
        32KB  | 
     
| 
        spark.shuffle.compress  | 
      
        是否压缩map任务输出文件。建议压缩。使用spark.io.compression.codec进行压缩。  | 
      
        true  | 
     
| 
        spark.reducer.maxSizeInFlight  | 
      
        从每个reduce任务同时fetch的map任务输出最大值(单位:MB)。由于每个输出要求创建一个缓冲区进行接收,这代表了每个reduce任务固定的内存开销,所以除非拥有大量内存,否则保持低值。也可以通过配置项spark.reducer.maxMbInFlight设置。  | 
      
        48MB  | 
     
Driver配置
Spark Driver可以理解为Spark提交应用的客户端,所有的代码解析工作都在这个进程中完成,因此该进程的参数尤其重要。下面将以如下顺序介绍Spark中进程的参数设置:
- JavaOptions:Java命令中“-D”后面的参数,可以由System.getProperty获取。
 - ClassPath:包括Java类和Native的Lib加载路径。
 - Java Memory and Cores:Java进程的内存和CPU使用量。
 - Spark Configuration:Spark内部参数,与Java进程无关。
 
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.driver.extraJavaOptions  | 
      
        传递至driver(驱动程序)的一系列额外JVM选项。例如,GC设置或其他日志记录。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。  | 
      
        参考快速配置参数  | 
     
| 
        spark.driver.extraClassPath  | 
      
        附加至driver的classpath的额外classpath条目。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。  | 
      
        参考快速配置参数  | 
     
| 
        spark.driver.userClassPathFirst  | 
      
        (试验性)当在驱动程序中加载类时,是否授权用户添加的jar优先于Spark自身的jar。这种特性可用于减缓Spark依赖和用户依赖之间的冲突。目前该特性仍处于试验阶段,仅用于Cluster模式中。  | 
      
        false  | 
     
| 
        spark.driver.extraLibraryPath  | 
      
        设置一个特殊的library path在启动驱动程序JVM时使用。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。  | 
      |
| 
        spark.driver.cores  | 
      
        驱动程序进程使用的核数。仅适用于Cluster模式。  | 
      
        1  | 
     
| 
        spark.driver.memory  | 
      
        驱动程序进程使用的内存数量,即SparkContext初始化的进程(例如:512M, 2G)。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。  | 
      
        4G  | 
     
| 
        spark.driver.maxResultSize  | 
      
        对每个Spark action操作(例如“collect”)的所有分区序列化结果的总量限制,至少1M,设置成0表示不限制。如果总量超过该限制,工作任务会中止。限制值设置过高可能会引起驱动程序的内存不足错误(取决于spark.driver.memory和JVM的对象内存开销)。设置合理的限制可以避免驱动程序出现内存不足的错误。  | 
      
        1G  | 
     
| 
        spark.driver.host  | 
      
        Driver监听的主机名或IP地址,用于Driver与Executor进行通信。  | 
      
        (local hostname)  | 
     
| 
        spark.driver.port  | 
      
        Driver监听的端口,用于Driver与Executor进行通信。  | 
      
        (random)  | 
     
ExecutorLaucher配置
ExecutorLauncher只有在Yarn-Client模式下才会存在的角色,Yarn-Client模式下,ExecutorLauncher和Driver不在同一个进程中,需要对ExecutorLauncher的参数进行特殊的配置。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.yarn.am.extraJavaOptions  | 
      
        在Client模式下传递至YARN Application Master的一系列额外JVM选项。在Cluster模式下使用spark.driver.extraJavaOptions。  | 
      
        参考快速配置参数  | 
     
| 
        spark.yarn.am.memory  | 
      
        针对Client模式下YARN Application Master使用的内存数量,与JVM内存设置字符串格式一致(例如:512m,2g)。在集群模式下,使用spark.driver.memory。  | 
      
        1G  | 
     
| 
        spark.yarn.am.memoryOverhead  | 
      
        和“spark.yarn.driver.memoryOverhead”一样,但只针对Client模式下的Application Master。  | 
      
        -  | 
     
| 
        spark.yarn.am.cores  | 
      
        针对Client模式下YARN Application Master使用的核数。在Cluster模式下,使用spark.driver.cores。  | 
      
        1  | 
     
Executor配置
Executor也是单独一个Java进程,但不像Driver和AM只有一个,Executor可以有多个进程,而目前Spark只支持相同的配置,即所有Executor的进程参数都必然是一样的。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.executor.extraJavaOptions  | 
      
        传递至Executor的额外JVM选项。例如,GC设置或其他日志记录。请注意不能通过此选项设置Spark属性或heap大小。Spark属性应该使用SparkConf对象或调用spark-submit脚本时指定的spark-defaults.conf文件来设置。Heap大小可以通过spark.executor.memory来设置。  | 
      
        参考快速配置参数  | 
     
| 
        spark.executor.extraClassPath  | 
      
        附加至Executor classpath的额外的classpath。这主要是为了向后兼容Spark的历史版本。用户一般不用设置此选项。  | 
      
        -  | 
     
| 
        spark.executor.extraLibraryPath  | 
      
        设置启动executor JVM时所使用的特殊的library path。  | 
      
        参考快速配置参数  | 
     
| 
        spark.executor.userClassPathFirst  | 
      
        (试验性)与spark.driver.userClassPathFirst相同的功能,但应用于Executor实例。  | 
      
        false  | 
     
| 
        spark.executor.memory  | 
      
        每个Executor进程使用的内存数量,与JVM内存设置字符串的格式相同(例如:512M,2G)。  | 
      
        4G  | 
     
| 
        spark.executorEnv.[EnvironmentVariableName]  | 
      
        添加由EnvironmentVariableName指定的环境变量至executor进程。用户可以指定多个来设置多个环境变量。  | 
      
        -  | 
     
| 
        spark.executor.logs.rolling.maxRetainedFiles  | 
      
        设置系统即将保留的最新滚动日志文件的数量。旧的日志文件将被删除。默认关闭。  | 
      
        -  | 
     
| 
        spark.executor.logs.rolling.size.maxBytes  | 
      
        设置滚动Executor日志的文件的最大值。默认关闭。数值以字节为单位设置。若要自动清除旧日志,请查看spark.executor.logs.rolling.maxRetainedFiles。  | 
      
        -  | 
     
| 
        spark.executor.logs.rolling.strategy  | 
      
        设置executor日志的滚动策略。默认滚动关闭。可以设置为“time”(基于时间的滚动)或“size”(基于大小的滚动)。当设置为“time”,使用spark.executor.logs.rolling.time.interval属性的值作为日志滚动的间隔。当设置为“size”,使用spark.executor.logs.rolling.size.maxBytes设置滚动的最大文件大小滚动。  | 
      
        -  | 
     
| 
        spark.executor.logs.rolling.time.interval  | 
      
        设置executor日志滚动的时间间隔。默认关闭。合法值为“daily”、“hourly”、“minutely”或任意秒。若要自动清除旧日志,请查看spark.executor.logs.rolling.maxRetainedFiles。  | 
      
        daily  | 
     
WebUI
WebUI展示了Spark应用运行的过程和状态。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.ui.killEnabled  | 
      
        允许停止Web UI中的stage和相应的job。 
         说明: 
         出于安全考虑,将此配置项的默认值设置成false,以避免用户发生误操作。如果需要开启此功能,则可以在spark-defaults.conf配置文件中将此配置项的值设为true。请谨慎操作。  | 
      
        true  | 
     
| 
        spark.ui.port  | 
      
        应用程序dashboard的端口,显示内存和工作量数据。  | 
      
       
       
  | 
     
| 
        spark.ui.retainedJobs  | 
      
        在垃圾回收之前Spark UI和状态API记住的job数。  | 
      
        1000  | 
     
| 
        spark.ui.retainedStages  | 
      
        在垃圾回收之前Spark UI和状态API记住的stage数。  | 
      
        1000  | 
     
HistoryServer
HistoryServer读取文件系统中的EventLog文件,展示已经运行完成的Spark应用在运行时的状态信息。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.history.fs.logDirectory  | 
      
        History server的日志目录  | 
      
        -  | 
     
| 
        spark.history.ui.port  | 
      
        JobHistory侦听连接的端口。  | 
      
        18080  | 
     
| 
        spark.history.fs.updateInterval  | 
      
        History server所显示信息的更新周期,单位为秒。每次更新检查持久存储中针对事件日志进行的更改。  | 
      
        10s  | 
     
| 
        spark.history.fs.update.interval.seconds  | 
      
        每个事件日志更新检查的间隔。与spark.history.fs.updateInterval功能相同,推荐使用spark.history.fs.updateInterval。  | 
      
        10s  | 
     
| 
        spark.history.updateInterval  | 
      
        该配置项与spark.history.fs.update.interval.seconds和spark.history.fs.updateInterval功能相同,推荐使用spark.history.fs.updateInterval。  | 
      
        10s  | 
     
HistoryServer UI超时和最大访问数
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.session.maxAge  | 
      
        设置会话的超时时间,单位秒。此参数只适用于安全模式。普通模式下,无法设置此参数。  | 
      
        600  | 
     
| 
        spark.connection.maxRequest  | 
      
        设置客户端访问Jobhistory的最大并发数量。  | 
      
        5000  | 
     
EventLog
Spark应用在运行过程中,实时将运行状态以JSON格式写入文件系统,用于HistoryServer服务读取并重现应用运行时状态。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.eventLog.enabled  | 
      
        是否记录Spark事件,用于应用程序在完成后重构webUI。  | 
      
        true  | 
     
| 
        spark.eventLog.dir  | 
      
        如果spark.eventLog.enabled为true,记录Spark事件的目录。在此目录下,Spark为每个应用程序创建文件,并将应用程序的事件记录到文件中。用户也可设置为统一的与HDFS目录相似的地址,这样History server就可以读取历史文件。  | 
      
        hdfs://hacluster/spark2xJobHistory2x  | 
     
| 
        spark.eventLog.compress  | 
      
        spark.eventLog.enabled为true时,是否压缩记录的事件。  | 
      
        false  | 
     
EventLog的周期清理
JobHistory上的Event log是随每次任务的提交而累积的,任务提交的次数多了之后会造成太多文件的存放。Spark提供了周期清理Evnet log的功能,用户可以通过配置开关和相应的清理周期参数来进行控制。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.history.fs.cleaner.enabled  | 
      
        是否打开清理功能。  | 
      
        true  | 
     
| 
        spark.history.fs.cleaner.interval  | 
      
        清理功能的检查周期。  | 
      
        1d  | 
     
| 
        spark.history.fs.cleaner.maxAge  | 
      
        日志的最长保留时间。  | 
      
        4d  | 
     
Kryo
Kryo是一个非常高效的Java序列化框架,Spark中也默认集成了该框架。几乎所有的Spark性能调优都离不开将Spark默认的序列化器转化为Kryo序列化器的过程。目前Kryo序列化只支持Spark数据层面的序列化,还不支持闭包的序列化。设置Kryo序列元,需要将配置项“spark.serializer”设置为“org.apache.spark.serializer.KryoSerializer”,同时也搭配设置以下的配置项,优化Kryo序列化的性能。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.kryo.classesToRegister  | 
      
        使用Kryo序列化时,需要注册到Kryo的类名,多个类之间用逗号分隔。  | 
      
        -  | 
     
| 
        spark.kryo.referenceTracking  | 
      
        当使用Kryo序列化数据时,是否跟踪对同一个对象的引用情况。适用于对象图有循环引用或同一对象有多个副本的情况。否则可以设置为关闭以提升性能。  | 
      
        true  | 
     
| 
        spark.kryo.registrationRequired  | 
      
        是否需要使用Kryo来注册对象。当设为“true”时,如果序列化一个未使用Kryo注册的对象则会抛出异常。当设为“false”(默认值)时,Kryo会将未注册的类名称一同写到序列化对象中。该操作会带来大量性能开销,所以在用户还没有从注册队列中删除相应的类时应该开启该选项。  | 
      
        false  | 
     
| 
        spark.kryo.registrator  | 
      
        如果使用Kryo序列化,使用Kryo将该类注册至定制类。如果需要以定制方式注册类,例如指定一个自定义字段序列化器,可使用该属性。否则spark.kryo.classesToRegister会更简单。它应该设置为一个扩展KryoRegistrator的类。  | 
      
        -  | 
     
| 
        spark.kryoserializer.buffer.max  | 
      
        Kryo序列化缓冲区允许的最大值,单位为兆字节。这个值必须大于尝试序列化的对象。当在Kryo中遇到“buffer limit exceeded”异常时可以适当增大该值。也可以通过配置项spark.kryoserializer.buffer.max配置。  | 
      
        64MB  | 
     
| 
        spark.kryoserializer.buffer  | 
      
        Kryo序列化缓冲区的初始值,单位为兆字节。每个worker的每个核心都会有一个缓冲区。如果有需要,缓冲区会增大到spark.kryoserializer.buffer.max设置的值。也可以通过配置项spark.kryoserializer.buffer配置。  | 
      
        64KB  | 
     
Broadcast
Broadcast用于Spark进程间数据块的传输。Spark中无论Jar包、文件还是闭包以及返回的结果都会使用Broadcast。目前的Broadcast支持两种方式,Torrent与HTTP。前者将会把数据切成小片,分布到集群中,有需要时从远程获取;后者将文件存入到本地磁盘,有需要时通过HTTP方式将整个文件传输到远端。前者稳定性优于后者,因此Torrent为默认的Broadcast方式。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.broadcast.factory  | 
      
        使用的广播方式。  | 
      
        org.apache.spark.broadcast.TorrentBroadcastFactory  | 
     
| 
        spark.broadcast.blockSize  | 
      
        TorrentBroadcastFactory的块大小。该值过大会降低广播时的并行度(速度变慢),过小可能会影响BlockManager的性能。  | 
      
        4096  | 
     
| 
        spark.broadcast.compress  | 
      
        在发送广播变量之前是否压缩。建议压缩。  | 
      
        true  | 
     
Storage
内存计算是Spark的最大亮点,Spark的Storage主要管理内存资源。Storage中主要存储RDD在Cache过程中产生的数据块。JVM中堆内存是整体的,因此在Spark的Storage管理中,“Storage Memory Size”变成了一个非常重要的概念。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.storage.memoryMapThreshold  | 
      
        超过该块大小的Block,Spark会对该磁盘文件进行内存映射。这可以防止Spark在内存映射时映射过小的块。一般情况下,对接近或低于操作系统的页大小的块进行内存映射会有高开销。  | 
      
        2m  | 
     
PORT
随机端口范围
所有随机端口必须在一定端口范围内。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.random.port.min  | 
      
        设置随机端口的最小值。  | 
      
        22600  | 
     
| 
        spark.random.port.max  | 
      
        设置随机端口的最大值。  | 
      
        22899  | 
     
TIMEOUT
Spark默认配置能很好的处理中等数据规模的计算任务,但一旦数据量过大,会经常出现超时导致任务失败的场景。在大数据量场景下,需调大Spark中的超时参数。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.files.fetchTimeout  | 
      
        获取通过驱动程序的SparkContext.addFile()添加的文件时的通信超时(秒)。  | 
      
        60s  | 
     
| 
        spark.network.timeout  | 
      
        所有网络交互的默认超时(秒)。如未配置,则使用该配置代替spark.core.connection.ack.wait.timeout, spark.akka.timeout, spark.storage.blockManagerSlaveTimeoutMs或spark.shuffle.io.connectionTimeout。  | 
      
        360s  | 
     
| 
        spark.core.connection.ack.wait.timeout  | 
      
        连接时应答的超时时间(单位:秒)。为了避免由于GC带来的长时间等待,可以设置更大的值。  | 
      
        60  | 
     
加密
Spark支持Akka和HTTP(广播和文件服务器)协议的SSL,但WebUI和块转移服务仍不支持SSL。
SSL必须在每个节点上配置,并使用特殊协议为通信涉及到的每个组件进行配置。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.ssl.enabled  | 
      
        是否在所有被支持协议上开启SSL连接。 与spark.ssl.xxx类似的所有SSL设置指示了所有被支持协议的全局配置。为了覆盖特殊协议的全局配置,在协议指定的命名空间中必须重写属性。 使用“spark.ssl.YYY.XXX”设置覆盖由YYY指示的特殊协议的全局配置。目前YYY可以是基于Akka连接的akka或广播与文件服务器的fs。  | 
      
        false  | 
     
| 
        spark.ssl.enabledAlgorithms  | 
      
        以逗号分隔的密码列表。指定的密码必须被JVM支持。  | 
      
        -  | 
     
| 
        spark.ssl.keyPassword  | 
      
        key-store的私人密钥密码。  | 
      
        -  | 
     
| 
        spark.ssl.keyStore  | 
      
        key-store文件的路径。该路径可以绝对或相对于开启组件的目录。  | 
      
        -  | 
     
| 
        spark.ssl.keyStorePassword  | 
      
        key-store的密码。  | 
      
        -  | 
     
| 
        spark.ssl.protocol  | 
      
        协议名。该协议必须被JVM支持。本页所有协议的参考表。  | 
      
        -  | 
     
| 
        spark.ssl.trustStore  | 
      
        trust-store文件的路径。该路径可以绝对或相对于开启组件的目录。  | 
      
        -  | 
     
| 
        spark.ssl.trustStorePassword  | 
      
        trust-store的密码。  | 
      
        -  | 
     
安全性
Spark目前支持通过共享密钥认证。可以通过spark.authenticate配置参数配置认证。该参数控制Spark通信协议是否使用共享密钥执行认证。该认证是确保双边都有相同的共享密钥并被允许通信的基本握手。如果共享密钥不同,通信将不被允许。共享密钥通过如下方式创建:
- 对于YARN部署的Spark,将spark.authenticate配置为真会自动处理生成和分发共享密钥。每个应用程序会独占一个共享密钥。
 - 对于其他类型部署的Spark,应该在每个节点上配置Spark参数spark.authenticate.secret。所有Master/Workers和应用程序都将使用该密钥。
 
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.acls.enable  | 
      
        是否开启Spark acls。如果开启,它将检查用户是否有访问和修改job的权限。请注意这要求用户可以被识别。如果用户被识别为无效,检查将不被执行。UI可以使用过滤器认证和设置用户。  | 
      
        true  | 
     
| 
        spark.admin.acls  | 
      
        逗号分隔的有权限访问和修改所有Spark job的用户/管理员列表。如果在共享集群上运行并且工作时有MRS集群管理员或开发人员帮助调试,可以使用该列表。  | 
      
        admin  | 
     
| 
        spark.authenticate  | 
      
        是否Spark认证其内部连接。如果不是运行在YARN上,请参见spark.authenticate.secret。  | 
      
        true  | 
     
| 
        spark.authenticate.secret  | 
      
        设置Spark各组件之间验证的密钥。如果不是运行在YARN上且认证未开启,需要设置该项。  | 
      
        -  | 
     
| 
        spark.modify.acls  | 
      
        逗号分隔的有权限修改Spark job的用户列表。默认情况下只有开启Spark job的用户才有修改列表的权限(例如删除列表)。  | 
      
        -  | 
     
| 
        spark.ui.view.acls  | 
      
        逗号分隔的有权限访问Spark web ui的用户列表。默认情况下只有开启Spark job的用户才有访问权限。  | 
      
        -  | 
     
开启Spark进程间的认证机制
目前Spark进程间支持共享密钥方式的认证机制,通过配置spark.authenticate可以控制Spark在通信过程中是否做认证。这种认证方式只是通过简单的握手来确定通信双方享有共同的密钥。
在Spark客户端的“spark-defaults.conf”文件中配置如下参数。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.authenticate  | 
      
        在Spark on YARN模式下,将该参数配置成true即可。密钥的生成和分发过程是自动完成的,并且每个应用独占一个密钥。  | 
      
        true  | 
     
Compression
数据压缩是一个以CPU换内存的优化策略,因此当Spark内存严重不足的时候(由于内存计算的特质,这种情况非常常见),使用压缩可以大幅提高性能。目前Spark支持三种压缩算法:snappy,lz4,lzf。Snappy为默认压缩算法,并且调用native方法进行压缩与解压缩,在Yarn模式下需要注意堆外内存对Container进程的影响。
| 
        参数  | 
      
        描述  | 
      
        默认值  | 
     
|---|---|---|
| 
        spark.io.compression.codec  | 
      
        用于压缩内部数据的codec,例如RDD分区、广播变量和shuffle输出。默认情况下,Spark支持三种压缩算法:lz4,lzf和snappy。可以使用完全合格的类名称指定算法,例如org.apache.spark.io.LZ4CompressionCodec、org.apache.spark.io.LZFCompressionCodec及org.apache.spark.io.SnappyCompressionCodec。  | 
      
        lz4  | 
     
| 
        spark.io.compression.lz4.block.size  | 
      
        当使用LZ4压缩算法时LZ4压缩中使用的块大小(字节)。当使用LZ4时降低块大小同样也会降低shuffle内存使用。  | 
      
        32768  | 
     
| 
        spark.io.compression.snappy.block.size  | 
      
        当使用Snappy压缩算法时Snappy压缩中使用的块大小(字节)。当使用Snappy时降低块大小同样也会降低shuffle内存使用。  | 
      
        32768  | 
     
| 
        spark.shuffle.compress  | 
      
        是否压缩map任务输出文件。建议压缩。使用spark.io.compression.codec进行压缩。  | 
      
        true  | 
     
| 
        spark.shuffle.spill.compress  | 
      
        是否压缩在shuffle期间溢出的数据。使用spark.io.compression.codec进行压缩。  | 
      
        true  | 
     
| 
        spark.eventLog.compress  | 
      
        设置当spark.eventLog.enabled设置为true时是否压缩记录的事件。  | 
      
        false  | 
     
| 
        spark.broadcast.compress  | 
      
        在发送之前是否压缩广播变量。建议压缩。  | 
      
        true  | 
     
| 
        spark.rdd.compress  | 
      
        是否压缩序列化的RDD分区(例如StorageLevel.MEMORY_ONLY_SER的分区)。牺牲部分额外CPU的时间可以节省大量空间。  | 
      
        false  | 
     
在资源不足的情况下,降低客户端运行异常概率
在资源不足的情况下,Application Master会因等待资源出现超时,导致任务被删除。调整如下参数,降低客户端应用运行异常概率。
在客户端的“spark-defaults.conf”配置文件中调整如下参数。
| 
       参数  | 
     
       说明  | 
     
       默认值  | 
    
|---|---|---|
| 
       spark.yarn.applicationMaster.waitTries  | 
     
       设置Application Master等待Spark master的次数,同时也是等待SparkContext初始化的次数。增大该参数值,可以防止AM任务被删除,降低客户端应用运行异常的概率。  | 
     
       10  | 
    
| 
       spark.yarn.am.memory  | 
     
       调整AM的内存。增大该参数值,可以防止AM因内存不足而被RM删除任务,降低客户端应用运行异常的概率。  | 
     
       1G  |