Estos contenidos se han traducido de forma automática para su comodidad, pero Huawei Cloud no garantiza la exactitud de estos. Para consultar los contenidos originales, acceda a la versión en inglés.
Actualización más reciente 2023-04-14 GMT+08:00

Principios básicos de Spark2x

El componente de Spark2x se aplica a MRS 3.x y versiones posteriores.

Descripción

Spark es un marco de computación distribuida basado en memoria. En escenarios de computación iterativos, la capacidad de computación de Spark es de 10 a 100 veces mayor que MapReduce porque los datos se almacenan en memoria cuando se procesan. Spark puede utilizar HDFS como sistema de almacenamiento subyacente, lo que permite a los usuarios cambiar rápidamente a Spark desde MapReduce. Spark ofrece capacidades de análisis de datos integrales, como el procesamiento de streaming en lotes pequeños, procesamiento por lotes sin conexión, consultas de SQL y minería de datos. Los usuarios pueden utilizar estas funciones sin problemas en una misma aplicación. Para obtener más información sobre las nuevas características de código abierto de Spark2x, consulte Nuevas funciones de código abierto de Spark2x.

Las características de Spark son las siguientes:

  • Mejora la capacidad de procesamiento de datos a través de la computación de memoria distribuida y el motor de ejecución de gráficos acíclicos dirigidos (DAG). El rendimiento entregado es de 10 a 100 veces mayor que el de MapReduce.
  • Soporta múltiples lenguajes de desarrollo (Scala/Java/Python) y docenas de operadores altamente abstractos para facilitar la construcción de aplicaciones de procesamiento de datos distribuidos.
  • Crea pilas de procesamiento de datos usando SQL, Streaming, MLlib y GraphX para proporcionar capacidades de procesamiento de datos de una sola parada.
  • Se adapta al ecosistema de Hadoop, lo que permite que las aplicaciones de Spark se ejecuten en Standalone, Mesos o Yarn, lo que permite el acceso a múltiples fuentes de datos como HDFS, HBase y Hive, y admite la migración sin problemas de la aplicación MapReduce a Spark.

Arquitectura

Figura 1 describe la arquitectura de Spark y Tabla 1 enumera los módulos de Spark.

Figura 1 Arquitectura de Spark
Tabla 1 Conceptos básicos

Módulo

Descripción

Cluster Manager

El administrador de clústeres gestiona los recursos del clúster. Spark admite varios administradores de clústeres, incluidos Mesos, Yarn y el administrador de clústeres independiente que se entrega con Spark. De forma predeterminada, los clústeres de Spark adoptan el administrador de clústeres de Yarn.

Application

Aplicación de Spark. Consiste en un programa de Driver Program y múltiples executors.

Deploy Mode

Implementación en modo de clúster o cliente. En modo de clúster, el controlador se ejecuta en un nodo dentro del clúster. En modo cliente, el controlador se ejecuta en el cliente (fuera del clúster).

Driver Program

El proceso principal de la aplicación Spark. Ejecuta la función main() de una aplicación y crea "SparkContext". Se utiliza para analizar aplicaciones, generar etapas y programar tareas a ejecutores. Por lo general, SparkContext representa Driver Program.

Executor

Un proceso iniciado en un Work Node. Se utiliza para ejecutar tareas, y gestionar y procesar los datos utilizados en las aplicaciones. Una aplicación de Spark generalmente contiene varios executors. Cada executor recibe comandos del driver y ejecuta una o varias tareas.

Worker Node

Nodo que inicia y gestiona ejecutores y recursos en un clúster.

Job

Un job consta de varias tareas simultáneas. Un operador de action (por ejemplo, un operador de collect) se asigna a un job.

Stage

Cada job consta de múltiples etapas. Cada etapa es un conjunto de tareas, que está separado por el Gráfico Acíclico Dirigido (DAG).

Task

Una tarea lleva la unidad de cálculo de la lógica de servicio. Es la unidad de trabajo mínima que se puede ejecutar en la plataforma de Spark. Una aplicación se puede dividir en múltiples tareas basadas en el plan de ejecución y la cantidad de cálculo.

Principio de Spark

Figura 2 describe la arquitectura de Spark en ejecución de aplicaciones.

  1. Una aplicación se está ejecutando en el clúster como una colección de procesos. El controlador coordina la ejecución de la aplicación.
  2. Para ejecutar una aplicación, el controlador se conecta al administrador de clústeres (como Standalone, Mesos y Yarn) para solicitar los recursos del ejecutor e iniciar ExecutorBackend. El administrador de clúster programa los recursos entre diferentes aplicaciones. El controlador programa los DAGs, divide las etapas y genera tareas para la aplicación al mismo tiempo.
  3. A continuación, Spark envía los códigos de la aplicación (los códigos transferidos a SparkContext definidos por JAR o Python) a executor.
  4. Una vez terminadas todas las tareas, se detiene la ejecución de la aplicación de usuario.
Figura 2 Arquitectura de ejecución de aplicaciones de Spark

Spark utiliza los modos de Master y Worker, como se muestra en Figura 3. Un usuario envía una aplicación en el cliente de Spark y, a continuación, el programador divide un job en varias tareas y envía las tareas a cada Worker para su ejecución. Cada Worker informa los resultados del cálculo al Driver (Master) y, a continuación, el Driver agrega y devuelve los resultados al cliente.

Figura 3 Modo de Spark Master-Worker

Tenga en cuenta lo siguiente acerca de la arquitectura:

  • Las aplicaciones se aíslan entre sí.

    Cada aplicación tiene un proceso de executor independiente, y cada executor inicia varios subprocesos para ejecutar tareas en paralelo. Cada driver programa sus propias tareas, y diferentes tareas de aplicación se ejecutan en diferentes JVM, es decir, diferentes executors.

  • Las diferentes aplicaciones de Spark no comparten datos, a menos que los datos se almacenen en el sistema de almacenamiento externo, como HDFS.
  • Se recomienda desplegar el programa de Driver en una ubicación cercana al nodo de Worker porque el programa de Driver programa tareas en el clúster. Por ejemplo, desplegar el programa de Driver en la red donde se encuentra el nodo de Worker.

Spark en YARN se puede desplegar en dos modos:

  • En el modo de Yarn-cluster, el driver de Spark se ejecuta dentro de un proceso de ApplicationMaster que es getionado por Yarn en el clúster. Después de iniciar el ApplicationMaster, el cliente puede salir sin interrumpir la ejecución del servicio.
  • En el modo de Yarn-client, el Driver se ejecuta en el proceso de cliente y el proceso de ApplicationMaster solo se utiliza para solicitar recursos de Yarn.

Principio de Spark Streaming

Spark Streaming es un marco informático en tiempo real basado en Spark, que amplía la capacidad para procesar datos de streaming masivos. Spark admite dos enfoques de procesamiento de datos: Direct Streaming y Receiver.

Proceso de computación de Direct Streaming

En el enfoque de Direct Streaming, Direct API se utiliza para procesar datos. Tomemos Kafka Direct API como ejemplo. Direct API proporciona una ubicación de desplazamiento desde la que cada rango de batch leerá, lo que es mucho más simple que iniciar un receptor para recibir continuamente datos de Kafka y datos escritos en registros de escritura previa (WAL). A continuación, cada job de batch se está ejecutando y los datos de desplazamiento correspondientes están listos en Kafka. Esta información de desplazamiento puede almacenarse de forma segura en el archivo de punto de control y leerse por aplicaciones que no se iniciaron.

Figura 4 Transmisión de datos a través de Direct Kafka API

Después de la falla, Spark Streaming puede leer datos de Kafka de nuevo y procesar el segmento de datos. El resultado del procesamiento es el mismo sin importar que Spark Streaming falle o no, porque la semántica se procesa solo una vez.

Direct API no necesita usar el WAL y Receivers, y se asegura de que cada registro de Kafka se reciba solo una vez, lo que es más eficiente. De esta manera, Spark Streaming y Kafka se pueden integrar bien, haciendo que los canales de streaming se presenten con alta tolerancia a fallas, alta eficiencia y facilidad de uso. Por lo tanto, se recomienda utilizar Direct Streaming para procesar datos.

Proceso de computación de Receiver

Cuando se inicia una aplicación de Spark Streaming (es decir, cuando se inicia el driver), el StreamingContext relacionado (la base de todas las funciones de streaming) utiliza SparkContext para iniciar el receiver y convertirse en una tarea en ejecución a largo plazo. Estos receivers reciben y guardan datos de transmisió n en la memoria de Spark para su procesamiento. Figura 5 muestra el ciclo de vida de la transferencia de datos.

Figura 5 Ciclo de vida de la transferencia de datos
  1. Recibir datos (flecha azul).

    Receiver divide un flujo de datos en una serie de bloques y los almacena en la memoria del executor. Además, después de habilitar WAL, escribe datos en el WAL del sistema de archivos tolerante a fallas.

  2. Notificar al driver (flecha verde).

    Los metadatos en el bloque recibido se envían a StreamingContext en el driver. Los metadatos incluyen:

    • ID de referencia de bloque utilizado para localizar la posición de los datos en la memoria del Executor.
    • Información de desplazamiento de datos de bloque en registros (si la función WAL está habilitada).
  3. Procesar datos (flecha roja).

    Para cada lote de datos, StreamingContext utiliza información de bloque para generar conjuntos de datos distribuidos resilientes (RDD) y jobs. StreamingContext ejecuta jobs ejecutando tareas para procesar bloques en la memoria del executor.

  4. Establecer periódicamente puntos de control (flechas naranjas).
  5. Para la tolerancia a fallas, StreamingContext establece periódicamente puntos de comprobación y los guarda en sistemas de archivos externos.

Tolerancia a fallas

Spark y su RDD permiten procesar sin problemas las fallas de cualquier nodo de Worker en el clúster. Spark Streaming está construido sobre Spark. Por lo tanto, el nodo Worker de Spark Streaming también tiene la misma capacidad de tolerancia a fallas. Sin embargo, Spark Streaming necesita funcionar correctamente en caso de que se ejecute durante mucho tiempo. Por lo tanto, Spark debe ser capaz de recuperarse de fallas a través del proceso del driver (proceso principal que coordina a todos los Workers). Esto plantea desafíos a la tolerancia a fallas del Spark driver debido a que el Spark driver puede ser cualquier aplicación de usuario implementada en cualquier modo de computación. Sin embargo, Spark Streaming tiene una arquitectura de computación interna. Es decir, ejecuta periódicamente el mismo computación de Spark en cada lote de datos. Dicha arquitectura le permite almacenar periódicamente puntos de control en un espacio de almacenamiento confiable y recuperarlos al reiniciar el Driver.

Para los datos de origen tales como archivos, el mecanismo de recuperación del Driver puede garantizar cero pérdida de datos porque todos los datos se almacenan en un sistema de archivos tolerante a fallas como HDFS. Sin embargo, para otras fuentes de datos tales como Kafka y Flume, algunos datos recibidos se almacenan en caché solo en la memoria y pueden perderse antes de ser procesados. Esto es causado por el modo de operación de distribución de las aplicaciones de Spark. Cuando el proceso del driver falla, todos los executors que se ejecutan en el Cluster Manager, junto con todos los datos de la memoria, se terminan. Para evitar dicha pérdida de datos, la función WAL se agrega a Spark Streaming.

WAL se utiliza a menudo en bases de datos y sistemas de archivos para garantizar la persistencia de cualquier operación de datos. Es decir, primero registrar una operación en un registro persistente y realizar esta operación en los datos. Si la operación falla, el sistema se recupera leyendo el registro y volviendo a aplicar la operación preestablecida. A continuación se describe cómo utilizar WAL para garantizar la persistencia de los datos recibidos:

Receiver se utiliza para recibir datos de fuentes de datos tales como Kafka. Como una tarea que se ejecuta durante mucho tiempo en Executor, Receiver recibe datos y también confirma los datos recibidos si son compatibles con los orígenes de datos. Los datos recibidos se almacenan en la memoria de Executor, y Driver entrega una tarea al Executor para su procesamiento.

Después de habilitar WAL, todos los datos recibidos se almacenan en archivos de registro en el sistema de archivos tolerante a fallas. Por lo tanto, los datos recibidos no pierden incluso si Spark Streaming falla. Además, el receiver comprueba la exactitud de los datos recibidos solo después de que los datos se hayan escrito previamente en registros. Los datos almacenados en caché pero no almacenados pueden ser enviados de nuevo por los orígenes de datos después de que el driver se reinicie. Estos dos mecanismos aseguran cero pérdida de datos. Es decir, todos los datos se recuperan de los registros o se reenvían por las fuentes de datos.

Para habilitar la función WAL, realice las siguientes operaciones:

  • Establezca streamingContext.checkpoint (ruta al directorio) para configurar el directorio de checkpoint, que es una ruta de archivo de HDFS utilizada para almacenar checkpoints y WAL de streaming.
  • Establezca spark.streaming.receiver.writeAheadLog.enable de SparkConf a true (el valor predeterminado es de false).

Después de que WAL está habilitado, todos los receivers tienen la ventaja de recuperarse de datos recibidos confiables. Se recomienda desactivar el mecanismo de réplica múltiple porque el sistema de archivos tolerante a errores de WAL también puede replicar los datos.

El rendimiento de recepción de datos se reduce después de que se habilita WAL. Todos los datos se escriben en el sistema de archivos tolerante a fallas. Como resultado, el rendimiento de escritura del sistema de archivos y el ancho de banda de red para la replicación de datos puede convertirse en el cuello de botella potencial. Para resolver este problema, se recomienda crear más receptores para aumentar el grado de paralelismo de recepción de datos o utilizar mejor hardware para mejorar el rendimiento del sistema de archivos tolerante a fallas.

Proceso de recuperación

Cuando se reinicia un driver fallido, reinícielo de la siguiente manera:
Figura 6 Proceso de recuperación de computación
  1. Recuperar la computación. (flecha naranja)

    Utilice la información del punto de control para reiniciar Driver, reconstruir el SparkContext y reinicia Receiver.

  2. Recuperar el bloque de metadatos. (flecha verde)

    Esta operación asegura que todos los bloques de metadatos necesarios se recuperan para continuar con la recuperación informática posterior.

  3. Relanzar trabajos inacabados. (flecha roja)

    Los metadatos recuperados se utilizan para generar RDD y trabajos correspondientes para el procesamiento por lotes interrumpido debido a fallas.

  4. Leer los datos del bloque guardados en los registros. (flecha azul)

    Los datos de bloque se leen directamente de los WAL durante la ejecución de los trabajos anteriores y, por lo tanto, se recuperan todos los datos esenciales almacenados de forma confiable en los registros.

  5. Reenviar datos no confirmados. (flecha púrpura)

    Los datos que se almacenan en caché pero no se almacenan en los registros en caso de fallas son reenviados por las fuentes de datos, porque el receiver no confirma los datos.

Por lo tanto, mediante el uso de WALs y Receiver confiable, Spark Streaming puede evitar la pérdida de datos de entrada causada por fallas de Driver.

Principio de SparkSQL y DataSet

SparkSQL

Figura 7 SparkSQL y DataSet

Spark SQL es un módulo para procesar datos estructurados. En la aplicación de Spark, las sentencias de SQL o las API de DataSet se pueden usar sin problemas para consultar datos estructurados.

Spark SQL y DataSet también proporcionan un método universal para acceder a múltiples fuentes de datos como Hive, CSV, Parquet, ORC, JSON y JDBC. Estas fuentes de datos también permiten la interacción de datos. Spark SQL reutiliza la lógica de procesamiento de frontend Hive y el módulo de procesamiento de metadatos. Con Spark SQL, puede consultar directamente los datos de Hive existentes.

Además, Spark SQL también proporciona API, CLI y API JDBC, lo que permite diversos accesos al cliente.

Spark SQL Native DDL/DML

En Spark 1.5, muchos comandos de Lenguaje de definición de datos (DDL)/Lenguaje de manipulación de datos (DML) se presionan hacia abajo y se ejecutan en Hive, causando acoplamiento con Hive e inflexibilidad, como informes de errores inesperados y resultados.

Spark2x realiza la localización de comandos y reemplaza Hive con Spark SQL Native DDL/DML para ejecutar comandos de DDL/DML. Además, se realiza el desacoplamiento de Hive y se pueden personalizar los comandos.

DataSet

Un DataSet es una colección fuertemente tipada de objetos específicos de dominio que se pueden transformar en paralelo usando operaciones funcionales o relacionales. Cada Dataset también tiene una vista no escrita llamada DataFrame, que es un Dataset de Row.

El DataFrame es un conjunto de datos estructurado y distribuido que consta de varias columnas. El DataFrame es igual a una tabla en la base de datos de relaciones o al DataFrame en R/Python. El DataFrame es el concepto más básico en Spark SQL, que se puede crear mediante múltiples métodos, como el conjunto de datos estructurado, la tabla Hive, la base de datos externa o RDD.

Las operaciones disponibles en el DataSets se dividen en transformaciones y acciones.

  • Una operación de transformation puede generar un nuevo DataSet,

    por ejemplo map, filter, select y aggregate (groupBy).

  • Una operación de action puede desencadenar el cálculo y devolver resultados,

    por ejemplo count, show o escribir datos en el sistema de archivos.

Puede utilizar cualquiera de los siguientes métodos para crear un DataSet:

  • La forma más común es apuntar Spark a algunos archivos en sistemas de almacenamiento, usando la función read disponible en un SparkSession.
    val people = spark.read.parquet("...").as[Person]  // Scala
    DataSet<Person> people = spark.read().parquet("...").as(Encoders.bean(Person.class));//Java
  • También puede crear un DataSet mediante la operación de transformation disponible en una existente. Por ejemplo, aplique la operación de map en un DataSet existente para crear un DataSet:
    val names = people.map(_.name) // In Scala: names is Dataset.
    Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING)); // Java

CLI y JDBCServer

Además de las API de programación, Spark SQL también proporciona las API de CLI/JDBC.

  • Los scripts spark-shell y spark-sql pueden proporcionar la CLI para la depuración.
  • JDBCServer proporciona API de JDBC. Los sistemas externos pueden enviar directamente solicitudes JDBC para calcular y analizar datos estructurados.

Principio de SparkSession

SparkSession es una API unificada en Spark2x y se puede considerar como una entrada unificada para la lectura de datos. SparkSession proporciona un único punto de entrada para realizar muchas operaciones que anteriormente estaban dispersas entre varias clases, y también proporciona métodos de acceso a estas clases más antiguas para maximizar la compatibilidad.

Un SparkSession se puede crear usando un patrón de constructor. El constructor reutilizará automáticamente el SparkSession existente si hay un SparkSession o creará un SparkSession si no existe. Durante las transacciones de E/S, la configuración del elemento de configuración en el generador se sincroniza automáticamente con Spark y Hadoop.

import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession.builder
  .master("local")
  .appName("my-spark-app")
  .config("spark.some.config.option", "config-value")
  .getOrCreate()
  • SparkSession se puede usar para ejecutar consultas de SQL sobre datos y devolver resultados como DataFrame.
    sparkSession.sql("select * from person").show
  • SparkSession se puede utilizar para establecer elementos de configuración durante la ejecución. Estos elementos de configuración se pueden reemplazar con variables en las sentencias de SQL.
    sparkSession.conf.set("spark.some.config", "abcd")
    sparkSession.conf.get("spark.some.config")
    sparkSession.sql("select ${spark.some.config}")
  • SparkSession también incluye un método de "catalog" que contiene métodos para trabajar con Metastore (catálogo de datos). Después de utilizar este método, se devuelve un conjunto de datos, que se puede ejecutar utilizando la misma API de conjunto de datos.
    val tables = sparkSession.catalog.listTables()
    val columns = sparkSession.catalog.listColumns("myTable")
  • Se puede acceder a SparkContext subyacente mediante la API de SparkContext de SparkSession.
    val sparkContext = sparkSession.sparkContext

Principio de Structured Streaming

El Structured Streaming es un motor de procesamiento de flujo construido en el motor de Spark SQL. Puede usar la API Dataset/DataFrame en Scala, Java, Python o R para expresar agregaciones de streaming, ventanas de tiempo de eventos y stream-stream joins. Si los datos de streaming se producen de forma incremental y continua, Spark SQL continuará procesando los datos y sincronizando el resultado con el conjunto de resultados. Además, el sistema garantiza una tolerancia a fallas de extremo a extremo exactamente una vez a través de checkpoints y WALs.

El núcleo del Structured Streaming es tomar los datos de streaming como una tabla de base de datos incremental. De manera similar al modelo de procesamiento de bloques de datos, el modelo de procesamiento de datos de flujo continuo aplica operaciones de consulta en una tabla de base de datos estática a la computación de flujo continuo, y Spark usa sentencias SQL estándar para consulta, para obtener datos de la tabla incremental y no delimitada.
Figura 8 Tabla no delimitada de Structured Streaming

Cada operación de consulta generará una tabla de resultados. En cada intervalo de activación, los datos actualizados se sincronizarán con la tabla de resultados. Cada vez que se actualiza la tabla de resultados, el resultado actualizado se escribirá en un sistema de almacenamiento externo.

Figura 9 Modelo de procesamiento de datos de Structured Streaming

Los modos de almacenamiento de Structured Streaming en la fase de output son los siguientes:

  • Complete Mode: los conjuntos de resultados actualizados se escriben en el sistema de almacenamiento externo. La operación de escritura se realiza por un conector del sistema de almacenamiento externo.
  • Append Mode: Si se activa un intervalo, solo los datos agregados en la tabla de resultados se escribirán en un sistema externo. Esto solo se aplica a las consultas en las que no se espera que cambien las filas existentes en la tabla de resultados.
  • Update Mode: si se activa un intervalo, solo los datos actualizados en la tabla de resultados se escribirán en un sistema externo, que es la diferencia entre el Complete Mode y Update Mode.

Conceptos

  • RDD

    El conjunto de datos distribuidos resilientes (RDD) es un concepto central de Spark. Indica un conjunto de datos distribuido de solo lectura y particionado. Los datos parciales o todos de este conjunto de datos pueden almacenarse en caché en la memoria y reutilizarse entre computaciones.

    Creación de RDD

    • Se puede crear un RDD a partir de la entrada de HDFS u otros sistemas de almacenamiento que sean compatibles con Hadoop.
    • Un nuevo RDD se puede convertir a partir de un RDD principal.
    • Un RDD se puede convertir a partir de una colección de conjuntos de datos a través de la codificación.

    Almacenamiento de RDD

    • Puede seleccionar diferentes niveles de almacenamiento para almacenar un RDD para su reutilización. (Hay 11 niveles de almacenamiento para almacenar un RDD.)
    • Por defecto, el RDD se almacena en la memoria. Cuando la memoria es insuficiente, el RDD se desborda al disco.
  • Dependencia de RDD

    La dependencia de RDD incluye la dependencia estrecha y la dependencia amplia.

    Figura 10 Dependencia de RDD
    • Dependencia estrecha: Cada partición del RDD principal es utilizada como máximo por una partición del RDD secundario.
    • Dependencia amplia: Las particiones del RDD secundaria dependen de todas las particiones del RDD principal.

    La estrecha dependencia facilita la optimización. Lógicamente, cada operador de RDD es un fork/join (el join no es el operador de join mencionado anteriormente, sino la barrier utilizada para sincronizar múltiples tareas simultáneas); fork el RDD a cada partición y, a continuación, realiza el cálculo. Después de la computación, join los resultados y, a continuación, realice la operación de fork/join en el siguiente operador de RDD. Es antieconómico traducir directamente el RDD en implementación física. El primero es que cada RDD (incluso resultado intermedio) necesita ser físico en memoria o almacenamiento, lo que consume mucho tiempo y ocupa mucho espacio. El segundo es que, como barrier global, la operación de join es muy costosa y todo el proceso de join se ralentizará por el nodo más lento. Si las particiones del RDD secundaria dependen estrechamente de la del RDD principal, los dos procesos fork/join se pueden combinar para implementar la optimización de fusión clásica. Si la relación en la secuencia continua del operador es una dependencia estrecha, se pueden combinar múltiples procesos de fork/join para reducir un gran número de barriers globales y eliminar la fisicalización de muchos resultados intermedios de RDD, lo que mejora enormemente el rendimiento. Esto se llama optimización de canalización en Spark.

  • Transformation y Action (Operaciones de RDD)

    Las operaciones en RDD incluyen transformation (el valor devuelto es un RDD) y action (el valor devuelto no es un RDD). Figura 11 muestra el proceso de operación de RDD. El transformation es lazy, lo que indica que la transformación de un RDD a otro RDD no se ejecuta inmediatamente. Spark solo registra la transformation, pero no la ejecuta inmediatamente. La computación real se inicia solo cuando se inicia la acción. El action devuelve los resultados o escribe los datos RDD en el sistema de almacenamiento. Action es la fuerza motriz para que Spark inicie la computación.

    Figura 11 Operación de RDD

    Los datos y el modelo de operación de RDD son bastante diferentes de los de Scala.

    val file = sc.textFile("hdfs://...")
    val errors = file.filter(_.contains("ERROR"))
    errors.cache()
    errors.count()
    1. El operador textFile lee los archivos de registro del HDFS y devuelve files (como un RDD).
    2. El operador de filtro filtra las filas con ERROR y las asigna a errors (un nuevo RDD). El operador de filter es una transformation.
    3. El operador de cache almacena en caché los errores para su uso futuro.
    4. El operador de count devuelve el número de filas de errores. El operador de count es una acción.
    Transformation incluye los siguientes tipos:
    • Los elementos RDD se consideran elementos simples.

      La entrada y la salida tienen la relación uno a uno, y la estructura de partición del RDD resultante permanece sin cambios, por ejemplo, map.

      La entrada y la salida tienen la relación uno-a-muchos, y la estructura de partición del resultado RDD permanece sin cambios, por ejemplo, flatMap (un elemento se convierte en una secuencia que contiene varios elementos después del mapa y, a continuación, se aplana a varios elementos).

      La entrada y la salida tienen la relación uno a uno, pero la estructura de partición del resultado RDD cambia, por ejemplo, union (dos RDD se integran a un RDD, y el número de particiones se convierte en la suma del número de particiones de dos RDD) y coalescen (las particiones se reducen).

      Los operadores de algunos elementos se seleccionan de la entrada, tales como filter, distinct (los elementos duplicados se eliminan), subtract (los elementos solo existen en este RDD se conservan) y sample (se toman muestras).

    • Los elementos RDD se consideran pares de clave-valor.

      Realizar el cálculo uno a uno en el único RDD, como mapValues (se conserva el modo de partición del RDD de origen, que es diferente del map).

      Ordenar el único RDD, como sort y partitionBy (partición con coherencia, lo que es importante para la optimización local).

      Reestructurar y reduce el RDD único basado en clave, como groupByKey y reduceByKey.

      Join y reestructurar dos RDD basados en la clave, como join y cogroup.

      Las tres operaciones posteriores que implican la clasificación se denominan operaciones de shuffle.

    Action incluye los siguientes tipos:

    • Genere elementos de configuración escalar, como count (el número de elementos en el RDD devuelto), reduce, fold/aggregate (el número de elementos de configuración escalar que se devuelven) y take (el número de elementos antes del retorno).
    • Generar la colección Scala, como collect (importar todos los elementos del RDD a la colección Scala) y lookup (buscar todos los valores corresponden a la clave).
    • Escribir datos en el almacenamiento, como saveAsTextFile (que corresponde al textFile anterior).
    • Puntos de comprobación, como el operador checkpoint. Cuando el lineage es bastante largo (lo que ocurre con frecuencia en el cálculo de gráficos), se necesita un largo período de tiempo para ejecutar toda la secuencia de nuevo cuando se produce un falla. En este caso, checkpoint se utiliza como el punto de control para escribir los datos actuales en el almacenamiento estable.
  • Shuffle

    La mezcla aleatoria es una fase específica en el marco de MapReduce, que se encuentra entre la fase Map y la fase Reduce. Si los resultados de salida de Map van a ser utilizados por Reduce, los resultados de salida deben ser hashed basados en una clave y distribuidos a cada Reducer. Este proceso se llama Shuffle. Shuffle implica la lectura y escritura del disco y la transmisión de la red, de modo que el rendimiento de Shuffle afecta directamente a la eficiencia de operación de todo el programa.

    La siguiente figura muestra todo el proceso del algoritmo de MapReduce.

    Figura 12 Proceso de algoritmo

    Shuffle es un puente para conectar datos. A continuación se describe la implementación de shuffle en Spark.

    Shuffle divide un job de Spark en múltiples stages. Las primeras etapas contienen uno o más ShuffleMapTasks y la última stage contiene uno o más ResultTasks.

  • Estructura de Spark Application

    La estructura de aplicación Spark incluye el SparkContext inicializado y el programa principal.

    • SparkContext inicializado: construye el entorno operativo de la aplicación Spark.

      Construye el objeto de SparkContext. A continuación se presenta un ejemplo:

      new SparkContext(master, appName, [SparkHome], [jars])

      Descripción de parámetros:

      master indica la cadena de enlace. Los modos de enlace incluyen local, Yarn-cluster y Yarn-cliente.

      appName indica el nombre de la aplicación.

      SparkHome indica el directorio donde está instalado Spark en el clúster.

      jars indica el código y el paquete de dependencias de una aplicación.

    • Programa principal: procesa datos.

    Para obtener más información sobre cómo presentar una solicitud, visite https://spark.apache.org/docs/3.1.1/submitting-applications.html.

  • Comandos de Spark Shell

    Los comandos básicos de Spark shell soportan el envío de aplicaciones de Spark. Los comandos de shell Spark son los siguientes:

    ./bin/spark-submit \
      --class <main-class> \
      --master <master-url> \
      ... # other options
      <application-jar> \
      [application-arguments]

    Descripción de parámetros:

    --class: indica el nombre de la clase de un Spark application.

    --master: indica el patrón al que se vincula la aplicación de Spark, como Yarn-client y Yarn-cluster.

    application-jar: indica la ruta del archivo JAR de Spark application.

    application-arguments: indica el parámetro necesario para enviar Spark application. Este parámetro se puede dejar en blanco.

  • Spark JobHistory Server

    La interfaz de usuario web de Spark se utiliza para monitorear los detalles en cada fase de marco de Spark de un trabajo de Spark en ejecución o histórico y proporcionar la visualización del registro, lo que ayuda a los usuarios a desarrollar, configurar y optimizar el trabajo en unidades más finas.