Estos contenidos se han traducido de forma automática para su comodidad, pero Huawei Cloud no garantiza la exactitud de estos. Para consultar los contenidos originales, acceda a la versión en inglés.
Actualización más reciente 2023-04-14 GMT+08:00

Principios básicos de Spark

El componente de Spark se aplica a versiones anteriores a MRS 3.x.

Descripción

Spark es un marco de procesamiento de datos paralelo de código abierto. Le ayuda a desarrollar fácilmente aplicaciones de big data unificadas y a realizar procesamiento fuera de línea, procesamiento de flujo y análisis interactivo de datos.

Spark proporciona un marco de trabajo que incluye computación rápida, escritura y consultas interactivas. Spark tiene ventajas obvias sobre Hadoop en términos de rendimiento. Spark utiliza el modo de computación en memoria para evitar cuellos de botella de E/S en escenarios donde varias tareas en un flujo de trabajo de MapReduce procesan el mismo conjunto de datos. Spark se implementa mediante el uso del lenguaje de programación Scala. Scala permite procesar conjuntos de datos distribuidos en un método que es el mismo que el de procesar datos locales. Además del análisis de datos interactivo, Spark admite la minería de datos interactiva. Spark adopta la informática en memoria, lo que facilita la informática iterativa. Por coincidencia, la computación iterativa de los mismos datos es un problema general que enfrenta la minería de datos. Además, Spark puede ejecutarse en clústeres de Yarn donde está instalado Hadoop 2.0. La razón por la que Spark no solo puede conservar varias características como MapReduce tolerancia a fallos, localización de datos y escalabilidad, sino también garantizar un alto rendimiento y evitar E/S de disco ocupado es que se crea una estructura de abstracción de memoria llamada Conjunto de datos distribuidos resilientes (RDD) para Spark.

La abstracción de memoria distribuida original, por ejemplo, el almacén de valores clave y las bases de datos, admite una actualización de pequeña granularidad del estado de las variables. Esto requiere una copia de respaldo de datos o actualizaciones de registro para garantizar la tolerancia a fallas. En consecuencia, se produce una gran cantidad de consumo de E/S en flujos de trabajo intensivos en datos. Para el RDD, solo tiene un conjunto de API restringidas y solo admite actualizaciones de gran granularidad, por ejemplo, map y join. De esta manera, Spark solo necesita registrar los registros de operaciones de transformación generados durante el establecimiento de datos para garantizar la tolerancia a fallas sin registrar un conjunto de datos completo. Este registro de enlace de transformación de datos es una fuente para rastrear un conjunto de datos. En general, las aplicaciones paralelas aplican el mismo proceso informático para un conjunto de datos grande. Por lo tanto, el límite a la mencionada actualización de granularidad no es grande. Como se describe en las tesis de Spark, el RDD puede funcionar como múltiples marcos de computación diferentes, por ejemplo, modelos de programación de MapReduce y Pregel. Además, Spark le permite hacer que un proceso de transformación de datos sea explícitamente persistente en los discos duros. La localización de datos se implementa al permitirle controlar particiones de datos en función del valor clave de cada registro. (Una ventaja obvia de este método es que dos copias de datos a asociar serán hashed en el mismo modo.) Si el uso de memoria excede el límite físico, Spark escribe particiones relativamente grandes en los discos duros, lo que garantiza la escalabilidad.

Spark tiene las siguientes características:

  • Rápido: La velocidad de procesamiento de datos de Spark es de 10 a 100 veces mayor que la de MapReduce.
  • Fácil de usar: Java, Scala y Python se pueden usar para compilar aplicaciones paralelas de manera sencilla y rápida para procesar cantidades masivas de datos. Spark ofrece más de 80 operadores para ayudarle a compilar aplicaciones paralelas.
  • Universal: Spark proporciona muchas herramientas, por ejemplo, Spark SQL y Spark Streaming. Estas herramientas se pueden combinar de forma flexible en una aplicación.
  • Integración con Hadoop: Spark puede ejecutarse directamente en un clúster de Hadoop y leer datos de Hadoop existentes.

El componente Spark de MRS tiene las siguientes ventajas:

  • El componente de Spark Streaming de MRS admite el procesamiento de datos en tiempo real en lugar de activarse según lo programado.
  • El componente de Spark de MRS proporciona Streaming Estructurado y le permite crear aplicaciones de streaming mediante la API de Dataset. Spark admite la semántica de exactly-once y joins internas y externas para los flujos.
  • El componente Spark de MRS utiliza pandas_udf para reemplazar las funciones definidas por el usuario (UDF) originales de PySpark para procesar datos, lo que reduce la duración de procesamiento de un 60% a un 90% (afectada por operaciones específicas).
  • El componente de Spark de MRS también admite el procesamiento de datos de gráficos y permite el modelado utilizando gráficos durante el cálculo de gráficos.
  • Spark SQL de MRS es compatible con alguna sintaxis de Hive (basado en las 64 sentencias SQL del conjunto de pruebas Hive-Test-benchmark) y sintaxis SQL estándar (basado en las 99 sentencias SQL del conjunto de pruebas TPC-DS).

Para obtener más información sobre la arquitectura y los principios de Spark, visite https://spark.apache.org/docs/3.1.1/quick-start.html.

Arquitectura

Figura 1 describe la arquitectura de Spark y Tabla 1 enumera los módulos de Spark.

Figura 1 Arquitectura de Spark
Tabla 1 Conceptos básicos

Módulo

Descripción

Cluster Manager

El administrador de clústeres gestiona los recursos del clúster. Spark admite varios administradores de clústeres, incluidos Mesos, Yarn y el administrador de clústeres independiente que se entrega con Spark.

Application

Aplicación de Spark. Consiste en un programa de Driver Program y múltiples executors.

Deploy Mode

Implementación en modo de clúster o cliente. En modo de clúster, el controlador se ejecuta en un nodo dentro del clúster. En modo cliente, el controlador se ejecuta en el cliente (fuera del clúster).

Driver Program

El proceso principal de la aplicación Spark. Ejecuta la función main() de una aplicación y crea "SparkContext". Se utiliza para analizar aplicaciones, generar etapas y programar tareas a ejecutores. Por lo general, SparkContext representa Driver Program.

Executor

Un proceso iniciado en un Work Node. Se utiliza para ejecutar tareas, y gestionar y procesar los datos utilizados en las aplicaciones. Una aplicación de Spark generalmente contiene varios executors. Cada executor recibe comandos del driver y ejecuta una o varias tareas.

Worker Node

Nodo que inicia y gestiona ejecutores y recursos en un clúster.

Job

Un job consta de varias tareas simultáneas. Un operador de action (por ejemplo, un operador de collect) se asigna a un job.

Stage

Cada job consta de múltiples etapas. Cada etapa es un conjunto de tareas, que está separado por el Gráfico Acíclico Dirigido (DAG).

Task

Una tarea lleva la unidad de cálculo de la lógica de servicio. Es la unidad de trabajo mínima que se puede ejecutar en la plataforma de Spark. Una aplicación se puede dividir en múltiples tareas basadas en el plan de ejecución y la cantidad de cálculo.

Principio de ejecución de la aplicación de Spark

Figura 2 muestra la arquitectura en ejecución de la aplicación de Spark. El proceso en ejecución es el siguiente:

  1. Una aplicación se está ejecutando en el clúster como una colección de procesos. El controlador coordina la ejecución de la aplicación.
  2. Para ejecutar una aplicación, el controlador se conecta al administrador de clústeres (como Standalone, Mesos y Yarn) para solicitar los recursos del ejecutor e iniciar ExecutorBackend. El administrador de clúster programa los recursos entre diferentes aplicaciones. El controlador programa los DAGs, divide las etapas y genera tareas para la aplicación al mismo tiempo.
  3. A continuación, Spark envía los códigos de la aplicación (los códigos transferidos a SparkContext definidos por JAR o Python) a executor.
  4. Una vez terminadas todas las tareas, se detiene la ejecución de la aplicación de usuario.
Figura 2 Arquitectura de ejecución de aplicaciones de Spark

Figura 3 muestra los modos de Master y Worker adoptados por Spark. Un usuario envía una aplicación en el cliente de Spark y, a continuación, el programador divide un job en varias tareas y envía las tareas a cada Worker para su ejecución. Cada Worker informa los resultados del cálculo al Driver (Master) y, a continuación, el Driver agrega y devuelve los resultados al cliente.

Figura 3 Modo de Spark Master-Worker

Tenga en cuenta lo siguiente acerca de la arquitectura:

  • Las aplicaciones se aíslan entre sí.

    Cada aplicación tiene un proceso de executor independiente, y cada executor inicia varios subprocesos para ejecutar tareas en paralelo. Ya sea en términos de programación o la ejecución de tarea en executors. Cada driver programa de forma independiente sus propias tareas. Diferentes tareas de aplicación se ejecutan en diferentes JVMs, es decir, diferentes ejecutors.

  • Las diferentes aplicaciones de Spark no comparten datos, a menos que los datos se almacenen en el sistema de almacenamiento externo, como HDFS.
  • Se recomienda desplegar el programa de Driver en una ubicación cercana al nodo de Worker porque el programa de Driver programa tareas en el clúster. Por ejemplo, desplegar el programa de Driver en la red donde se encuentra el nodo de Worker.

Spark en YARN se puede desplegar en dos modos:

  • En el modo de Yarn-cluster, el driver de Spark se ejecuta dentro de un proceso de ApplicationMaster que es getionado por Yarn en el clúster. Después de iniciar el ApplicationMaster, el cliente puede salir sin interrumpir la ejecución del servicio.
  • En el modo Yarn-client, el driver se inicia en el proceso de cliente y el proceso de ApplicationMaster solo se utiliza para solicitar recursos del clúster de Yarn.

Principio de Spark Streaming

Spark Streaming es un marco informático en tiempo real basado en Spark, que amplía la capacidad para procesar datos de streaming masivos. Actualmente, Spark admite los siguientes métodos de procesamiento de datos:

  • Direct Streaming

    En el enfoque de Direct Streaming, Direct API se utiliza para procesar datos. Tomemos Kafka Direct API como ejemplo. Direct API proporciona una ubicación de desplazamiento desde la que cada rango de batch leerá, lo que es mucho más simple que iniciar un receptor para recibir continuamente datos de Kafka y datos escritos en registros de escritura previa (WAL). A continuación, cada job de batch se está ejecutando y los datos de desplazamiento correspondientes están listos en Kafka. Esta información de desplazamiento puede almacenarse de forma segura en el archivo de punto de control y leerse por aplicaciones que no se iniciaron.

    Figura 4 Transmisión de datos a través de Direct Kafka API

    Después de la falla, Spark Streaming puede leer datos de Kafka de nuevo y procesar el segmento de datos. El resultado del procesamiento es el mismo sin importar que Spark Streaming falle o no, porque la semántica se procesa solo una vez.

    Direct API no necesita usar el WAL y Receivers, y se asegura de que cada registro de Kafka se reciba solo una vez, lo que es más eficiente. De esta manera, Spark Streaming y Kafka se pueden integrar bien, haciendo que los canales de streaming se presenten con alta tolerancia a fallas, alta eficiencia y facilidad de uso. Por lo tanto, se recomienda utilizar Direct Streaming para procesar datos.

  • Receiver

    Cuando se inicia una aplicación de Spark Streaming (es decir, cuando se inicia el driver), el StreamingContext relacionado (la base de todas las funciones de streaming) utiliza SparkContext para iniciar el receiver y convertirse en una tarea en ejecución a largo plazo. Estos receivers reciben y guardan datos de transmisión en la memoria de Spark para su procesamiento. Figura 5 muestra el ciclo de vida de la transferencia de datos.

    Figura 5 Ciclo de vida de la transferencia de datos
    1. Recibir datos (flecha azul).

      Receiver divide un flujo de datos en una serie de bloques y los almacena en la memoria del executor. Además, después de habilitar WAL, escribe datos en el WAL del sistema de archivos tolerante a fallas.

    2. Notificar al driver (flecha verde).

      Los metadatos en el bloque recibido se envían a StreamingContext en el driver. Los metadatos incluyen:

      • ID de referencia de bloque utilizado para localizar la posición de los datos en la memoria del Executor.
      • Información de desplazamiento de datos de bloque en registros (si la función WAL está habilitada).
    3. Procesar datos (flecha roja).

      Para cada lote de datos, StreamingContext utiliza información de bloque para generar conjuntos de datos distribuidos resilientes (RDD) y jobs. StreamingContext ejecuta jobs ejecutando tareas para procesar bloques en la memoria del executor.

    4. Establecer periódicamente puntos de control (flechas naranjas).

      Para la tolerancia a fallas, StreamingContext establece periódicamente puntos de comprobación y los guarda en sistemas de archivos externos.

Tolerancia a fallas

Spark y su RDD permiten procesar sin problemas las fallas de cualquier nodo de Worker en el clúster. Spark Streaming está construido sobre Spark. Por lo tanto, el nodo Worker de Spark Streaming también tiene la misma capacidad de tolerancia a fallas. Sin embargo, Spark Streaming necesita funcionar correctamente en caso de que se ejecute durante mucho tiempo. Por lo tanto, Spark debe ser capaz de recuperarse de fallas a través del proceso del driver (proceso principal que coordina a todos los Workers). Esto plantea desafíos a la tolerancia a fallas del Spark driver debido a que el Spark driver puede ser cualquier aplicación de usuario implementada en cualquier modo de computación. Sin embargo, Spark Streaming tiene una arquitectura de computación interna. Es decir, ejecuta periódicamente el mismo computación de Spark en cada lote de datos. Dicha arquitectura le permite almacenar periódicamente puntos de control en un espacio de almacenamiento confiable y recuperarlos al reiniciar el Driver.

Para los datos de origen tales como archivos, el mecanismo de recuperación del Driver puede garantizar cero pérdida de datos porque todos los datos se almacenan en un sistema de archivos tolerante a fallas como HDFS. Sin embargo, para otras fuentes de datos tales como Kafka y Flume, algunos datos recibidos se almacenan en caché solo en la memoria y pueden perderse antes de ser procesados. Esto es causado por el modo de operación de distribución de las aplicaciones de Spark. Cuando el proceso del driver falla, todos los executors que se ejecutan en el Cluster Manager, junto con todos los datos de la memoria, se terminan. Para evitar dicha pérdida de datos, la función WAL se agrega a Spark Streaming.

WAL se utiliza a menudo en bases de datos y sistemas de archivos para garantizar la persistencia de cualquier operación de datos. Es decir, primero registrar una operación en un registro persistente y realizar esta operación en los datos. Si la operación falla, el sistema se recupera leyendo el registro y volviendo a aplicar la operación preestablecida. A continuación se describe cómo utilizar WAL para garantizar la persistencia de los datos recibidos:

Receiver se utiliza para recibir datos de fuentes de datos tales como Kafka. Como una tarea que se ejecuta durante mucho tiempo en Executor, Receiver recibe datos y también confirma los datos recibidos si son compatibles con los orígenes de datos. Los datos recibidos se almacenan en la memoria de Executor, y Driver entrega una tarea al Executor para su procesamiento.

Después de habilitar WAL, todos los datos recibidos se almacenan en archivos de registro en el sistema de archivos tolerante a fallas. Por lo tanto, los datos recibidos no pierden incluso si Spark Streaming falla. Además, el receiver comprueba la exactitud de los datos recibidos solo después de que los datos se hayan escrito previamente en registros. Los datos almacenados en caché pero no almacenados pueden ser enviados de nuevo por los orígenes de datos después de que el driver se reinicie. Estos dos mecanismos aseguran cero pérdida de datos. Es decir, todos los datos se recuperan de los registros o se reenvían por las fuentes de datos.

Para habilitar la función WAL, realice las siguientes operaciones:

  • Establezca streamingContext.checkpoint para configurar el directorio de checkpoint, que es una ruta de archivo HDFS utilizada para almacenar checkpoints de streaming y WAL.
  • Establezca spark.streaming.receiver.writeAheadLog.enable de SparkConf a true (el valor predeterminado es de false).

Después de que WAL está habilitado, todos los receivers tienen la ventaja de recuperarse de datos recibidos confiables. Se recomienda desactivar el mecanismo de réplica múltiple porque el sistema de archivos tolerante a errores de WAL también puede replicar los datos.

El rendimiento de recepción de datos se reduce después de que se habilita WAL. Todos los datos se escriben en el sistema de archivos tolerante a fallas. Como resultado, el rendimiento de escritura del sistema de archivos y el ancho de banda de red para la replicación de datos puede convertirse en el cuello de botella potencial. Para resolver este problema, se recomienda crear más receptores para aumentar el grado de paralelismo de recepción de datos o utilizar mejor hardware para mejorar el rendimiento del sistema de archivos tolerante a fallas.

Proceso de recuperación

Cuando se reinicia un driver fallido, reinícielo de la siguiente manera:

Figura 6 Proceso de recuperación de computación
  1. Recuperar la computación. (flecha naranja)

    Utilice la información del punto de control para reiniciar Driver, reconstruir el SparkContext y reinicia Receiver.

  2. Recuperar el bloque de metadatos. (flecha verde)

    Esta operación asegura que todos los bloques de metadatos necesarios se recuperan para continuar con la recuperación informática posterior.

  3. Relanzar trabajos inacabados. (flecha roja)

    Los metadatos recuperados se utilizan para generar RDD y trabajos correspondientes para el procesamiento por lotes interrumpido debido a fallas.

  4. Leer los datos del bloque guardados en los registros. (flecha azul)

    Los datos de bloque se leen directamente de los WAL durante la ejecución de los trabajos anteriores y, por lo tanto, se recuperan todos los datos esenciales almacenados de forma confiable en los registros.

  5. Reenviar datos no confirmados. (flecha púrpura)

    Los datos que se almacenan en caché pero no se almacenan en los registros en caso de fallas son reenviados por las fuentes de datos, porque el receiver no confirma los datos.

Por lo tanto, mediante el uso de WALs y Receiver confiable, Spark Streaming puede evitar la pérdida de datos de entrada causada por fallas de Driver.

Principio de SparkSQL y DataSet

SparkSQL

Figura 7 SparkSQL y DataSet

Spark SQL es un módulo para procesar datos estructurados. En la aplicación de Spark, las sentencias de SQL o las API de DataSet se pueden usar sin problemas para consultar datos estructurados.

Spark SQL y DataSet también proporcionan un método universal para acceder a múltiples fuentes de datos como Hive, CSV, Parquet, ORC, JSON y JDBC. Estas fuentes de datos también permiten la interacción de datos. Spark SQL reutiliza la lógica de procesamiento de frontend Hive y el módulo de procesamiento de metadatos. Con Spark SQL, puede consultar directamente los datos de Hive existentes.

Además, Spark SQL también proporciona API, CLI y API JDBC, lo que permite diversos accesos al cliente.

Spark SQL Native DDL/DML

En Spark 1.5, muchos comandos de Lenguaje de definición de datos (DDL)/Lenguaje de manipulación de datos (DML) se presionan hacia abajo y se ejecutan en Hive, causando acoplamiento con Hive e inflexibilidad, como informes de errores inesperados y resultados.

Spark 3.1.1 realiza la localización de comandos y reemplaza Hive con Spark SQL Native DDL/DML para ejecutar comandos de DDL/DML. Además, se realiza el desacoplamiento de Hive y se pueden personalizar los comandos.

DataSet

Un DataSet es una colección fuertemente tipada de objetos específicos de dominio que se pueden transformar en paralelo usando operaciones funcionales o relacionales. Cada Dataset también tiene una vista no escrita llamada DataFrame, que es un Dataset de Row.

El DataFrame es un conjunto de datos estructurado y distribuido que consta de varias columnas. El DataFrame es igual a una tabla en la base de datos de relaciones o al DataFrame en R/Python. El DataFrame es el concepto más básico en Spark SQL, que se puede crear mediante múltiples métodos, como el conjunto de datos estructurado, la tabla Hive, la base de datos externa o RDD.

Las operaciones disponibles en el DataSets se dividen en transformaciones y acciones.

  • Una operación de transformation puede generar un nuevo DataSet,

    por ejemplo map, filter, select y aggregate (groupBy).

  • Una operación de action puede desencadenar el cálculo y devolver resultados,

    por ejemplo count, show o escribir datos en el sistema de archivos.

Puede utilizar cualquiera de los siguientes métodos para crear un DataSet:

  • La forma más común es apuntar Spark a algunos archivos en sistemas de almacenamiento, usando la función read disponible en un SparkSession.
    val people = spark.read.parquet("...").as[Person]  // Scala
    DataSet<Person> people = spark.read().parquet("...").as(Encoders.bean(Person.class));//Java
  • También puede crear un DataSet mediante la operación de transformation disponible en una existente.
    Por ejemplo, aplique la operación de map en un DataSet existente para crear un DataSet:
    val names = people.map(_.name) // In Scala: names is Dataset.
    Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING)); // Java

CLI y JDBCServer

Además de las API de programación, Spark SQL también proporciona las API de CLI/JDBC.

  • Los scripts spark-shell y spark-sql pueden proporcionar la CLI para la depuración.
  • JDBCServer proporciona API de JDBC. Los sistemas externos pueden enviar directamente solicitudes JDBC para calcular y analizar datos estructurados.

Principio de SparkSession

SparkSession es una API unificada para la programación de Spark y puede considerarse como una entrada unificada para la lectura de datos. SparkSession proporciona un único punto de entrada para realizar muchas operaciones que anteriormente estaban dispersas entre varias clases, y también proporciona métodos de acceso a estas clases más antiguas para maximizar la compatibilidad.

Un SparkSession se puede crear usando un patrón de constructor. El constructor reutilizará automáticamente el SparkSession existente si hay un SparkSession o creará un SparkSession si no existe. Durante las transacciones de E/S, la configuración del elemento de configuración en el generador se sincroniza automáticamente con Spark y Hadoop.

import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession.builder
  .master("local")
  .appName("my-spark-app")
  .config("spark.some.config.option", "config-value")
  .getOrCreate()
  • SparkSession se puede usar para ejecutar consultas de SQL sobre datos y devolver resultados como DataFrame.
    sparkSession.sql("select * from person").show
  • SparkSession se puede utilizar para establecer elementos de configuración durante la ejecución. Estos elementos de configuración se pueden reemplazar con variables en las sentencias de SQL.
    sparkSession.conf.set("spark.some.config", "abcd")
    sparkSession.conf.get("spark.some.config")
    sparkSession.sql("select ${spark.some.config}")
  • SparkSession también incluye un método de "catalog" que contiene métodos para trabajar con Metastore (catálogo de datos). Después de utilizar este método, se devuelve un conjunto de datos, que se puede ejecutar utilizando la misma API de conjunto de datos.
    val tables = sparkSession.catalog.listTables()
    val columns = sparkSession.catalog.listColumns("myTable")
  • Se puede acceder a SparkContext subyacente mediante la API de SparkContext de SparkSession.
    val sparkContext = sparkSession.sparkContext

Principio de Structured Streaming

El Structured Streaming es un motor de procesamiento de flujo construido en el motor de Spark SQL. Puede usar la API Dataset/DataFrame en Scala, Java, Python o R para expresar agregaciones de streaming, ventanas de tiempo de eventos y stream-stream joins. Si los datos de streaming se producen de forma incremental y continua, Spark SQL continuará procesando los datos y sincronizando el resultado con el conjunto de resultados. Además, el sistema garantiza una tolerancia a fallas de extremo a extremo exactamente una vez a través de checkpoints y WALs.

El núcleo del Structured Streaming es tomar los datos de streaming como una tabla de base de datos incremental. De manera similar al modelo de procesamiento de bloques de datos, el modelo de procesamiento de datos de flujo continuo aplica operaciones de consulta en una tabla de base de datos estática a la computación de flujo continuo, y Spark usa sentencias SQL estándar para consulta, para obtener datos de la tabla incremental y no delimitada.
Figura 8 Tabla no delimitada de Structured Streaming

Cada operación de consulta generará una tabla de resultados. En cada intervalo de activación, los datos actualizados se sincronizarán con la tabla de resultados. Cada vez que se actualiza la tabla de resultados, el resultado actualizado se escribirá en un sistema de almacenamiento externo.

Figura 9 Modelo de procesamiento de datos de Structured Streaming

Los modos de almacenamiento de Structured Streaming en la fase de output son los siguientes:

  • Complete Mode: los conjuntos de resultados actualizados se escriben en el sistema de almacenamiento externo. La operación de escritura se realiza por un conector del sistema de almacenamiento externo.
  • Append Mode: Si se activa un intervalo, solo los datos agregados en la tabla de resultados se escribirán en un sistema externo. Esto solo se aplica a las consultas en las que no se espera que cambien las filas existentes en la tabla de resultados.
  • Update Mode: si se activa un intervalo, solo los datos actualizados en la tabla de resultados se escribirán en un sistema externo, que es la diferencia entre el Complete Mode y Update Mode.

Conceptos básicos

  • RDD

    El conjunto de datos distribuidos resilientes (RDD) es un concepto central de Spark. Indica un conjunto de datos distribuido de solo lectura y particionado. Los datos parciales o todos de este conjunto de datos pueden almacenarse en caché en la memoria y reutilizarse entre computaciones.

    Creación de RDD

    • Se puede crear un RDD a partir de la entrada de HDFS u otros sistemas de almacenamiento que sean compatibles con Hadoop.
    • Un nuevo RDD se puede convertir a partir de un RDD principal.
    • Un RDD se puede convertir a partir de una colección de conjuntos de datos a través de la codificación.

    Almacenamiento de RDD

    • Puede seleccionar diferentes niveles de almacenamiento para almacenar un RDD para su reutilización. (Hay 11 niveles de almacenamiento para almacenar un RDD.)
    • Por defecto, el RDD se almacena en la memoria. Cuando la memoria es insuficiente, el RDD se desborda al disco.
  • Dependencia de RDD

    La dependencia de RDD incluye la dependencia estrecha y la dependencia amplia.

    Figura 10 Dependencia de RDD
    • Dependencia estrecha: Cada partición del RDD principal es utilizada como máximo por una partición del RDD secundario.
    • Dependencia amplia: Las particiones del RDD secundaria dependen de todas las particiones del RDD principal.

    La estrecha dependencia facilita la optimización. Lógicamente, cada operador de RDD es un fork/join (el join no es el operador de join mencionado anteriormente, sino la barrier utilizada para sincronizar múltiples tareas simultáneas); fork el RDD a cada partición y, a continuación, realiza el cálculo. Después de la computación, join los resultados y, a continuación, realice la operación de fork/join en el siguiente operador de RDD. Es antieconómico traducir directamente el RDD en implementación física. El primero es que cada RDD (incluso resultado intermedio) necesita ser físico en memoria o almacenamiento, lo que consume mucho tiempo y ocupa mucho espacio. El segundo es que, como barrier global, la operación de join es muy costosa y todo el proceso de join se ralentizará por el nodo más lento. Si las particiones del RDD secundaria dependen estrechamente de la del RDD principal, los dos procesos fork/join se pueden combinar para implementar la optimización de fusión clásica. Si la relación en la secuencia continua del operador es una dependencia estrecha, se pueden combinar múltiples procesos de fork/join para reducir un gran número de barriers globales y eliminar la fisicalización de muchos resultados intermedios de RDD, lo que mejora enormemente el rendimiento. Esto se llama optimización de canalización en Spark.

  • Transformation y Action (Operaciones de RDD)

    Las operaciones en RDD incluyen transformation (el valor devuelto es un RDD) y action (el valor devuelto no es un RDD). Figura 11 muestra el proceso de operación de RDD. El transformation es lazy, lo que indica que la transformación de un RDD a otro RDD no se ejecuta inmediatamente. Spark solo registra la transformation, pero no la ejecuta inmediatamente. La computación real se inicia solo cuando se inicia la acción. El action devuelve los resultados o escribe los datos RDD en el sistema de almacenamiento. Action es la fuerza motriz para que Spark inicie la computación.

    Figura 11 Operación de RDD

    Los datos y el modelo de operación de RDD son bastante diferentes de los de Scala.

    val file = sc.textFile("hdfs://...")
    val errors = file.filter(_.contains("ERROR"))
    errors.cache()
    errors.count()
    1. El operador textFile lee los archivos de registro del HDFS y devuelve files (como un RDD).
    2. El operador de filtro filtra las filas con ERROR y las asigna a errors (un nuevo RDD). El operador de filter es una transformation.
    3. El operador de cache almacena en caché los errores para su uso futuro.
    4. El operador de count devuelve el número de filas de errores. El operador de count es una acción.
    Transformation incluye los siguientes tipos:
    • Los elementos RDD se consideran elementos simples.

      La entrada y la salida tienen la relación uno a uno, y la estructura de partición del RDD resultante permanece sin cambios, por ejemplo, map.

      La entrada y la salida tienen la relación uno-a-muchos, y la estructura de partición del resultado RDD permanece sin cambios, por ejemplo, flatMap (un elemento se convierte en una secuencia que contiene varios elementos después del mapa y, a continuación, se aplana a varios elementos).

      La entrada y la salida tienen la relación uno a uno, pero la estructura de partición del resultado RDD cambia, por ejemplo, union (dos RDD se integran a un RDD, y el número de particiones se convierte en la suma del número de particiones de dos RDD) y coalescen (las particiones se reducen).

      Los operadores de algunos elementos se seleccionan de la entrada, tales como filter, distinct (los elementos duplicados se eliminan), subtract (los elementos solo existen en este RDD se conservan) y sample (se toman muestras).

    • Los elementos RDD se consideran pares de clave-valor.

      Realizar el cálculo uno a uno en el único RDD, como mapValues (se conserva el modo de partición del RDD de origen, que es diferente del map).

      Ordenar el único RDD, como sort y partitionBy (partición con coherencia, lo que es importante para la optimización local).

      Reestructurar y reduce el RDD único basado en clave, como groupByKey y reduceByKey.

      Join y reestructurar dos RDD basados en la clave, como join y cogroup.

      Las tres operaciones posteriores que implican la clasificación se denominan operaciones de shuffle.

    Action incluye los siguientes tipos:

    • Genere elementos de configuración escalar, como count (el número de elementos en el RDD devuelto), reduce, fold/aggregate (el número de elementos de configuración escalar que se devuelven) y take (el número de elementos antes del retorno).
    • Generar la colección Scala, como collect (importar todos los elementos del RDD a la colección Scala) y lookup (buscar todos los valores corresponden a la clave).
    • Escribir datos en el almacenamiento, como saveAsTextFile (que corresponde al textFile anterior).
    • Puntos de comprobación, como el operador checkpoint. Cuando el lineage es bastante largo (lo que ocurre con frecuencia en el cálculo de gráficos), se necesita un largo período de tiempo para ejecutar toda la secuencia de nuevo cuando se produce un falla. En este caso, checkpoint se utiliza como el punto de control para escribir los datos actuales en el almacenamiento estable.
  • Shuffle

    La mezcla aleatoria es una fase específica en el marco de MapReduce, que se encuentra entre la fase Map y la fase Reduce. Si los resultados de salida de Map van a ser utilizados por Reduce, los resultados de salida deben ser hashed basados en una clave y distribuidos a cada Reducer. Este proceso se llama Shuffle. Shuffle implica la lectura y escritura del disco y la transmisión de la red, de modo que el rendimiento de Shuffle afecta directamente a la eficiencia de operación de todo el programa.

    La siguiente figura muestra todo el proceso del algoritmo de MapReduce.

    Figura 12 Proceso de algoritmo

    Shuffle es un puente para conectar datos. A continuación se describe la implementación de shuffle en Spark.

    Shuffle divide un job de Spark en múltiples stages. Las primeras etapas contienen uno o más ShuffleMapTasks y la última etapa contiene uno o más ResultTasks.

  • Estructura de Spark Application

    La estructura de aplicación Spark incluye el SparkContext inicializado y el programa principal.

    • SparkContext inicializado: construye el entorno operativo de la aplicación Spark.

      Construye el objeto de SparkContext. A continuación se presenta un ejemplo:

      new SparkContext(master, appName, [SparkHome], [jars])

      Descripción de parámetros:

      master indica la cadena de enlace. Los modos de enlace incluyen local, Yarn-cluster y Yarn-cliente.

      appName indica el nombre de la aplicación.

      SparkHome indica el directorio donde está instalado Spark en el clúster.

      jars indica el código y el paquete de dependencias de una aplicación.

    • Programa principal: procesa datos.

    Para obtener más información sobre cómo presentar una solicitud, visite https://spark.apache.org/docs/3.1.1/submitting-applications.html.

  • Comandos de Spark Shell

    Los comandos básicos de Spark shell soportan el envío de aplicaciones de Spark. Los comandos de shell Spark son los siguientes:

    ./bin/spark-submit \
      --class <main-class> \
      --master <master-url> \
      ... # other options
      <application-jar> \
      [application-arguments]

    Descripción de parámetros:

    --class: indica el nombre de la clase de un Spark application.

    --master: indica el patrón al que se vincula la aplicación de Spark, como Yarn-client y Yarn-cluster.

    application-jar: indica la ruta del archivo JAR de Spark application.

    application-arguments: indica el parámetro necesario para enviar Spark application. Este parámetro se puede dejar en blanco.

  • Spark JobHistory Server

    La interfaz de usuario web de Spark se utiliza para monitorear los detalles en cada fase de marco de Spark de un trabajo de Spark en ejecución o histórico y proporcionar la visualización del registro, lo que ayuda a los usuarios a desarrollar, configurar y optimizar el trabajo en unidades más finas.