Estos contenidos se han traducido de forma automática para su comodidad, pero Huawei Cloud no garantiza la exactitud de estos. Para consultar los contenidos originales, acceda a la versión en inglés.
Cómputo
Elastic Cloud Server
Bare Metal Server
Auto Scaling
Image Management Service
Dedicated Host
FunctionGraph
Cloud Phone Host
Huawei Cloud EulerOS
Redes
Virtual Private Cloud
Elastic IP
Elastic Load Balance
NAT Gateway
Direct Connect
Virtual Private Network
VPC Endpoint
Cloud Connect
Enterprise Router
Enterprise Switch
Global Accelerator
Gestión y gobernanza
Cloud Eye
Identity and Access Management
Cloud Trace Service
Resource Formation Service
Tag Management Service
Log Tank Service
Config
Resource Access Manager
Simple Message Notification
Application Performance Management
Application Operations Management
Organizations
Optimization Advisor
Cloud Operations Center
Resource Governance Center
Migración
Server Migration Service
Object Storage Migration Service
Cloud Data Migration
Migration Center
Cloud Ecosystem
KooGallery
Partner Center
User Support
My Account
Billing Center
Cost Center
Resource Center
Enterprise Management
Service Tickets
HUAWEI CLOUD (International) FAQs
ICP Filing
Support Plans
My Credentials
Customer Operation Capabilities
Partner Support Plans
Professional Services
Análisis
MapReduce Service
Data Lake Insight
CloudTable Service
Cloud Search Service
Data Lake Visualization
Data Ingestion Service
GaussDB(DWS)
DataArts Studio
IoT
IoT Device Access
Otros
Product Pricing Details
System Permissions
Console Quick Start
Common FAQs
Instructions for Associating with a HUAWEI CLOUD Partner
Message Center
Seguridad y cumplimiento
Security Technologies and Applications
Web Application Firewall
Host Security Service
Cloud Firewall
SecMaster
Data Encryption Workshop
Database Security Service
Cloud Bastion Host
Data Security Center
Cloud Certificate Manager
Blockchain
Blockchain Service
Servicios multimedia
Media Processing Center
Video On Demand
Live
SparkRTC
Almacenamiento
Object Storage Service
Elastic Volume Service
Cloud Backup and Recovery
Storage Disaster Recovery Service
Scalable File Service
Volume Backup Service
Cloud Server Backup Service
Data Express Service
Dedicated Distributed Storage Service
Contenedores
Cloud Container Engine
SoftWare Repository for Container
Application Service Mesh
Ubiquitous Cloud Native Service
Cloud Container Instance
Bases de datos
Relational Database Service
Document Database Service
Data Admin Service
Data Replication Service
GeminiDB
GaussDB
Distributed Database Middleware
Database and Application Migration UGO
TaurusDB
Middleware
Distributed Cache Service
API Gateway
Distributed Message Service for Kafka
Distributed Message Service for RabbitMQ
Distributed Message Service for RocketMQ
Cloud Service Engine
EventGrid
Dedicated Cloud
Dedicated Computing Cluster
Aplicaciones empresariales
ROMA Connect
Message & SMS
Domain Name Service
Edge Data Center Management
Meeting
AI
Face Recognition Service
Graph Engine Service
Content Moderation
Image Recognition
Data Lake Factory
Optical Character Recognition
ModelArts
ImageSearch
Conversational Bot Service
Speech Interaction Service
Huawei HiLens
Developer Tools
SDK Developer Guide
API Request Signing Guide
Terraform
Koo Command Line Interface
Distribución de contenido y cómputo de borde
Content Delivery Network
Intelligent EdgeFabric
CloudPond
Soluciones
SAP Cloud
High Performance Computing
Servicios para desarrolladores
ServiceStage
CodeArts
CodeArts PerfTest
CodeArts Req
CodeArts Pipeline
CodeArts Build
CodeArts Deploy
CodeArts Artifact
CodeArts TestPlan
CodeArts Check
Cloud Application Engine
aPaaS MacroVerse
KooPhone
KooDrive

Desarrollo de aplicaciones de Spark

Actualización más reciente 2023-11-20 GMT+08:00
Spark es un marco de procesamiento por lotes distribuido. Proporciona capacidades de análisis y minería y computación de memoria iterativa y soporta el desarrollo de aplicaciones en múltiples lenguajes de programación. Se aplica a los siguientes escenarios:
  • Procesamiento de datos: Spark puede procesar datos rápidamente y tiene tolerancia a fallos y escalabilidad.
  • Cálculo iterativo: Spark admite el cálculo iterativo para mantenerse al día con la lógica de procesamiento de datos de varios pasos.
  • Minería de datos: Basada en datos masivos, Spark puede manejar la minería y el análisis de datos complejos y admite múltiples algoritmos de minería de datos y aprendizaje automático.
  • Procesamiento de streaming: Spark admite el procesamiento de streaming con solo una latencia de nivel de segundos y admite múltiples fuentes de datos externas.
  • Análisis de consultas: Spark admite el análisis de consultas SQL estándar, proporciona el DSL (DataFrame) y admite múltiples entradas externas.

MRS proporciona ejemplos de proyectos de desarrollo de aplicaciones basados en Spark. Esta práctica proporciona orientación para que obtenga e importe un proyecto de muestra después de crear un clúster MRS y, a continuación, realice la construcción y puesta en marcha localmente. En este proyecto de ejemplo, puede leer datos de tablas Hive y volver a escribir los datos en tablas HBase.

Las directrices para el proyecto de muestra en esta práctica son las siguientes:

  1. Consultar datos en una tabla Hive especificada.
  2. Consultar datos en una tabla HBase especificada basándose en la clave de los datos en la tabla Hive.
  3. Agregar registros de datos relacionados y escribirlos en la tabla HBase.

Creación de un clúster de MRS

  1. Cree y compre un clúster MRS que contenga Spark. Para obtener más información, consulte Compra de un clúster personalizado.

    NOTA:

    En esta práctica, se utiliza como ejemplo un clúster MRS 3.1.5, con Spark2x, Hive y HBase instalados y con la autenticación Kerberos habilitada.

  2. Después de comprar el clúster, instale el cliente en cualquier nodo del clúster. Para obtener más información, consulte Instalación y uso de cliente de clúster.

    Por ejemplo, instale el cliente en el directorio /opt/client en el nodo de gestión activo.

Preparación del archivo de configuración de clúster

  1. Una vez creado el clúster, inicie sesión en FusionInsight Manager y cree un usuario del clúster para enviar trabajos de Flink.

    Elija System > Permission > User. En el panel derecho, haga clic en Create. En la página mostrada, cree un usuario hombre-máquina, por ejemplo, sparkuser.

    Agregue el grupo de usuarios supergroup y asocie el rol System_administrator.

  2. Inicie sesión en FusionInsight Manager como nuevo usuario y cambie la contraseña inicial según se le solicite.
  3. Elija System > Permission > User. En la columna Operation de sparkuser, elija More > Download Authentication Credential. Guarde el archivo y descomprímalo para obtener los archivos user.keytab y krb5.conf del usuario.

Desarrollo de la aplicación

  1. Obtenga el proyecto de muestra de Huawei Mirrors.

    Descargue el código fuente del proyecto Maven y los archivos de configuración del proyecto de ejemplo, y configure las herramientas de desarrollo relacionadas en su PC local. Para obtener más información, consulte Obtención de proyectos de muestra desde Huawei Mirros.

    Seleccione una rama basada en la versión del clúster y descargue el proyecto de muestra de MRS requerido.

    Por ejemplo, el proyecto de muestra adecuado para esta práctica es SparkHivetoHbase, que se puede obtener en https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.5/src/spark-examples/sparksecurity-examples/SparkHivetoHbaseJavaExample.

  2. Utilice IDEA para importar el proyecto de ejemplo y espere a que el proyecto Maven descargue los paquetes de dependencias.

    Después de configurar los parámetros Maven y SDK en el PC local, el proyecto de ejemplo carga automáticamente paquetes de dependencias relacionados. Para obtener más información, consulte Configuración e importación de proyectos de muestra.

    Figura 1 Proyecto de muestra Spark Hive a HBase

    La clase SparkHivetoHbase del proyecto de ejemplo utiliza Spark para invocar a las API de Hive para operar una tabla Hive, obtener el registro correspondiente de una tabla HBase basada en la clave, realizar operaciones en los dos registros de datos y actualizar los datos a la tabla HBase.

    El fragmento de código es el siguiente:

    ...
    public class SparkHivetoHbase {
        public static void main(String[] args) throws Exception {
            String userPrincipal = "sparkuser";     //Specifies the cluster user information and keytab file address used for authentication.
            String userKeytabPath = "/opt/client/user.keytab";
            String krb5ConfPath = "/opt/client/krb5.conf";
            Configuration hadoopConf = new Configuration();
            LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf);
            //Calls the Spark API to obtain table data.
            SparkConf conf = new SparkConf().setAppName("SparkHivetoHbase");
            JavaSparkContext jsc = new JavaSparkContext(conf);
            HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(jsc);
            Dataset<Row> dataFrame = sqlContext.sql("select name, account from person");
            //Traverses partitions in the Hive table and updates the partitions to the HBase table.
                    dataFrame
                    .toJavaRDD()
                    .foreachPartition(
                            new VoidFunction<Iterator<Row>>() {
                                public void call(Iterator<Row> iterator) throws Exception {
                                    hBaseWriter(iterator);
                                }
                            });
            jsc.stop();
        }
        //Updates records in the HBase table on the executor.
        private static void hBaseWriter(Iterator<Row> iterator) throws IOException {
            //Reads the HBase table.
            String tableName = "table2";
            String columnFamily = "cf";
            Configuration conf = HBaseConfiguration.create();
            Connection connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TableName.valueOf(tableName));
            try {
                connection = ConnectionFactory.createConnection(conf);
                table = connection.getTable(TableName.valueOf(tableName));
                List<Row> table1List = new ArrayList<Row>();
                List<Get> rowList = new ArrayList<Get>();
                while (iterator.hasNext()) {
                    Row item = iterator.next();
                    Get get = new Get(item.getString(0).getBytes());
                    table1List.add(item);
                    rowList.add(get);
                }
                //Obtains the records in the HBase table.
                Result[] resultDataBuffer = table.get(rowList);
                //Modifies records in the HBase table.
                List<Put> putList = new ArrayList<Put>();
                for (int i = 0; i < resultDataBuffer.length; i++) {
                    Result resultData = resultDataBuffer[i];
                    if (!resultData.isEmpty()) {
                        int hiveValue = table1List.get(i).getInt(1);
                        String hbaseValue = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes()));
                        Put put = new Put(table1List.get(i).getString(0).getBytes());
                        //Calculates the result.
                        int resultValue = hiveValue + Integer.valueOf(hbaseValue);
                        put.addColumn(
                                Bytes.toBytes(columnFamily),
                                Bytes.toBytes("cid"),
                                Bytes.toBytes(String.valueOf(resultValue)));
                        putList.add(put);
                    }
                }
                if (putList.size() > 0) {
                    table.put(putList);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (table != null) {
                    try {
                        table.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (connection != null) {
                    try {
                        //Closes the HBase connection.
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    ...
    NOTA:

    Para un clúster MRS con autenticación Kerberos activada, la aplicación debe realizar la autenticación de usuario en el servidor. En este proyecto de ejemplo, configure la información de autenticación en código. Establezca userPrincipal en el nombre de usuario para la autenticación y cambie userKeytabPath y krb5ConfPath a las rutas de archivos reales en el servidor de cliente.

  3. Después de confirmar que los parámetros en el proyecto son correctos, compilar el proyecto y empaquetarlo en un archivo JAR.

    En la ventana Maven, seleccione clean en Lifecycle para ejecutar el proceso de construcción de Maven. Seleccione package y obtenga el paquete JAR del directorio target.

    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  02:36 min
    [INFO] Finished at: 2023-06-12T20:46:24+08:00
    [INFO] ------------------------------------------------------------------------

    Por ejemplo, el archivo JAR es SparkHivetoHbase-1.0.jar.

Carga del paquete JAR y preparación de datos de origen

  1. Cargue el paquete JAR a un directorio, por ejemplo, /opt/client/sparkdemo en el nodo cliente.

    NOTA:

    Si no puede acceder directamente al nodo cliente para cargar archivos a través de la red local, cargue el paquete JAR o los datos de origen a OBS, impórtelos a HDFS en la pestaña Files de la consola MRS. Y ejecute el comando hdfs dfs -get en el cliente HDFS para descargarlo al nodo cliente.

  2. Cargue el archivo keytab usado para la autenticación a la ubicación especificada en el código, por ejemplo, /opt/client.
  3. Inicie sesión en el nodo donde está instalado el cliente de clúster como usuario root.

    cd /opt/client

    source bigdata_env

    kinit sparkuser

  4. Cree una tabla Hive e inserte datos en la tabla.

    beeline

    En Hive Beeline, ejecute los siguientes comandos para crear una tabla e insertar datos:

    create table person ( name STRING, account INT ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\' STORED AS TEXTFILE;

    insert into table person(name,account) values("1","100");

    select * from person;

    +--------------+-----------------+
    | person.name  | person.account  |
    +--------------+-----------------+
    | 1            | 100             |
    +--------------+-----------------+

  5. Cree una tabla HBase e inserte datos en la tabla.

    Salga de Hive Beeline, ejecute el comando spark-beeline y ejecute el siguiente comando para crear una tabla HBase:

    create table table2 ( key string, cid string ) using org.apache.spark.sql.hbase.HBaseSource options( hbaseTableName "table2", keyCols "key", colsMapping "cid=cf.cid" );

    Salga de Spark Beeline, ejecute el comando hbase shell para ir al HBase Shell y ejecute los siguientes comandos para insertar datos:

    put 'table2', '1', 'cf:cid', '1000'

    scan 'table2'

    ROW                                                 COLUMN+CELL                                                                                                                                           
     1                                                 column=cf:cid, timestamp=2023-06-12T21:12:50.711, value=1000                                                                                           
    1 row(s)

Ejecución de la aplicación y visualización del resultado

  1. En el nodo donde está instalado el cliente de clúster, ejecute los siguientes comandos para ejecutar el paquete JAR exportado desde el proyecto de ejemplo:

    cd /opt/client

    source bigdata_env

    cd Spark2x/spark

    vi conf/spark-defaults.conf

    Cambie el valor de spark.yarn.security.credentials.hbase.enabled a true.

    bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHivetoHbase --master yarn --deploy-mode client /opt/client/sparkdemo/SparkHivetoHbase-1.0.jar

  2. Una vez enviada la tarea, inicie sesión en FusionInsight Manager como usuario sparkuser, elija Cluster > Services > Yarn y vincule a la interfaz de usuario web ResourceManager. A continuación, localice la información del trabajo de la aplicación de Spark y haga clic en ApplicationMaster en la última columna de la información de la aplicación para ir a la interfaz de usuario de Spark y ver detalles.

    Figura 2 Consulta de los detalles de la tarea Spark

  3. Una vez completada la tarea, consulte el contenido de la tabla HBase en el HBase shell. Puede ver que los registros han sido actualizados.

    cd /opt/client

    source bigdata_env

    hbase shell

    scan 'table2'

    ROW                                                 COLUMN+CELL                                                                                                                                           
     1                                                 column=cf:cid, timestamp=2023-06-12T21:22:50.711, value=1100                                                                                           
    1 row(s)

Utilizamos cookies para mejorar nuestro sitio y tu experiencia. Al continuar navegando en nuestro sitio, tú aceptas nuestra política de cookies. Descubre más

Comentarios

Comentarios

Comentarios

0/500

Seleccionar contenido

Enviar el contenido seleccionado con los comentarios