Estos contenidos se han traducido de forma automática para su comodidad, pero Huawei Cloud no garantiza la exactitud de estos. Para consultar los contenidos originales, acceda a la versión en inglés.
Actualización más reciente 2023-11-20 GMT+08:00

Desarrollo de aplicaciones de Kafka

Kafka es un sistema de publicación y suscripción de mensajes distribuidos. Con características similares a JMS, Kafka procesa datos de streaming activos.

Kafka se aplica a muchos escenarios, como la cola de mensajes, el seguimiento de comportamiento, la supervisión de datos de O&M, la recopilación de registros, el procesamiento de secuencias, el seguimiento de eventos y la persistencia de registros.

Kafka tiene las siguientes características:

  • Alto rendimiento
  • Persistencia de mensajes a los discos
  • Sistema distribuido escalable
  • Alta tolerancia a fallos
  • Soporte para escenarios en línea y fuera de línea

MRS proporciona ejemplos de proyectos de desarrollo de aplicaciones basados en Kafka. Esta práctica proporciona orientación para que obtenga e importe un proyecto de muestra después de crear un clúster MRS y, a continuación, realice la construcción y puesta en marcha localmente. En este proyecto de ejemplo, puede implementar el procesamiento de datos de streaming.

Las directrices para el proyecto de muestra en esta práctica son las siguientes:

  1. Cree dos topics en el cliente Kafka para que sirvan como topic de entrada y salida.
  2. Desarrolle Kafka Streams para contar palabras en cada mensaje leyendo mensajes en el topic de entrada y para generar el resultado en pares clave-valor consumiendo datos en el topic de salida.

Creación de un clúster de MRS

  1. Cree y compre un clúster MRS que contenga Kafka. Para obtener más información, consulte Compra de un clúster personalizado.

    En esta práctica, se utiliza como ejemplo un clúster MRS 3.1.0, con Hadoop y Kafka instalados y con la autenticación Kerberos deshabilitada.

  2. Después de comprar el clúster, instale el cliente en cualquier nodo del clúster. Para obtener más información, consulte Instalación y uso de cliente de clúster.

    Por ejemplo, instale el cliente en el directorio /opt/client en el nodo de gestión activo.

  3. Después de instalar el cliente, cree el directorio lib en el cliente para almacenar los paquetes JAR relacionados.

    Copie a lib los paquetes JAR de Kafka en el directorio descomprimido durante la instalación del cliente.

    Por ejemplo, si la ruta de descarga del paquete de software cliente es /tmp/FusionInsight-Client en el nodo de gestión activa, ejecute los siguientes comandos:

    mkdir /opt/client/lib

    cd /tmp/FusionInsight-Client/FusionInsight_Cluster_1_Services_ClientConfig

    scp Kafka/install_files/kafka/libs/* /opt/client/lib

Desarrollo de la aplicación

  1. Obtenga el proyecto de muestra de Huawei Mirrors.

    Descargue el código fuente del proyecto Maven y los archivos de configuración del proyecto de ejemplo, y configure las herramientas de desarrollo relacionadas en su PC local. Para obtener más información, consulte Obtención de proyectos de muestra desde Huawei Mirros.

    Seleccione una rama basada en la versión del clúster y descargue el proyecto de muestra de MRS requerido.

    Por ejemplo, el proyecto de muestra adecuado para esta práctica es WordCountDemo, que se puede obtener en https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.0/src/kafka-examples.

  2. Utilice IDEA para importar el proyecto de ejemplo y espere a que el proyecto Maven descargue los paquetes de dependencias.

    Después de configurar los parámetros Maven y SDK en el PC local, el proyecto de ejemplo carga automáticamente paquetes de dependencias relacionados. Para obtener más información, consulte Configuración e importación de proyectos de muestra.

    El proyecto de ejemplo WordCountDemo invoca a las API de Kafka para obtener y ordenar registros de palabras y luego obtener los registros de cada palabra. El fragmento de código es el siguiente:

    ...
        static Properties getStreamsConfig() {
            final Properties props = new Properties();
            KafkaProperties kafkaProc = KafkaProperties.getInstance();
            //Set broker addresses based on site requirements.
            props.put(BOOTSTRAP_SERVERS, kafkaProc.getValues(BOOTSTRAP_SERVERS, "node-group-1kLFk.mrs-rbmq.com:9092"));
            props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
            props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
            props.put(APPLICATION_ID, kafkaProc.getValues(APPLICATION_ID, "streams-wordcount"));
            //Set the protocol type, which can be SASL_PLAINTEXT or PLAINTEXT.
            props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "PLAINTEXT"));
            props.put(CACHE_MAX_BYTES_BUFFERING, 0);
            props.put(DEFAULT_KEY_SERDE, Serdes.String().getClass().getName());
            props.put(DEFAULT_VALUE_SERDE, Serdes.String().getClass().getName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return props;
        }
        static void createWordCountStream(final StreamsBuilder builder) {
            //Receives input records from the input topic.
            final KStream<String, String> source = builder.stream(INPUT_TOPIC_NAME);
            //Aggregates the calculation results of the key-value pairs.
            final KTable<String, Long> counts = source
                    .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(REGEX_STRING)))
                    .groupBy((key, value) -> value)
                    .count();
            //Outputs the key-value pairs from the output topic.
            counts.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));
        }
    ...
    • Establezca BOOTSTRAP_SERVERS en el nombre del host y el número de puerto del nodo del broker Kafka según los requisitos del sitio. Puede elegir Cluster > Services > Kafka > Instance en FusionInsight Manager para ver la información de la instancia del broker.
    • Establezca SECURITY_PROTOCOL en el protocolo para conectarse a Kafka. En este ejemplo, establezca este parámetro en PLAINTEXT.

  3. Después de confirmar que los parámetros de WordCountDemo.java son correctos, construya el proyecto y empaquetelo en un archivo JAR.

    Para obtener más información sobre cómo crear un archivo JAR, consulte Puesta en marcha de una aplicación de Linux.

    Por ejemplo, el archivo JAR es kafka-demo.jar.

Cargar el paquete JAR y los datos de origen

  1. Cargue el paquete JAR a un directorio, por ejemplo, /opt/client/lib en el nodo cliente.

    Si no puede acceder directamente al nodo cliente para cargar archivos a través de la red local, cargue el paquete JAR o los datos de origen a OBS, impórtelos a HDFS en la pestaña Files de la consola MRS. Y ejecute el comando hdfs dfs -get en el cliente HDFS para descargarlo al nodo cliente.

Ejecución de un trabajo y visualización del resultado

  1. Inicie sesión en el nodo donde está instalado el cliente de clúster como usuario root.

    cd /opt/client

    source bigdata_env

  2. Cree un topic de entrada y un topic de salida. Asegúrese de que los nombres de los topics son los mismos que los especificados en el código de ejemplo. Establezca la política de limpieza del topic de salida en compact.

    kafka-topics.sh --create --zookeeper IP address of the quorumpeer instance:ZooKeeper client connection port/kafka --replication-factor 1 --partitions 1 --topic Topic name

    Para consultar la dirección IP de la instancia de quorumpeer, inicie sesión en el FusionInsight Manager del clúster, elija Cluster > Services > ZooKeeper y haga clic en la pestaña Instance. Utilice comas (,) para separar varias direcciones IP. Puede obtener el puerto de conexión del cliente ZooKeeper consultando el parámetro de configuración ZooKeeper clientPort. El valor predeterminado es 2181.

    Por ejemplo, ejecute los siguientes comandos:

    kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-input

    kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact

  3. Después de crear los topics, ejecute el siguiente comando para ejecutar la aplicación:

    java -cp .:/opt/client/lib/* com.huawei.bigdata.kafka.example.WordCountDemo

  4. Abra una nueva ventana de cliente y ejecute los siguientes comandos para usar kafka-console-producer.sh para escribir mensajes en el topic de entrada:

    cd /opt/client

    source bigdata_env

    kafka-console-producer.sh --broker-list IP address of the broker instance:Kafka connection port(for example, 192.168.0.13:9092) --topic streams-wordcount-input --producer.config /opt/client/Kafka/kafka/config/producer.properties

  5. Abra una nueva ventana de cliente y ejecute los siguientes comandos para usar kafka-console-consumer.sh para consumir datos del tema de salida y ver el resultado:

    cd /opt/client

    source bigdata_env

    kafka-console-consumer.sh --topic streams-wordcount-output --bootstrap-server IP address of the broker instance:Kafka connection port --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --formatter kafka.tools.DefaultMessageFormatter

    Escriba un mensaje al topic de entrada.

    >This is Kafka Streams test 
    >test starting 
    >now Kafka Streams is running 
    >test end 

    La información que se muestra es la siguiente:

    this    1 
    is      1 
    kafka   1 
    streams 1 
    test    1 
    test    2 
    starting 1 
    now     1 
    kafka   2 
    streams 2 
    is      2 
    running 1 
    test    3 
    end     1