Updated on 2022-12-14 GMT+08:00

Common Parameters

Overview

This section describes common configuration items used in Spark. Subsections are divided by feature so that you can quickly find required configuration items. If you use MRS clusters, most parameters described in this section have been adapted and you do not need to configure them again. For details about the parameters that need to be configured based on the site requirements, see Configuring Parameters Rapidly.

Configuring the Number of Stage Retries

When FetchFailedException occurs in a Spark task, a stage retry is triggered. To prevent infinite stage retries, the number of stage retries is limited. The number of retry times can be adjusted based on the site requirements.

Configure the following parameters in the spark-defaults.conf file on the Spark client.

Table 1 Parameter description

Parameter

Description

Default Value

spark.stage.maxConsecutiveAttempts

Indicates the maximum number of stage retries.

4

Configuring Whether to Use Cartesian Product

To enable the Cartesian product function, configure the following parameter in the spark-defaults.conf configuration file of Spark.

Table 2 Cartesian product parameters

Parameter

Description

Default Value

spark.sql.crossJoin.enabled

Indicates whether to allow implicit Cartesian product execution.

  • true: Implicit Cartesian product execution is allowed.
  • false: Implicit Cartesian product execution is not allowed. In this case, only CROSS JOIN can be explicitly included in the query.

true

  • For JDBC applications, configure this parameter in the spark-defaults.conf configuration file of the server.
  • For tasks submitted by the Spark client, configure this parameter in the spark-defaults.conf configuration file of the client.

Configuring Security Authentication for Long-Time Spark Tasks

In security mode, if the kinit command is used for security authentication when the Spark CLI (such as spark-shell, spark-sql, or spark-submit) is used, the task fails due to authentication expiration when the task is running for a long time.

Set the following parameters in the spark-defaults.conf configuration file on the client. After the configuration is complete, run the Spark CLI again.

If this parameter is set to true, ensure that the values of keytab and principal in spark-defaults.conf and hive-site.xml are the same.

Table 3 Parameter description

Parameter

Description

Default Value

spark.kerberos.principal

Indicates the principal user who has the Spark operation permission. Contact the MRS cluster administrator to obtain the principal user.

-

spark.kerberos.keytab

Indicates the name and path of the keytab file used to configure Spark operation permissions. Contact the MRS cluster administrator to obtain the Keytab file.

-

spark.security.bigdata.loginOnce

Indicates whether the principal user logs in to the system only once. true: single login; false: multiple logins.

The difference between a single login and multiple logins is as follows: The Spark community uses the Kerberos user to log in to the system for multiple times. However, the TGT or token may expire, causing the application to fail to run for a long time. The Kerberos login mode of DataSight is modified to allow users to log in only once, which effectively resolves the expiration problem. The restrictions are as follows: The principal and keytab configuration items of Hive must be the same as those of Spark.

NOTE:

If this parameter is set to true, ensure that the values of keytab and principal in spark-defaults.conf and hive-site.xml are the same.

true

Python Spark

Python Spark is the third programming language of Spark except Scala and Java. Different from Java and Scala that run on the JVM platform, Python Spark has its own Python process as well as the JVM process. The following configuration items apply only to Python Spark scenarios. However, other configuration items can also take effect in Python Spark scenarios.

Table 4 Parameter description

Parameter

Description

Default Value

spark.python.profile

Indicates whether to enable profiling on the Python worker. Use sc.show_profiles() to display the analysis results or display the analysis results before the Driver exits. You can use sc.dump_profiles(path) to dump the results to a disk. If some analysis results have been manually displayed, they will not be automatically displayed before the driver exits.

By default, pyspark.profiler.BasicProfiler is used. You can transfer the specified profiler during SparkContext initialization to overwrite the default profiler.

false

spark.python.worker.memory

Indicates the memory size that can be used by each Python worker process during aggregation. The value format is the same as that of the specified JVM memory, for example, 512 MB and 2 GB. If the memory used by a process during aggregation exceeds the value of this parameter, data will be written to disks.

512m

spark.python.worker.reuse

Indicates whether to reuse Python workers. If the reuse function is enabled, a fixed number of Python workers will be reused by the next batch of submitted tasks instead of forking a Python process for each task. This function is useful in large-scale broadcasting because the data does not need to be transferred from the JVM to the Python workers again for the next batch of submitted tasks.

true

Dynamic Allocation

Dynamic resource scheduling is a unique feature of the On Yarn mode. This function can be used only after Yarn External Shuffle is enabled. When Spark is used as a resident service, dynamic resource scheduling greatly improves resource utilization. For example, the JDBCServer process does not accept JDBC requests in most of the time. Therefore, releasing resources in this period greatly reduces the waste of cluster resources.

Table 5 Parameter description

Parameter

Description

Default Value

spark.dynamicAllocation.enabled

Indicates whether to use dynamic resource scheduling, which is used to adjust the number of executors registered with the application according to scale. Currently, this parameter is valid only in Yarn mode.

To enable dynamic resource scheduling, set spark.shuffle.service.enabled to true. Related parameters are as follows: spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors, and spark.dynamicAllocation.initialExecutors.

  • JDBCServer2x:

    true

  • SparkResource2x:

    false

spark.dynamicAllocation.minExecutors

Indicates the minimum number of executors.

0

spark.dynamicAllocation.initialExecutors

Indicates the number of initial executors.

spark.dynamicAllocation.minExecutors

spark.dynamicAllocation.maxExecutors

Indicates the maximum number of executors.

2048

spark.dynamicAllocation.schedulerBacklogTimeout

Indicates the first timeout period for scheduling. The unit is second.

1s

spark.dynamicAllocation.sustainedSchedulerBacklogTimeout

Indicates the second and later timeout interval for scheduling.

1s

spark.dynamicAllocation.executorIdleTimeout

Indicates the idle timeout interval for common executors. The unit is second.

60

spark.dynamicAllocation.cachedExecutorIdleTimeout

Indicates the idle timeout interval for executors with cached blocks.

  • JDBCServer2x: 2147483647s
  • IndexServer2x: 2147483647s
  • SparkResource2x: 120

Spark Streaming

Spark Streaming is a streaming data processing function provided by the Spark batch processing platform. It processes data input from external systems in mini-batch mode.

Configure the following parameters in the spark-defaults.conf file on the Spark client.

Table 6 Parameter description

Parameter

Description

Default Value

spark.streaming.receiver.writeAheadLog.enable

Indicates whether to enable the write-ahead log (WAL) function. After this function is enabled, all input data received by the receiver is saved in the WAL. WAL ensures that data can be restored if the driver program becomes faulty.

false

spark.streaming.unpersist

Determines whether to automatically remove RDDs generated and saved by Spark Streaming from the Spark memory. If this function is enabled, original data received by Spark Streaming is also automatically cleared. If this function is disabled, original data and RDDs cannot be automatically cleared. External applications can access the data in Streaming. This, however, occupies more Spark memory resources.

true

Spark Streaming Kafka

The receiver is an important component of Spark Streaming. It receives external data, encapsulates the data into blocks, and provides the blocks for Streaming to consume. The most common data source is Kafka. Spark Streaming integrates Kafka to ensure reliability and can directly use Kafka as the RDD input.

Table 7 Parameter description

Parameter

Description

Default Value

spark.streaming.kafka.maxRatePerPartition

Indicates the maximum rate (number of records per second) for reading data from each Kafka partition if the Kafka direct stream API is used.

-

spark.streaming.blockInterval

Indicates the interval (ms) for accumulating data received by a Spark Streaming receiver into a data block before the data is stored in Spark. A minimum value of 50 ms is recommended.

200ms

spark.streaming.receiver.maxRate

Indicates the maximum rate (number of records per second) for each receiver to receive data. The value 0 or a negative value indicates no limit to the rate.

-

spark.streaming.receiver.writeAheadLog.enable

Indicates whether to use ReliableKafkaReceiver. This receiver ensures the integrity of streaming data.

false

Netty/NIO and Hash/Sort Configuration

Shuffle is critical for big data processing, and the network is critical for the entire shuffle process. Currently, Spark supports two shuffle modes: hash and sort. There are two network modes: Netty and NIO.

Table 8 Parameter description

Parameter

Description

Default Value

spark.shuffle.manager

Indicates the data processing mode. There are two implementation modes: sort and hash. The sort shuffle has a higher memory utilization. It is the default option in Spark 1.2 and later versions.

SORT

spark.shuffle.consolidateFiles

(Only in hash mode) To merge intermediate files created during shuffle, set this parameter to true. Decreasing the number of files to be created can improve the processing performance of the file system and reduce risks. If the ext4 or xfs file system is used, you are advised to set this parameter to true. Due to file system restrictions, this setting on ext3 may reduce the processing performance of a server with more than eight cores.

false

spark.shuffle.sort.bypassMergeThreshold

This parameter is valid only when spark.shuffle.manager is set to sort. When Map aggregation is not performed and the number of partitions for Reduce tasks is less than or equal to the value of this parameter, do not merge and sort data to prevent performance deterioration caused by unnecessary sorting.

200

spark.shuffle.io.maxRetries

(Only in Netty mode) If this parameter is set to a non-zero value, fetch failures caused by I/O-related exceptions will be automatically retried. This retry logic helps the large shuffle keep stable when long GC pauses or intermittent network disconnections occur.

12

spark.shuffle.io.numConnectionsPerPeer

(Only in Netty mode) Connections between hosts are reused to reduce the number of connections between large clusters. For a cluster with many disks but a few hosts, this function may make concurrent requests unable to occupy all disks. Therefore, you can increase the value of this parameter.

1

spark.shuffle.io.preferDirectBufs

(Only in Netty mode) The off-heap buffer is used to reduce GC during shuffle and cache block transfer. In an environment where off-heap memory is strictly limited, you can disable it to force all applications from Netty to use heap memory.

true

spark.shuffle.io.retryWait

(Only in Netty mode) Specifies the duration for waiting for fetch retry, in seconds. The maximum delay caused by retry is maxRetries x retryWait. The default value is 15 seconds.

5

Common Shuffle Configuration

Table 9 Parameter description

Parameter

Description

Default Value

spark.shuffle.spill

If this parameter is set to true, data is overflowed to the disk to limit the memory usage during a Reduce task.

true

spark.shuffle.spill.compress

Indicates whether to compress the data overflowed during shuffle. The algorithm specified by spark.io.compression.codec is used for data compression.

true

spark.shuffle.file.buffer

Specifies the size of the memory buffer for storing output streams of each shuffle file, in KB. These buffers can reduce the number of disk seek and system calls during the creation of intermediate shuffle file streams. You can also set this parameter by setting spark.shuffle.file.buffer.kb.

32KB

spark.shuffle.compress

Indicates whether to compress the output files of a Map task. You are advised to compress the broadcast variables. using spark.io.compression.codec.

true

spark.reducer.maxSizeInFlight

Specifies the maximum output size of the Map task that fetches data from each Reduce task, in MB. Each output requires a buffer, which is the fixed memory overhead of each Reduce task. Therefore, keep the value small unless there is a large amount of memory. You can also set this parameter by setting spark.reducer.maxMbInFlight.

48MB

Driver Configuration

Spark driver can be considered as the client of Spark applications. All code parsing is completed in this process. Therefore, the parameters of this process are especially important. The following describes how to configure parameters for Spark driver.

  • JavaOptions: parameter following -D in the Java command, which can be obtained by System.getProperty
  • ClassPath: path for loading the Java classes and Native library
  • Java Memory and Cores: memory and CPU usage of the Java process
  • Spark Configuration: Spark internal parameter, which is irrelevant to the Java process
Table 10 Parameter description

Parameter

Description

Default Value

spark.driver.extraJavaOptions

Indicates a series of extra JVM options passed to the driver, for example, GC setting and logging.

Note: In client mode, this configuration cannot be set directly in the application using SparkConf because the driver JVM has been started. You can use --driver-java-options or the default property file to set the parameter.

For details, see Configuring Parameters Rapidly.

spark.driver.extraClassPath

Indicates the extra class path entries attached to the class path of the driver.

Note: In client mode, this configuration cannot be set directly in the application using SparkConf because the driver JVM has been started. You can use --driver-java-options or the default property file to set the parameter.

For details, see Configuring Parameters Rapidly.

spark.driver.userClassPathFirst

(Trial) Indicates whether to allow JAR files added by users to take precedence over Spark JAR files when classes are loaded in the driver. This feature can be used to mitigate conflicts between Spark dependencies and user dependencies. This feature is in the trial phase and is used only in cluster mode.

false

spark.driver.extraLibraryPath

Sets a special library path for starting the driver JVM.

Note: In client mode, this configuration cannot be set directly in the application using SparkConf because the driver JVM has been started. You can use --driver-java-options or the default property file to set the parameter.

  • JDBCServer2x:

    ${SPARK_INSTALL_HOME}/spark/native

  • SparkResource2x:

    ${DATA_NODE_INSTALL_HOME}/hadoop/lib/native

spark.driver.cores

Specifies the number of cores used by the driver process. This parameter is available only in cluster mode.

1

spark.driver.memory

Indicates the memory used by the driver process, that is, the memory used by the SparkContext initialization process (for example, 512 MB and 2 GB).

Note: In client mode, this configuration cannot be set directly in the application using SparkConf because the driver JVM has been started. You can use --driver-java-options or the default property file to set the parameter.

4G

spark.driver.maxResultSize

Indicates the total size of serialization results of all partitions for each Spark action operation (for example, collect). The value must be at least 1 MB. If this parameter is set to 0, the size is not limited. If the total amount exceeds this limit, the task will be aborted. If the value is too large, the memory of the driver may be insufficient (depending on the object memory overhead of spark.driver.memory and JVM). Set a proper limit to ensure sufficient memory for the driver.

1G

spark.driver.host

Specifies the host name or IP address for the driver to listen on, which is used for the driver to communicate with the executor.

(local hostname)

spark.driver.port

Specifies the port for the driver to listen on, which is used for the driver to communicate with the executor.

(random)

ExecutorLauncher Configuration

ExecutorLauncher exists only in Yarn-client mode. In Yarn-client mode, ExecutorLauncher and the driver are not in the same process. Therefore, you need to configure parameters for ExecutorLauncher.

Table 11 Parameter description

Parameter

Description

Default Value

spark.yarn.am.extraJavaOptions

Indicates a string of extra JVM options to pass to the YARN ApplicationMaster in client mode. Use spark.driver.extraJavaOptions in cluster mode.

For details, see Configuring Parameters Rapidly.

spark.yarn.am.memory

Indicates the amount of memory to use for the YARN ApplicationMaster in client mode, in the same format as JVM memory strings (for example, 512 MB or 2 GB). In cluster mode, use spark.driver.memory instead.

1G

spark.yarn.am.memoryOverhead

This parameter is the same as spark.yarn.driver.memoryOverhead. However, this parameter applies only to ApplicationMaster in client mode.

-

spark.yarn.am.cores

Indicates the number of cores to use for the YARN ApplicationMaster in client mode. Use spark.driver.cores in cluster mode.

1

Executor Configuration

An executor is a Java process. However, unlike the driver and ApplicationMaster, an executor can have multiple processes. Spark supports only same configurations. That is, the process parameters of all executors must be the same.

Table 12 Parameter description

Parameter

Description

Default Value

spark.executor.extraJavaOptions

Indicates extra JVM option passed to the executor, for example, GC setting and logging. Do not set Spark attributes or heap size using this option. Instead, set Spark attributes using the SparkConf object or the spark-defaults.conf file specified when the spark-submit script is called. Set heap size using spark.executor.memory.

For details, see Configuring Parameters Rapidly.

spark.executor.extraClassPath

Indicates the extra classpath attached to the executor classpath. This parameter ensures compatibility with historical versions of Spark. Generally, you do not need to set this parameter.

-

spark.executor.extraLibraryPath

Sets the special library path used when the executor JVM is started.

For details, see Configuring Parameters Rapidly.

spark.executor.userClassPathFirst

(Trial) Same function as spark.driver.userClassPathFirst. However, this parameter applies to executor instances.

false

spark.executor.memory

Indicates the memory size used by each executor process. Its character sting is in the same format as the JVM memory (example: 512 MB or 2 GB).

4G

spark.executorEnv.[EnvironmentVariableName]

Adds the environment variable specified by EnvironmentVariableName to the executor process. You can specify multiple environment variables.

-

spark.executor.logs.rolling.maxRetainedFiles

Sets the number of latest log files to be retained by the system during rolling. The old log files are deleted. This function is disabled by default.

-

spark.executor.logs.rolling.size.maxBytes

Sets the maximum size of the executor log file for rolling. This function is disabled by default. The value is in bytes. To automatically clear old logs, see spark.executor.logs.rolling.maxRetainedFiles.

-

spark.executor.logs.rolling.strategy

Sets the executor log rolling policy. Rolling is disabled by default. The value can be time (time-based rolling) or size (size-based rolling). If this parameter is set to time, the value of the spark.executor.logs.rolling.time.interval attribute is used as the log rolling interval. If this parameter is set to size, spark.executor.logs.rolling.size.maxBytes is used to set the maximum size of the file for rolling.

-

spark.executor.logs.rolling.time.interval

Sets the time interval for executor log rolling. This function is disabled by default. The value can be daily, hourly, minutely, or any number of seconds. To automatically clear old logs, see spark.executor.logs.rolling.maxRetainedFiles.

daily

WebUI

The Web UI displays the running process and status of the Spark application.

Table 13 Parameter description

Parameter

Description

Default Value

spark.ui.killEnabled

Allows stages and jobs to be stopped on the web UI.

NOTE:

For security purposes, the default value of this parameter is set to false to prevent misoperations. To enable this function, set this parameter to true in the spark-defaults.conf configuration file. Exercise caution when performing this operation.

true

spark.ui.port

Specifies the port for your application's dashboard, which displays memory and workload data.

  • JDBCServer2x: 4040
  • SparkResource2x: 0
  • IndexServer2x: 22901

spark.ui.retainedJobs

Specifies the number of jobs recorded by the Spark UI and status API before GC.

1000

spark.ui.retainedStages

Specifies the number of stages recorded by the Spark UI and status API before GC.

1000

HistoryServer

A History Server reads the EventLog file in the file system and displays the running status of the Spark application.

Table 14 Parameter description

Parameter

Description

Default Value

spark.history.fs.logDirectory

Specifies the log directory of a History Server.

-

spark.history.ui.port

Specifies the port for JobHistory listening to connection.

18080

spark.history.fs.updateInterval

Specifies the update interval of the information displayed on a History Server, in seconds. Each update checks for changes made to the event logs in the persistent store.

10s

spark.history.fs.update.interval.seconds

Specifies the interval for checking the update of each event log. This parameter has the same function as spark.history.fs.updateInterval. spark.history.fs.updateInterval is recommended.

10s

spark.history.updateInterval

This parameter has the same function as spark.history.fs.update.interval.seconds and spark.history.fs.updateInterval. spark.history.fs.updateInterval is recommended.

10s

History Server UI Timeout and Maximum Number of Access Times

Table 15 Parameter description

Parameter

Description

Default Value

spark.session.maxAge

Specifies the session timeout interval, in seconds. This parameter applies only to the security mode. This parameter cannot be set in normal mode.

600

spark.connection.maxRequest

Specifies the maximum number of concurrent client access requests to JobHistory.

5000

EventLog

During the running of Spark applications, the running status is written into the file system in JSON format in real time for the History Server service to read and reproduce the application running status.

Table 16 Parameter description

Parameter

Description

Default Value

spark.eventLog.enabled

Indicates whether to log Spark events, which are used to reconstruct the web UI after the application execution is complete.

true

spark.eventLog.dir

Indicates the directory for logging Spark events if spark.eventLog.enabled is set to true. In this directory, Spark creates a subdirectory for each application and logs events of the application in the subdirectory. You can also set a unified address similar to the HDFS directory so that the History Server can read historical files.

hdfs://hacluster/spark2xJobHistory2x

spark.eventLog.compress

Indicates whether to compress logged events when spark.eventLog.enabled is set to true.

false

Periodic Clearing of Event Logs

Event logs on JobHistory increases with submitted tasks. Too many event log files exist as the number of submitted tasks increases. Spark provides the function for periodically clearing event logs. You can enable this function and set the clearing interval using related parameters.

Table 17 Parameter description

Parameter

Description

Default Value

spark.history.fs.cleaner.enabled

Indicates whether to enable the clearing function.

true

spark.history.fs.cleaner.interval

Indicates the check interval of the clearing function.

1d

spark.history.fs.cleaner.maxAge

Indicates the maximum duration for storing logs.

4d

Kryo

Kryo is a highly efficient Java serialization framework, which is integrated into Spark by default. Almost all Spark performance tuning requires the process of converting the default serializer of Spark into a Kryo serializer. Kryo serialization supports only serialization at the Spark data layer. To configure Kryo serialization, set spark.serializer to org.apache.spark.serializer.KryoSerializer and configure the following parameters to optimize Kryo serialization performance:

Table 18 Parameter description

Parameter

Description

Default Value

spark.kryo.classesToRegister

Specifies the name of the class that needs to be registered with Kryo when Kryo serialization is used. Multiple classes are separated by commas (,).

-

spark.kryo.referenceTracking

Indicates whether to trace the references to the same object when Kryo is used to serialize data. This function is applicable to the scenario where the object graph has circular references or the same object has multiple copies. Otherwise, you can disable this function to improve performance.

true

spark.kryo.registrationRequired

Indicates whether Kryo is used to register an object. When this parameter is set to true, an exception is thrown if an object that is not registered with Kryo is serialized. When it is set to false (default value), Kryo writes unregistered class names to the serialized object. This operation causes a large amount of performance overhead. Therefore, you need to enable this option before deleting a class from the registration queue.

false

spark.kryo.registrator

If Kryo serialization is used, use Kryo to register the class with the custom class. Use this property if you need to register a class in a custom way, such as specifying a custom field serializer. Otherwise, use spark.kryo.classesToRegister, which is simpler. Set this parameter to a class that extends KryoRegistrator.

-

spark.kryoserializer.buffer.max

Specifies the maximum size of the Kryo serialization buffer, in MB. The value must be greater than the object that attempts to be serialized. If the error "buffer limit exceeded" occurs in Kryo, increase the value of this parameter. You can also set this parameter by setting spark.kryoserializer.buffer.max.

64MB

spark.kryoserializer.buffer

Specifies the initial size of the Kryo serialization buffer, in MB. Each core of each worker has a buffer. If necessary, the buffer size will be increased to the value of spark.kryoserializer.buffer.max. You can also set this parameter by setting spark.kryoserializer.buffer.

64KB

Broadcast

Broadcast is used to transmit data blocks between Spark processes. In Spark, broadcast can be used for JAR packages, files, closures, and returned results. Broadcast supports two modes: Torrent and HTTP. The Torrent mode divides data into small fragments and distributes them to clusters. Data can be obtained remotely if necessary. The HTTP mode saves files to the local disk and transfers the entire files to the remote end through HTTP if necessary. The former is more stable than the latter. Therefore, Torrent is the default broadcast mode.

Table 19 Parameter description

Parameter

Description

Default Value

spark.broadcast.factory

Indicates the broadcast mode.

org.apache.spark.broadcast.TorrentBroadcastFactory

spark.broadcast.blockSize

Indicates the block size of TorrentBroadcastFactory. If the value is too large, the concurrency during broadcast is reduced (the speed is slow). If the value is too small, BlockManager performance may be affected.

4096

spark.broadcast.compress

Indicates whether to compress broadcast variables before sending them. You are advised to compress the broadcast variables.

true

Storage

Spark features in-memory computing. Spark Storage is used to manage memory resources. Storage stores data blocks generated during RDD caching. The heap memory in the JVM acts as a whole. Therefore, Storage Memory Size is an important concept during Spark Storage management.

Table 20 Parameter description

Parameter

Description

Default Value

spark.storage.memoryMapThreshold

Specifies the block size. If the size of a block exceeds the value of this parameter, Spark performs memory mapping for the disk file. This prevents Spark from mapping too small blocks during memory mapping. Generally, memory mapping for blocks whose page size is close to or less than that of the operating system has high overhead.

2m

PORT

Table 21 Parameter description

Parameter

Description

Default Value

spark.ui.port

Specifies the port for your application's dashboard, which displays memory and workload data.

  • JDBCServer2x: 4040
  • SparkResource2x: 0

spark.blockManager.port

Specifies all ports listened by BlockManager. These ports are on both the driver and executor.

Range of Random Ports

spark.driver.port

Specifies the port for the driver to listen on, which is used for the driver to communicate with the executor.

Range of Random Ports

Range of Random Ports

All random ports must be within a certain range.

Table 22 Parameter description

Parameter

Description

Default Value

spark.random.port.min

Sets the minimum random port.

22600

spark.random.port.max

Sets the maximum random port.

22899

TIMEOUT

By default, computation tasks that can well process medium-scale data are configured in Spark. However, if the data volume is too large, the tasks may fail due to timeout. In the scenario with a large amount of data, the timeout parameter in Spark needs to be assigned a larger value.

Table 23 Parameter description

Parameter

Description

Default Value

spark.files.fetchTimeout

Specifies the communication timeout (in seconds) when fetching files added using SparkContext.addFile() of the driver.

60s

spark.network.timeout

Specifies the default timeout for all network interactions, in seconds. You can use this parameter to replace spark.core.connection.ack.wait.timeout, spark.akka.timeout, spark.storage.blockManagerSlaveTimeoutMs, or spark.shuffle.io.connectionTimeout.

360s

spark.core.connection.ack.wait.timeout

Specifies the timeout for a connection to wait for a response, in seconds. To avoid long-time waiting caused by GC, you can set this parameter to a larger value.

60

Encryption

Spark supports SSL for Akka and HTTP (for the broadcast and file server) protocols, but does not support SSL for the web UI and block transfer service.

SSL must be configured on each node and configured for each component involved in communication using a particular protocol.

Table 24 Parameter description

Parameter

Description

Default Value

spark.ssl.enabled

Indicates whether to enable SSL connections for all supported protocols.

All SSL settings similar to spark.ssl.xxx indicate the global configuration of all supported protocols. To override the global configuration of a particular protocol, you must override the property in the namespace specified by the protocol.

Use spark.ssl.YYY.XXX to overwrite the global configuration of the particular protocol specified by YYY. YYY can be either akka for Akka-based connections or fs for the broadcast and file server.

false

spark.ssl.enabledAlgorithms

Indicates the comma-separated list of passwords. The specified passwords must be supported by the JVM.

-

spark.ssl.keyPassword

Specifies the password of a private key in the keystore.

-

spark.ssl.keyStore

Specifies the path of the keystore file. The path can be absolute or relative to the directory where the component is started.

-

spark.ssl.keyStorePassword

Specifies the password of the keystore.

-

spark.ssl.protocol

Specifies the protocol name. This protocol must be supported by the JVM. The reference list of protocols is available on this page.

-

spark.ssl.trustStore

Specifies the path of the truststore file. The path can be absolute or relative to the directory where the component is started.

-

spark.ssl.trustStorePassword

Specifies the password of the truststore.

-

Security

Spark supports shared key-based authentication. You can use spark.authenticate to configure authentication. This parameter controls whether the Spark communication protocol uses the shared key for authentication. This authentication is a basic handshake that ensures that both sides have the same shared key and are allowed to communicate. If the shared keys are different, the communication is not allowed. You can create shared keys as follows:

  • For Spark on Yarn deployments, set spark.authenticate to true. Then, shared keys are automatically generated and distributed. Each application exclusively occupies a shared key.
  • For other types of Spark deployments, configure Spark parameter spark.authenticate.secret on each node. All masters, workers, and applications use this key.
Table 25 Parameter description

Parameter

Description

Default Value

spark.acls.enable

Indicates whether to enable Spark ACLs. If Spark ACLs are enabled, the system checks whether the user has the permission to access and modify jobs. Note that this requires the user to be identifiable. If the user is identified as invalid, the check will not be performed. Filters can be used to verify and set users on the UI.

true

spark.admin.acls

Specifies the comma-separated list of users/administrators that have the permissions to view and modify all Spark jobs. This list can be used if you are running on a shared cluster and working with the help of an MRS cluster administrator or developer.

admin

spark.authenticate

Indicates whether Spark authenticates its internal connections. If the application is not running on Yarn, see spark.authenticate.secret.

true

spark.authenticate.secret

Sets the key for authentication between Spark components. This parameter must be set if Spark does not run on Yarn and authentication is disabled.

-

spark.modify.acls

Specifies the comma-separated list of users who have the permission to modify Spark jobs. By default, only users who have enabled Spark jobs have the permission to modify the list (for example, delete the list).

-

spark.ui.view.acls

Specifies the comma-separated list of users who have the permission to access the Spark web UI. By default, only users who have enabled Spark jobs have the access permission.

-

Enabling the Authentication Mechanism Between Spark Processes

Spark processes support shared key-based authentication. You can configure spark.authenticate to control whether Spark performs authentication during communication. In this authentication mode, the two communication parties share the same key only using simple handshakes.

Configure the following parameters in the spark-defaults.conf file on the Spark client.

Table 26 Parameter description

Parameter

Description

Default Value

spark.authenticate

For Spark on Yarn deployments, set this parameter to true. Then, keys are automatically generated and distributed, and each application uses a unique key.

true

Compression

Data compression is policy that optimizes memory usage at the expense of CPU. Therefore, when the Spark memory is severely insufficient (this issue is common due to the characteristics of in-memory computing), data compression can greatly improve performance. Spark supports three types of compression algorithm: Snappy, LZ4, and LZF. Snappy is the default compression algorithm and invokes the native method to compress and decompress data. In Yarn mode, pay attention to the impact of non-heap memory on the container process.

Table 27 Parameter description

Parameter

Description

Default Value

spark.io.compression.codec

Indicates the codec for compressing internal data, such as RDD partitions, broadcast variables, and shuffle output. By default, Spark supports three types of compression algorithm: LZ4, LZF, and Snappy. You can specify algorithms using fully qualified class names, such as org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodec, and org.apache.spark.io.SnappyCompressionCodec.

lz4

spark.io.compression.lz4.block.size

Indicates the block size (bytes) used in LZ4 compression when the LZ4 compression algorithm is used. When LZ4 is used, reducing the block size also reduces the shuffle memory usage.

32768

spark.io.compression.snappy.block.size

Indicates the block size (bytes) used in Snappy compression when the Snappy compression algorithm is used. When Snappy is used, reducing the block size also reduces the shuffle memory usage.

32768

spark.shuffle.compress

Indicates whether to compress the output files of a Map task. You are advised to compress the broadcast variables. using spark.io.compression.codec.

true

spark.shuffle.spill.compress

Indicates whether to compress the data overflowed during shuffle using spark.io.compression.codec.

true

spark.eventLog.compress

Indicates whether to compress logged events when spark.eventLog.enabled is set to true.

false

spark.broadcast.compress

Indicates whether to compress broadcast variables before sending them. You are advised to compress the broadcast variables.

true

spark.rdd.compress

Indicates whether to compress serialized RDD partitions (for example, the StorageLevel.MEMORY_ONLY_SER partition). Substantial space can be saved at the cost of some extra CPU time.

false

Reducing the Probability of Abnormal Client Application Operations When Resources Are Insufficient

When resources are insufficient, ApplicationMaster tasks must wait and will not be processed until enough resources are available for use. If the actual waiting time exceeds the configured waiting time, the ApplicationMaster tasks will be deleted. Adjust the following parameters to reduce the probability of abnormal client application operation.

Configure the following parameters in the spark-defaults.conf file on the client.

Table 28 Parameter description

Parameter

Description

Default Value

spark.yarn.applicationMaster.waitTries

Specifies the number of the times that ApplicationMaster waits for Spark master, which is also the times that ApplicationMaster waits for SparkContext initialization. Enlarge this parameter value to prevent ApplicationMaster tasks from being deleted and reduce the probability of abnormal client application operations.

10

spark.yarn.am.memory

Specifies the ApplicationMaster memory. Enlarge this parameter value to prevent ApplicationMaster tasks from being deleted by ResourceManager due to insufficient memory and reduce the probability of abnormal client application operations.

1G