Desarrollo de aplicaciones de Spark
- Procesamiento de datos: Spark puede procesar datos rápidamente y tiene tolerancia a fallos y escalabilidad.
- Cálculo iterativo: Spark admite el cálculo iterativo para mantenerse al día con la lógica de procesamiento de datos de varios pasos.
- Minería de datos: Basada en datos masivos, Spark puede manejar la minería y el análisis de datos complejos y admite múltiples algoritmos de minería de datos y aprendizaje automático.
- Procesamiento de streaming: Spark admite el procesamiento de streaming con solo una latencia de nivel de segundos y admite múltiples fuentes de datos externas.
- Análisis de consultas: Spark admite el análisis de consultas SQL estándar, proporciona el DSL (DataFrame) y admite múltiples entradas externas.
MRS proporciona ejemplos de proyectos de desarrollo de aplicaciones basados en Spark. Esta práctica proporciona orientación para que obtenga e importe un proyecto de muestra después de crear un clúster MRS y, a continuación, realice la construcción y puesta en marcha localmente. En este proyecto de ejemplo, puede leer datos de tablas Hive y volver a escribir los datos en tablas HBase.
Las directrices para el proyecto de muestra en esta práctica son las siguientes:
- Consultar datos en una tabla Hive especificada.
- Consultar datos en una tabla HBase especificada basándose en la clave de los datos en la tabla Hive.
- Agregar registros de datos relacionados y escribirlos en la tabla HBase.
Creación de un clúster de MRS
- Cree y compre un clúster MRS que contenga Spark. Para obtener más información, consulte Compra de un clúster personalizado.
En esta práctica, se utiliza como ejemplo un clúster MRS 3.1.5, con Spark2x, Hive y HBase instalados y con la autenticación Kerberos habilitada.
- Después de comprar el clúster, instale el cliente en cualquier nodo del clúster. Para obtener más información, consulte Instalación y uso de cliente de clúster.
Por ejemplo, instale el cliente en el directorio /opt/client en el nodo de gestión activo.
Preparación del archivo de configuración de clúster
- Una vez creado el clúster, inicie sesión en FusionInsight Manager y cree un usuario del clúster para enviar trabajos de Flink.
Elija System > Permission > User. En el panel derecho, haga clic en Create. En la página mostrada, cree un usuario hombre-máquina, por ejemplo, sparkuser.
Agregue el grupo de usuarios supergroup y asocie el rol System_administrator.
- Inicie sesión en FusionInsight Manager como nuevo usuario y cambie la contraseña inicial según se le solicite.
- Elija System > Permission > User. En la columna Operation de sparkuser, elija More > Download Authentication Credential. Guarde el archivo y descomprímalo para obtener los archivos user.keytab y krb5.conf del usuario.
Desarrollo de la aplicación
- Obtenga el proyecto de muestra de Huawei Mirrors.
Descargue el código fuente del proyecto Maven y los archivos de configuración del proyecto de ejemplo, y configure las herramientas de desarrollo relacionadas en su PC local. Para obtener más información, consulte Obtención de proyectos de muestra desde Huawei Mirros.
Seleccione una rama basada en la versión del clúster y descargue el proyecto de muestra de MRS requerido.
Por ejemplo, el proyecto de muestra adecuado para esta práctica es SparkHivetoHbase, que se puede obtener en https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.5/src/spark-examples/sparksecurity-examples/SparkHivetoHbaseJavaExample.
- Utilice IDEA para importar el proyecto de ejemplo y espere a que el proyecto Maven descargue los paquetes de dependencias.
Después de configurar los parámetros Maven y SDK en el PC local, el proyecto de ejemplo carga automáticamente paquetes de dependencias relacionados. Para obtener más información, consulte Configuración e importación de proyectos de muestra.
Figura 1 Proyecto de muestra Spark Hive a HBase
La clase SparkHivetoHbase del proyecto de ejemplo utiliza Spark para invocar a las API de Hive para operar una tabla Hive, obtener el registro correspondiente de una tabla HBase basada en la clave, realizar operaciones en los dos registros de datos y actualizar los datos a la tabla HBase.
El fragmento de código es el siguiente:
... public class SparkHivetoHbase { public static void main(String[] args) throws Exception { String userPrincipal = "sparkuser"; //Specifies the cluster user information and keytab file address used for authentication. String userKeytabPath = "/opt/client/user.keytab"; String krb5ConfPath = "/opt/client/krb5.conf"; Configuration hadoopConf = new Configuration(); LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf); //Calls the Spark API to obtain table data. SparkConf conf = new SparkConf().setAppName("SparkHivetoHbase"); JavaSparkContext jsc = new JavaSparkContext(conf); HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(jsc); Dataset<Row> dataFrame = sqlContext.sql("select name, account from person"); //Traverses partitions in the Hive table and updates the partitions to the HBase table. dataFrame .toJavaRDD() .foreachPartition( new VoidFunction<Iterator<Row>>() { public void call(Iterator<Row> iterator) throws Exception { hBaseWriter(iterator); } }); jsc.stop(); } //Updates records in the HBase table on the executor. private static void hBaseWriter(Iterator<Row> iterator) throws IOException { //Reads the HBase table. String tableName = "table2"; String columnFamily = "cf"; Configuration conf = HBaseConfiguration.create(); Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(TableName.valueOf(tableName)); try { connection = ConnectionFactory.createConnection(conf); table = connection.getTable(TableName.valueOf(tableName)); List<Row> table1List = new ArrayList<Row>(); List<Get> rowList = new ArrayList<Get>(); while (iterator.hasNext()) { Row item = iterator.next(); Get get = new Get(item.getString(0).getBytes()); table1List.add(item); rowList.add(get); } //Obtains the records in the HBase table. Result[] resultDataBuffer = table.get(rowList); //Modifies records in the HBase table. List<Put> putList = new ArrayList<Put>(); for (int i = 0; i < resultDataBuffer.length; i++) { Result resultData = resultDataBuffer[i]; if (!resultData.isEmpty()) { int hiveValue = table1List.get(i).getInt(1); String hbaseValue = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes())); Put put = new Put(table1List.get(i).getString(0).getBytes()); //Calculates the result. int resultValue = hiveValue + Integer.valueOf(hbaseValue); put.addColumn( Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue))); putList.add(put); } } if (putList.size() > 0) { table.put(putList); } } catch (IOException e) { e.printStackTrace(); } finally { if (table != null) { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } if (connection != null) { try { //Closes the HBase connection. connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } ...
Para un clúster MRS con autenticación Kerberos activada, la aplicación debe realizar la autenticación de usuario en el servidor. En este proyecto de ejemplo, configure la información de autenticación en código. Establezca userPrincipal en el nombre de usuario para la autenticación y cambie userKeytabPath y krb5ConfPath a las rutas de archivos reales en el servidor de cliente.
- Después de confirmar que los parámetros en el proyecto son correctos, compilar el proyecto y empaquetarlo en un archivo JAR.
En la ventana Maven, seleccione clean en Lifecycle para ejecutar el proceso de construcción de Maven. Seleccione package y obtenga el paquete JAR del directorio target.
[INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 02:36 min [INFO] Finished at: 2023-06-12T20:46:24+08:00 [INFO] ------------------------------------------------------------------------
Por ejemplo, el archivo JAR es SparkHivetoHbase-1.0.jar.
Carga del paquete JAR y preparación de datos de origen
- Cargue el paquete JAR a un directorio, por ejemplo, /opt/client/sparkdemo en el nodo cliente.
Si no puede acceder directamente al nodo cliente para cargar archivos a través de la red local, cargue el paquete JAR o los datos de origen a OBS, impórtelos a HDFS en la pestaña Files de la consola MRS. Y ejecute el comando hdfs dfs -get en el cliente HDFS para descargarlo al nodo cliente.
- Cargue el archivo keytab usado para la autenticación a la ubicación especificada en el código, por ejemplo, /opt/client.
- Inicie sesión en el nodo donde está instalado el cliente de clúster como usuario root.
cd /opt/client
source bigdata_env
kinit sparkuser
- Cree una tabla Hive e inserte datos en la tabla.
beeline
En Hive Beeline, ejecute los siguientes comandos para crear una tabla e insertar datos:
create table person ( name STRING, account INT ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\' STORED AS TEXTFILE;
insert into table person(name,account) values("1","100");
select * from person;
+--------------+-----------------+ | person.name | person.account | +--------------+-----------------+ | 1 | 100 | +--------------+-----------------+
- Cree una tabla HBase e inserte datos en la tabla.
Salga de Hive Beeline, ejecute el comando spark-beeline y ejecute el siguiente comando para crear una tabla HBase:
create table table2 ( key string, cid string ) using org.apache.spark.sql.hbase.HBaseSource options( hbaseTableName "table2", keyCols "key", colsMapping "cid=cf.cid" );
Salga de Spark Beeline, ejecute el comando hbase shell para ir al HBase Shell y ejecute los siguientes comandos para insertar datos:
put 'table2', '1', 'cf:cid', '1000'
scan 'table2'
ROW COLUMN+CELL 1 column=cf:cid, timestamp=2023-06-12T21:12:50.711, value=1000 1 row(s)
Ejecución de la aplicación y visualización del resultado
- En el nodo donde está instalado el cliente de clúster, ejecute los siguientes comandos para ejecutar el paquete JAR exportado desde el proyecto de ejemplo:
cd /opt/client
source bigdata_env
cd Spark2x/spark
vi conf/spark-defaults.conf
Cambie el valor de spark.yarn.security.credentials.hbase.enabled a true.
bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHivetoHbase --master yarn --deploy-mode client /opt/client/sparkdemo/SparkHivetoHbase-1.0.jar
- Una vez enviada la tarea, inicie sesión en FusionInsight Manager como usuario sparkuser, elija Cluster > Services > Yarn y vincule a la interfaz de usuario web ResourceManager. A continuación, localice la información del trabajo de la aplicación de Spark y haga clic en ApplicationMaster en la última columna de la información de la aplicación para ir a la interfaz de usuario de Spark y ver detalles.
Figura 2 Consulta de los detalles de la tarea Spark
- Una vez completada la tarea, consulte el contenido de la tabla HBase en el HBase shell. Puede ver que los registros han sido actualizados.
cd /opt/client
source bigdata_env
hbase shell
scan 'table2'
ROW COLUMN+CELL 1 column=cf:cid, timestamp=2023-06-12T21:22:50.711, value=1100 1 row(s)