Desarrollo de aplicaciones de Flink
Flink es un marco de computación unificado que soporta tanto el procesamiento por lotes como el procesamiento de flujo. Proporciona un motor de procesamiento de datos de flujo que admite la distribución de datos y la computación en paralelo. Flink cuenta con procesamiento de flujo y es un motor de procesamiento de flujo de código abierto superior en la industria.
Flink proporciona procesamiento de datos de canalización de alta concurrencia, latencia de nivel de milisegundos y alta confiabilidad, lo que lo hace adecuado para el procesamiento de datos de baja latencia.
El sistema Flink consta de las siguientes partes:
- Client
Flink client se utiliza para enviar trabajos de streaming a Flink.
- TaskManager
TaskManager es un nodo de ejecución de servicio de Flink, que ejecuta tareas específicas. Puede haber muchos TaskManagers y son equivalentes entre sí.
- JobManager
JobManager es un nodo de gestión de Flink. Gestiona todas las TaskManagers y programa las tareas enviadas por los usuarios a TaskManagers específicos. En el modo de alta disponibilidad (HA), se despliega múltiples JobManagers. Entre estos JobManagers se selecciona uno como el JobManager activo, y los otros están en espera.
MRS proporciona ejemplos de proyectos de desarrollo de aplicaciones basados en múltiples componentes de Flink. Esta práctica proporciona orientación para que obtenga e importe un proyecto de muestra después de crear un clúster MRS y, a continuación, realice la construcción y puesta en marcha localmente. En este proyecto de ejemplo, puede implementar Flink DataStream para procesar datos.
Creación de un clúster MRS Flink
- Cree y compre un clúster MRS que contenga Hive. Para obtener más información, consulte Compra de un clúster personalizado.
En esta práctica, se utiliza como ejemplo un clúster MRS 3.2.0-LTS.1, con Hadoop y Flink instalados y con la autenticación Kerberos habilitada.
- Haga clic en Buy Now y espere hasta que se cree el clúster MRS.
Preparación del archivo de configuración de clúster
- Una vez creado el clúster, inicie sesión en FusionInsight Manager y cree un usuario del clúster para enviar trabajos de Flink.
Elija System > Permission > User. En la página mostrada, haga clic en Create. En la página mostrada, cree un usuario máquina-máquina, por ejemplo, flinkuser.
Agregue el grupo de usuarios supergroup y asocie el rol System_administrator.
- Elija System > Permission > User. En la columna Operation de flinkuser, elija More > Download Authentication Credential. Guarde el archivo y descomprímalo para obtener los archivos user.keytab y krb5.conf del usuario.
- Elija Cluster. En la pestaña Dashboard, haga clic en More y seleccione Download Client. En el cuadro de diálogo que se muestra, establezca Select Client Type en Configuration Files Only y haga clic en OK. Después de generar el paquete cliente, descargue el paquete como se le indique y descomprima.
Por ejemplo, si el paquete del archivo de configuración del cliente es FusionInsight_Cluster_1_Services_Client.tar, descomprima para obtener FusionInsight_Cluster_1_Services_ClientConfig_ConfigFiles.tar. A continuación, continúe para descomprimir este archivo.
Vaya al directorio FusionInsight_Cluster_1_Services_ClientConfig_ConfigFiles\Flink\config y obtenga los archivos de configuración.
Obtención del proyecto de muestra
- Obtenga el proyecto de muestra de Huawei Mirrors.
Descargue el código fuente y los archivos de configuración del proyecto de ejemplo, y configure las herramientas de desarrollo relacionadas en su PC local. Para obtener más información, consulte Obtención de proyectos de muestra desde Huawei Mirros.
Seleccione una rama basada en la versión del clúster y descargue el proyecto de muestra de MRS requerido.
Por ejemplo, el proyecto de muestra adecuado para esta práctica es FlinkStreamJavaExample, que se puede obtener en https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.2.0.1/src/flink-examples/flink-examples-security/FlinkStreamJavaExample.
- Utilice IDEA para importar el proyecto de ejemplo y espere a que el proyecto Maven descargue los paquetes de dependencias. Para obtener más información, consulte Configuración e importación de proyectos de muestra.
Figura 1 Proyecto de muestra de Flink
Después de configurar los parámetros Maven y SDK en el PC local, el proyecto de ejemplo carga automáticamente paquetes de dependencias relacionados.
- Utilice Flink client para enviar el programa DataStream desarrollado para que no se requiera autenticación de seguridad en el código.
Supongamos que hay un archivo de registro de tiempo en el sitio durante los fines de semana de un sitio web de compras en línea. Escriba el programa DataStream para recopilar estadísticas en tiempo real sobre información detallada sobre las usuarias femeninas que pasan más de 2 horas en compras en línea.
La primera columna del archivo de registro registra los nombres, la segunda columna registra el sexo y la tercera columna registra el tiempo en el sitio (en minutos). Tres columnas están separadas por comas (,).- log1.txt: registros recogidos el sábado
LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
log2.txt: registros recogidos el domingo
LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
El procedimiento de desarrollo es el siguiente:
- Lea los datos de texto, genere DataStreams y analice los datos para generar UserRecord.
- Busque los datos de destino (tiempo en el sitio de usuarios femeninos).
- Realice la operación keyby basada en nombres y géneros, y calcule el tiempo total que cada usuario femenino pasa en línea dentro de una ventana de tiempo.
- Busque usuarios cuya duración consecutiva en línea exceda el umbral.
public class FlinkStreamJavaExample { public static void main(String[] args) throws Exception { // Print the command reference for flink run. System.out.println("use command as: "); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2"); System.out.println("******************************************************************************************"); System.out.println("<filePath> is for text file to read data, use comma to separate"); System.out.println("<windowTime> is the width of the window, time as minutes"); System.out.println("******************************************************************************************"); // Read text pathes and separate them with commas (,). If the source file is in the HDFS, set this parameter to a specific HDFS path, for example, hdfs://hacluster/tmp/log1.txt,hdfs://hacluster/tmp/log2.txt. final String[] filePaths = ParameterTool.fromArgs(args).get("filePath", "/opt/log1.txt,/opt/log2.txt").split(","); assert filePaths.length > 0; // Set the time window. The default value is 2 minutes per time window. One time window is sufficient to read all data in the text. final int windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2); // Construct an execution environment and use eventTime to process the data obtained in a time window. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); // Read the text data stream. DataStream<String> unionStream = env.readTextFile(filePaths[0]); if (filePaths.length > 1) { for (int i = 1; i < filePaths.length; i++) { unionStream = unionStream.union(env.readTextFile(filePaths[i])); } } // Convert the data, construct data processing logic, and calculate and print the results. unionStream.map(new MapFunction<String, UserRecord>() { @Override public UserRecord map(String value) throws Exception { return getRecord(value); } }).assignTimestampsAndWatermarks( new Record2TimestampExtractor() ).filter(new FilterFunction<UserRecord>() { @Override public boolean filter(UserRecord value) throws Exception { return value.sexy.equals("female"); } }).keyBy( new UserRecordSelector() ).window( TumblingEventTimeWindows.of(Time.minutes(windowTime)) ).reduce(new ReduceFunction<UserRecord>() { @Override public UserRecord reduce(UserRecord value1, UserRecord value2) throws Exception { value1.shoppingTime += value2.shoppingTime; return value1; } }).filter(new FilterFunction<UserRecord>() { @Override public boolean filter(UserRecord value) throws Exception { return value.shoppingTime > 120; } }).print(); // Call execute to trigger the execution. env.execute("FemaleInfoCollectionPrint java"); } // Construct a keyBy keyword for grouping. private static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>> { @Override public Tuple2<String, String> getKey(UserRecord value) throws Exception { return Tuple2.of(value.name, value.sexy); } } // Resolve text line data and construct the UserRecord data structure. private static UserRecord getRecord(String line) { String[] elems = line.split(","); assert elems.length == 3; return new UserRecord(elems[0], elems[1], Integer.parseInt(elems[2])); } // Define the UserRecord data structure and override the toString printing method. public static class UserRecord { private String name; private String sexy; private int shoppingTime; public UserRecord(String n, String s, int t) { name = n; sexy = s; shoppingTime = t; } public String toString() { return "name: " + name + " sexy: " + sexy + " shoppingTime: " + shoppingTime; } } // Construct a class inherited from AssignerWithPunctuatedWatermarks to set eventTime and waterMark. private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<UserRecord> { // add tag in the data of datastream elements @Override public long extractTimestamp(UserRecord element, long previousTimestamp) { return System.currentTimeMillis(); } // give the watermark to trigger the window to execute, and use the value to check if the window elements is ready @Override public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) { return new Watermark(extractedTimestamp - 1); } } }
- log1.txt: registros recogidos el sábado
Creación y ejecución de la aplicación
- En IntelliJ IDEA, configure la información de Artifacts del proyecto.
- En la página de inicio de IDEA, seleccione .
- En la página Project Structure, seleccione Artifacts, haga clic en + y elija JAR > Empty.
Figura 2 Adición de Artifacts
- Establezca el nombre, el tipo y la ruta de salida del paquete JAR, por ejemplo, flink-demo.
Figura 3 Configuración de información básica
- Haga clic con el botón derecho en 'FlinkStreamJavaExample' compile output y elija Put into Output Root en el menú contextual. A continuación, haga clic en Apply.
Figura 4 Put into Output Root
- Haga clic en OK.
- Generar un archivo JAR.
- En la página de inicio de IDEA, seleccione .
- En el menú que se muestra, elija Figura 5 Build
para generar el archivo JAR.
- Obtenga el archivo flink-demo.jar de la ruta de acceso configurada en 1.c.
- Instalar y configurar el Flink client.
- Instale el cliente de clúster MRS, por ejemplo, en /opt/hadoopclient.
- Descomprima el paquete de credenciales de autenticación descargado de Preparación del archivo de configuración de clúster y copie el archivo obtenido en un directorio en el nodo cliente, por ejemplo, /opt/hadoopclient/Flink/flink/conf.
- Ejecute el siguiente comando para establecer los parámetros de configuración del cliente de Flink y guardar la configuración:
vi /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml
Agregue la dirección IP del servicio del nodo cliente y la dirección IP flotante del FusionInsight Manager al elemento de configuración jobmanager.web.allow-access-address y agregue la ruta keytab y el nombre de usuario a los elementos de configuración correspondientes.
... jobmanager.web.allow-access-address: 192.168.64.122,192.168.64.216,192.168.64.234 ... security.kerberos.login.keytab: /opt/client/Flink/flink/conf/user.keytab security.kerberos.login.principal: flinkuser ...
- Configurar la autenticación de seguridad.
- Ejecute los siguientes comandos para generar un archivo de autenticación de seguridad de Flink client:
cd /opt/hadoopclient/Flink/flink/bin
sh generate_keystore.sh
Introduzca una contraseña definida por el usuario para la autenticación.
- Configure las rutas para que el cliente acceda a los archivos flink.keystore y flink.truststore.
cd /opt/hadoopclient/Flink/flink/conf/
mkdir ssl
mv flink.keystore ssl/
mv flink.truststore ssl/
vi /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml
Cambie las rutas de los dos parámetros siguientes a rutas relativas:
security.ssl.keystore: ssl/flink.keystore security.ssl.truststore: ssl/flink.truststore
- Ejecute los siguientes comandos para generar un archivo de autenticación de seguridad de Flink client:
- Cargue el paquete JAR generado en 2 al directorio relacionado en el nodo de Flink client, por ejemplo, /opt/hadoopclient.
Cree el directorio conf en el directorio donde se encuentra el paquete JAR y cargue los archivos de configuración en Flink/config del paquete de archivos de configuración del cliente de clúster obtenido en Preparación del archivo de configuración de clúster al directorio conf.
- Cargue los archivos de datos de origen de aplicación al nodo donde se despliega la instancia NodeManager.
En este ejemplo, los archivos de datos de origen log1.txt y log2.txt se almacenan en el host local. Debe cargar los archivos en el directorio /opt en todos los nodos de Yarn NodeManager y establecer el permiso de archivo en 755.
- En el Flink client, ejecute el comando yarn session para iniciar el clúster de Flink.
cd /opt/hadoopclient/Flink/flink
bin/yarn-session.sh -jm 1024 -tm 1024 -t conf/ssl/
... Cluster started: Yarn cluster with application id application_1683438782910_0009 JobManager Web Interface: http://192.168.64.10:32261
- Abra una nueva ventana de conexión de cliente, vaya al directorio de Flink client y ejecute el programa.
source /opt/hadoopclient/bigdata_env
cd /opt/hadoopclient/Flink/flink
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/hadoopclient/flink-demo.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
... 2023-05-26 19:56:52,068 | INFO | [main] | Found Web Interface host-192-168-64-10:32261 of application 'application_1683438782910_0009'. | org.apache.flink.yarn.YarnClusterDescriptor.setClusterEntrypointInfoToConfig(YarnClusterDescriptor.java:1854) Job has been submitted with JobID 7647255752b09456d5a580e33a8529f5 Program execution finished Job with JobID 7647255752b09456d5a580e33a8529f5 has finished. Job Runtime: 36652 ms
- Comprobar los resultados de la ejecución.
Inicie sesión en FusionInsight Manager como usuario flinkuser y elija Cluster > Service > Yarn. En la página Applications, haga clic en un nombre de trabajo para ir a la página de detalles del trabajo.
Figura 6 Consulta de detalles de trabajo de Yarn
Para el trabajo enviado en una session, puede hacer clic en Tracking URL para iniciar sesión en la página nativa del servicio de Flink para ver la información del trabajo.
Figura 7 Ver detalles del trabajo de Flink
En este proyecto de ejemplo, haga clic en Task Managers y vea el resultado en ejecución en la pestaña Stdout del trabajo.
Figura 8 Ver los resultados en ejecución de la aplicación