Updated on 2024-08-10 GMT+08:00

Spark Application Development Overview

Spark

Spark is a distributed batch processing framework. It provides data analysis, data mining, and iterative in-memory computing capabilities and supports application development in multiple programming languages, including Scala, Java, and Python. Spark is used for the following scenarios:

  • Data processing: Spark can process data quickly and has fault tolerance and scalability.
  • Iterative computation: Spark supports iterative computation, which suits multi-step data processing logic.
  • Data mining: Spark can handle complex data mining and analysis tasks using massive amounts of data, and it supports various machine learning and data mining algorithms.
  • Streaming processing: Spark can process streaming data with second-level latency and can integrate with various external data sources.
  • Query analysis: Spark supports standard SQL query analysis, provides a DSL (DataFrame), and supports multiple external inputs.

This section focuses on the application development guides of Spark, Spark SQL, and Spark Streaming.

Spark APIs

Spark supports application development in several programming languages, such as Scala, Java, and Python. However, since Spark is built in Scala, it is recommended to develop Spark applications using Scala, which is known for its readability.

Table 1 lists the Spark APIs by language.

Table 1 Spark APIs

API           Description
Scala API     Indicates the API in Scala. For common APIs of Spark Core, Spark SQL, and Spark Streaming, see Spark Scala APIs.
Java API      Indicates the API in Java. For common APIs of Spark Core, Spark SQL, and Spark Streaming, see Spark Java APIs.
Python API    Indicates the API in Python. For common APIs of Spark Core, Spark SQL, and Spark Streaming, see Spark Python APIs.

By access mode, the APIs listed in Table 1 are used to develop Spark Core and Spark Streaming applications. Spark SQL can be accessed through the CLI and JDBCServer. JDBCServer can be accessed in two ways: through Beeline or through JDBC client code. For details, see Spark JDBCServer APIs.

When running applications that involve SQL operations using spark-sql, spark-shell, and spark-submit scripts, avoid using the proxy user parameter to submit tasks.

Concepts

  • RDD

    Resilient Distributed Dataset (RDD) is a fundamental concept in Spark that refers to a distributed dataset that is read-only and partitioned. Some or all of the data in this dataset can be cached in memory and reused across computations.

    RDD creation

    • An RDD can be created from the input of HDFS or other storage systems that are compatible with Hadoop.
    • An RDD can be converted from a parent RDD.
    • An RDD can be created in program code from an existing data collection.

    RDD storage

    • You can select different storage levels to store an RDD for reuse. (There are 11 storage levels to store an RDD.)
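    • By default, a cached RDD is stored in memory only. Partitions that do not fit in memory are not cached and are recomputed when they are needed. To spill such partitions to disk instead, select a storage level that includes the disk, such as MEMORY_AND_DISK.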
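
    The creation and storage options above can be sketched as follows. This is a minimal example under stated assumptions, not part of the product samples: the object name RddCreationSketch, the sample data, and the local[2] master are chosen only for illustration.

    ```scala
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    object RddCreationSketch {
      // Creates an RDD from a collection, derives a child RDD from it,
      // persists the child with an explicit storage level, and sums it.
      def run(sc: SparkContext): Int = {
        // Create an RDD from a collection in the driver program.
        val numbers = sc.parallelize(1 to 10, numSlices = 2)
        // Create a new RDD by transforming the parent RDD.
        val squares = numbers.map(n => n * n)
        // Keep partitions in memory and spill them to disk if memory is insufficient.
        squares.persist(StorageLevel.MEMORY_AND_DISK)
        val total = squares.reduce(_ + _) // the first action computes and stores the RDD
        squares.unpersist()
        total
      }

      def main(args: Array[String]): Unit = {
        // local[2] is an assumption for trying the sketch without a cluster.
        val conf = new SparkConf().setMaster("local[2]").setAppName("RddCreationSketch")
        val sc = new SparkContext(conf)
        try println(run(sc)) finally sc.stop()
      }
    }
    ```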
  • RDD dependency

    RDD dependencies are classified into narrow dependencies and wide dependencies.

    Figure 1 RDD dependency
    • Narrow dependency: Each partition of the parent RDD is used by at most one partition of the child RDD.
    • Wide dependency: Each partition of the parent RDD may be used by multiple partitions of the child RDD, which requires a shuffle operation.

    Narrow dependencies make optimization easier. Each RDD operator can be thought of as a fork/join operation, where the join is a synchronization barrier that coordinates multiple concurrent tasks. The computation is forked to each partition, and the results are joined after the computation. This process is repeated for the next RDD operator.

    Directly translating RDD operators into a physical implementation is not cost-effective for two reasons. First, each RDD (even an intermediate result) must be stored in memory or storage, which costs time and space. Second, the join is a global barrier that is very expensive and is held up by the slowest node.

    If the partitions of the child RDD narrowly depend on those of the parent RDD, the two fork/join processes can be combined, which is the classic fusion optimization. If all dependencies in a continuous operator sequence are narrow, multiple fork/join processes can be combined. This reduces the number of global barriers and removes the need to store many intermediate RDD results, which significantly improves performance. This is called pipeline optimization in Spark.
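
    The two dependency types can be inspected on real RDDs. The following is a minimal sketch, assuming a local master and illustrative names (DependencySketch, pairs, mapped, reduced); it relies on the dependencies and toDebugString members that Spark exposes on every RDD.

    ```scala
    import org.apache.spark.{OneToOneDependency, ShuffleDependency, SparkConf, SparkContext}

    object DependencySketch {
      // Returns (map result has a narrow dependency, reduceByKey result has a wide dependency).
      def run(sc: SparkContext): (Boolean, Boolean) = {
        val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3), numSlices = 2)
        val mapped = pairs.map { case (k, v) => (k, v * 10) } // narrow: partition to partition
        val reduced = mapped.reduceByKey(_ + _)                // wide: requires a shuffle
        val narrow = mapped.dependencies.head.isInstanceOf[OneToOneDependency[_]]
        val wide = reduced.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]
        // The lineage printout shows where the shuffle boundary falls.
        println(reduced.toDebugString)
        (narrow, wide)
      }

      def main(args: Array[String]): Unit = {
        // local[2] is an assumption for trying the sketch without a cluster.
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("DependencySketch"))
        try println(run(sc)) finally sc.stop()
      }
    }
    ```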

  • Transformation and action (RDD operations)

    Operations on an RDD include transformations (the return value is an RDD) and actions (the return value is not an RDD). Figure 2 shows the RDD operation process. Transformations are lazy: a transformation from one RDD to another is not executed immediately. Spark only records the transformations and computes them when an action is triggered. An action returns results or writes the RDD data to a storage system, and it is what drives Spark to start the computation.

    Figure 2 RDD operation

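    RDD operators look similar to Scala collection operators, but the data and execution models of RDDs are quite different from those of Scala collections. Consider the following example: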

    val file = sc.textFile("hdfs://...")
    val errors = file.filter(_.contains("ERROR"))
    errors.cache()
    errors.count()
    1. The textFile operator reads log files from the HDFS and returns file (as an RDD).
    2. The filter operator filters rows with ERROR and assigns them to errors (a new RDD). The filter operator is a transformation.
    3. The cache operator caches errors for future use.
    4. The count operator returns the number of rows of errors. The count operator is an action.
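
    The lazy behavior of transformations can be observed directly. The following is a minimal sketch, assuming a local master and in-memory sample lines instead of an HDFS file; the accumulator named processed is introduced here only to count how many records the filter function has seen. Note that accumulator updates made inside transformations can be applied more than once if a task is re-executed, so this technique is for illustration only.

    ```scala
    import org.apache.spark.{SparkConf, SparkContext}

    object LazyEvaluationSketch {
      // Returns the number of records the filter has seen before and after the action.
      def run(sc: SparkContext): (Long, Long) = {
        val processed = sc.longAccumulator("processed")
        val lines = sc.parallelize(Seq("INFO start", "ERROR disk", "ERROR net"))
        // Transformation: Spark only records it, nothing is executed yet.
        val errors = lines.filter { line => processed.add(1); line.contains("ERROR") }
        val before = processed.value.longValue
        // Action: triggers the computation of the recorded transformations.
        errors.count()
        val after = processed.value.longValue
        (before, after)
      }

      def main(args: Array[String]): Unit = {
        // local[2] is an assumption for trying the sketch without a cluster.
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("LazyEvaluationSketch"))
        try println(run(sc)) finally sc.stop()
      }
    }
    ```
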
    Transformation includes the following types:
    • The RDD elements are regarded as simple elements:

      The input and output have a one-to-one relationship, and the partition structure of the result RDD remains unchanged, for example, map.

      The input and output have a one-to-many relationship, and the partition structure of the result RDD remains unchanged, for example, flatMap (each element is mapped to a sequence of elements, which is then flattened into multiple elements).

      The input and output have a one-to-one relationship, but the partition structure of the result RDD changes, for example, union (two RDDs are merged into one RDD, and the number of partitions becomes the sum of the numbers of partitions of the two RDDs) and coalesce (the number of partitions is reduced).

      Some elements are selected from the input, for example, filter, distinct (duplicate elements are removed), subtract (only elements that exist in this RDD but not in the other RDD are retained), and sample (samples are taken).

    • The RDD elements are regarded as key-value pairs:

      Perform a one-to-one calculation on a single RDD, such as mapValues (unlike map, the partitioning of the source RDD is retained).

      Sort or repartition a single RDD, such as sortByKey and partitionBy (consistent partitioning, which is important for local optimization).

      Restructure and reduce a single RDD by key, such as groupByKey and reduceByKey.

      Join and restructure two RDDs by key, such as join and cogroup.

      The last three types of operations are called shuffle operations.
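
    The difference between mapValues and map with respect to partitioning can be checked as follows. This is a minimal sketch with illustrative names (KeyValueSketch, viaMapValues, viaMap) and sample data, assuming a local master.

    ```scala
    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    object KeyValueSketch {
      // Returns (mapValues keeps the partitioner, map keeps the partitioner, sums per key).
      def run(sc: SparkContext): (Boolean, Boolean, Map[String, Int]) = {
        val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3))
          .partitionBy(new HashPartitioner(2)) // shuffle: place equal keys in the same partition
        val viaMapValues = pairs.mapValues(_ * 10)             // keys untouched, partitioner kept
        val viaMap = pairs.map { case (k, v) => (k, v * 10) }  // keys may change, partitioner dropped
        val sums = viaMapValues.reduceByKey(_ + _).collect().toMap
        (viaMapValues.partitioner.isDefined, viaMap.partitioner.isDefined, sums)
      }

      def main(args: Array[String]): Unit = {
        // local[2] is an assumption for trying the sketch without a cluster.
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("KeyValueSketch"))
        try println(run(sc)) finally sc.stop()
      }
    }
    ```

    Because viaMapValues still carries the HashPartitioner, a later key-based operation on it can typically reuse the existing partitioning instead of shuffling the data again.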

    Action includes the following types:

    • Return scalar values, such as count (returns the number of elements in the RDD), reduce and fold/aggregate (return a single aggregated value), and take (returns the first n elements).
    • Return a Scala collection, such as collect (returns all elements in the RDD as a Scala collection) and lookup (returns all values that correspond to a key).
    • Write data to storage, such as saveAsTextFile (the counterpart of the preceding textFile).
    • Checkpoints, such as the checkpoint operator. In graph computation, the lineage can become very long, so recovery takes a long time if a fault occurs. To mitigate this issue, a checkpoint saves the current data to stable storage.
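
    A few of the actions listed above can be tried as follows. This is a minimal sketch with illustrative names and sample data, assuming a local master; saveAsTextFile and checkpoint are left out because they need a storage path.

    ```scala
    import org.apache.spark.{SparkConf, SparkContext}

    object ActionSketch {
      // Returns (count, sum, first two elements, sorted values for key "a").
      def run(sc: SparkContext): (Long, Int, List[Int], List[Int]) = {
        val numbers = sc.parallelize(Seq(3, 1, 2), numSlices = 1)
        val count = numbers.count()             // number of elements
        val sum = numbers.reduce(_ + _)         // single aggregated value
        val firstTwo = numbers.take(2).toList   // first n elements, returned to the driver
        val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3))
        val valuesForA = pairs.lookup("a").toList.sorted // all values stored under the key
        (count, sum, firstTwo, valuesForA)
      }

      def main(args: Array[String]): Unit = {
        // local[2] is an assumption for trying the sketch without a cluster.
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("ActionSketch"))
        try println(run(sc)) finally sc.stop()
      }
    }
    ```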
  • Shuffle

    Shuffle is a specific phase in the MapReduce framework. It is located between the Map phase and the Reduce phase. If the output of Map is to be used by Reduce, the output must be hashed by key and distributed to the corresponding Reducer. This process is called shuffle. Shuffle involves disk reads and writes and network transmission, so its performance directly affects the efficiency of the entire program.

    The following figure describes the entire process of the MapReduce algorithm.

    Figure 3 Algorithm process

    Shuffle is the bridge between the Map and Reduce phases. The following describes the implementation of shuffle in Spark.

    Shuffle divides a Spark job into multiple stages. Each stage except the last contains one or more ShuffleMapTasks, and the last stage contains one or more ResultTasks.

  • Spark application structure

    A Spark application consists of SparkContext initialization and the main program.

    • SparkContext initialization: constructs the running environment of the Spark application.

      Construct the SparkContext object.

      new SparkContext(master, appName, [SparkHome], [jars])

      Parameter description

      master: indicates the master URL that the application connects to. The modes include local, Yarn-cluster, and Yarn-client.

      appName: indicates the application name.

      SparkHome: indicates the directory where Spark is installed in the cluster.

      jars: indicates the code and dependency package of the application.

    • Main program: processes data.

    For details about submitting applications, see https://spark.apache.org/docs/3.1.1/submitting-applications.html
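
    The two parts of the structure can be sketched as follows. This is a minimal example, not the product sample code: it builds the context from a SparkConf object instead of passing master and appName to the constructor directly, and the object name, the sample data, and the local[2] fallback are assumptions made for illustration.

    ```scala
    import org.apache.spark.{SparkConf, SparkContext}

    object AppStructureSketch {
      // Main program: the data processing logic, kept separate from context setup.
      def process(sc: SparkContext): Long =
        sc.parallelize(Seq("spark", "core", "spark")).distinct().count()

      def main(args: Array[String]): Unit = {
        // SparkContext initialization. The master is normally supplied at submission time;
        // local[2] is only a fallback so that the sketch can run without a cluster.
        val conf = new SparkConf()
          .setAppName("AppStructureSketch")
          .setIfMissing("spark.master", "local[2]")
        val sc = new SparkContext(conf)
        try println(process(sc)) finally sc.stop()
      }
    }
    ```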

  • Spark Shell commands

    Spark applications are submitted using the spark-submit shell script. The basic command format is as follows.

    ./bin/spark-submit \
      --class <main-class> \
      --master <master-url> \
      ... # other options
      <application-jar> \
      [application-arguments]

    Parameter description

    --class: indicates the name of the main class of the Spark application.

    --master: indicates the master that the Spark application connects to, such as Yarn-client and Yarn-cluster.

    application-jar: indicates the path of the JAR package of the Spark application.

    application-arguments: indicates the arguments passed to the main method of the Spark application. This parameter is optional.

  • Spark JobHistory Server

    The Spark web UI shows the details of each phase of a running or historical Spark job and displays logs, which helps users develop, configure, and optimize jobs at a finer granularity.

Spark SQL Basic Concepts

Dataset

A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each Dataset also has an untyped view called a DataFrame, which is a Dataset of Row.

A DataFrame is a structured distributed dataset composed of several columns. It is similar to a table in a relational database or a data frame in R/Python. DataFrame is a basic concept in Spark SQL. It can be created from multiple sources, such as structured data files, Hive tables, external databases, or RDDs.
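
The relationship between a Dataset and its DataFrame view can be sketched as follows. This is a minimal example under stated assumptions: the Person case class, the sample rows, the people view name, and the local[2] master are hypothetical and serve only as illustration.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical domain class used only for this sketch.
case class Person(name: String, age: Int)

object DatasetSketch {
  // Returns (names of adults found with the typed API, number of minors found with SQL).
  def run(spark: SparkSession): (List[String], Long) = {
    import spark.implicits._
    val people: Dataset[Person] = Seq(Person("Ann", 31), Person("Bob", 17)).toDS()
    // Typed, functional operations work on the domain objects.
    val adults = people.filter(_.age >= 18).map(_.name).collect().toList
    // Untyped view: a DataFrame is a Dataset of Row and can be queried relationally.
    val df: DataFrame = people.toDF()
    df.createOrReplaceTempView("people")
    val minors = spark.sql("SELECT name FROM people WHERE age < 18").count()
    (adults, minors)
  }

  def main(args: Array[String]): Unit = {
    // local[2] is an assumption for trying the sketch without a cluster.
    val spark = SparkSession.builder().master("local[2]").appName("DatasetSketch").getOrCreate()
    try println(run(spark)) finally spark.stop()
  }
}
```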

Spark Streaming Basic Concepts

DStream

The DStream (Discretized Stream) is an abstraction provided by Spark Streaming.

A DStream is a continuous data stream that is either obtained from a data source or generated by transforming an input stream. In essence, a DStream is a series of continuous RDDs. An RDD is a read-only, partitioned distributed dataset.

Each RDD in a DStream contains the data of a specific time interval, as shown in Figure 4.

Figure 4 Relationship between DStream and RDD

All operators applied to a DStream are translated into operations on the underlying RDDs, as shown in Figure 5. The transformations of the underlying RDDs are computed by the Spark engine. DStream operators hide most of these details and provide high-level APIs for developers.

Figure 5 DStream operator transfer
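
The relationship between a DStream and its underlying RDDs can be sketched as follows. This is a minimal example under stated assumptions: it uses queueStream, a test-oriented input in which each queued RDD becomes one batch, and the object name, sample data, 1-second batch interval, and local[2] master are chosen only for illustration.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamSketch {
  def main(args: Array[String]): Unit = {
    // local[2] is an assumption for trying the sketch without a cluster.
    val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(1)) // one RDD per 1-second batch

    // Test input: each queued RDD becomes the content of one batch.
    val queue = mutable.Queue[RDD[String]]()
    queue += ssc.sparkContext.parallelize(Seq("a b", "b c"))
    queue += ssc.sparkContext.parallelize(Seq("c c"))

    // DStream operators; each one is applied to the RDD of every batch.
    val counts = ssc.queueStream(queue)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // foreachRDD exposes the underlying RDD of each batch.
    counts.foreachRDD { (rdd, time) =>
      println(s"$time: ${rdd.collect().sorted.mkString(", ")}")
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000) // let a few batches run, then stop
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```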

Structured Streaming Basic Concepts

  • Input Source

    The source of input data. An input source must support offset-based data replay. The level of fault tolerance varies with the source.

  • Sink

    The destination of output data. A sink must support idempotent writes to ensure fault tolerance. The level of fault tolerance varies with the sink.

  • outputMode

    There are three supported output modes for result output:

    • Complete Mode: The entire updated result table is written to the external storage system. The storage connector determines how to handle writing the entire table.
    • Append Mode: Only the new rows appended to the result table since the last trigger are written to the external storage system. This mode is applicable only to queries in which rows that already exist in the result table are not updated.
    • Update Mode: Each time a trigger fires, only the rows that were updated in the result table since the last trigger are written to the external storage system. Unlike Complete Mode, unchanged rows are not written.
  • Trigger

    There are four supported output triggers:

    • Default: Micro-batch mode. Subsequent batches are automatically initiated upon completion of the current batch.
    • Specific interval: Processing is configured to occur at predetermined intervals.
    • One-time execution: Query is executed just once.
    • Continuous mode: The experimental continuous mode significantly reduces stream processing delays to as low as 1 millisecond (100 milliseconds is recommended).

Structured Streaming accommodates both micro-batch and continuous processing modes. While micro-batch mode prioritizes throughput over latency, continuous mode caters to scenarios demanding millisecond-level latency and is currently an experimental feature.

In the current version, if stream-to-batch joins are required, outputMode must be set to Append Mode.
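
The concepts above (input source, sink, outputMode, and trigger) can be combined in a short query. This is a minimal sketch under stated assumptions: it uses the built-in rate test source and the console sink, and the object name, row rate, 2-second interval, and local[2] master are chosen only for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

object StructuredStreamingSketch {
  def main(args: Array[String]): Unit = {
    // local[2] is an assumption for trying the sketch without a cluster.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("StructuredStreamingSketch")
      .getOrCreate()

    // Input source: the built-in "rate" test source emits (timestamp, value) rows.
    val input = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

    // A stateless filter, so rows never change once emitted and Append Mode is valid.
    val evens = input.filter(col("value") % 2 === 0)

    val query = evens.writeStream
      .format("console")                            // sink
      .outputMode("append")                         // output mode
      .trigger(Trigger.ProcessingTime("2 seconds")) // specific-interval trigger
      .start()

    query.awaitTermination(10000) // let a few micro-batches run, then stop
    query.stop()
    spark.stop()
  }
}
```

Omitting the trigger call gives the default micro-batch behavior. Trigger.Once() and Trigger.Continuous("1 second") select one-time execution and the experimental continuous mode, but not every source and sink supports continuous mode.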

Figure 6 Running process in micro-batch mode
Figure 7 Running process in continuous mode