Estos contenidos se han traducido de forma automática para su comodidad, pero Huawei Cloud no garantiza la exactitud de estos. Para consultar los contenidos originales, acceda a la versión en inglés.
Cómputo
Elastic Cloud Server
Bare Metal Server
Auto Scaling
Image Management Service
Dedicated Host
FunctionGraph
Cloud Phone Host
Huawei Cloud EulerOS
Redes
Virtual Private Cloud
Elastic IP
Elastic Load Balance
NAT Gateway
Direct Connect
Virtual Private Network
VPC Endpoint
Cloud Connect
Enterprise Router
Enterprise Switch
Global Accelerator
Gestión y gobernanza
Cloud Eye
Identity and Access Management
Cloud Trace Service
Resource Formation Service
Tag Management Service
Log Tank Service
Config
Resource Access Manager
Simple Message Notification
Application Performance Management
Application Operations Management
Organizations
Optimization Advisor
Cloud Operations Center
Resource Governance Center
Migración
Server Migration Service
Object Storage Migration Service
Cloud Data Migration
Migration Center
Cloud Ecosystem
KooGallery
Partner Center
User Support
My Account
Billing Center
Cost Center
Resource Center
Enterprise Management
Service Tickets
HUAWEI CLOUD (International) FAQs
ICP Filing
Support Plans
My Credentials
Customer Operation Capabilities
Partner Support Plans
Professional Services
Análisis
MapReduce Service
Data Lake Insight
CloudTable Service
Cloud Search Service
Data Lake Visualization
Data Ingestion Service
GaussDB(DWS)
DataArts Studio
IoT
IoT Device Access
Otros
Product Pricing Details
System Permissions
Console Quick Start
Common FAQs
Instructions for Associating with a HUAWEI CLOUD Partner
Message Center
Seguridad y cumplimiento
Security Technologies and Applications
Web Application Firewall
Host Security Service
Cloud Firewall
SecMaster
Data Encryption Workshop
Database Security Service
Cloud Bastion Host
Data Security Center
Cloud Certificate Manager
Blockchain
Blockchain Service
Servicios multimedia
Media Processing Center
Video On Demand
Live
SparkRTC
Almacenamiento
Object Storage Service
Elastic Volume Service
Cloud Backup and Recovery
Storage Disaster Recovery Service
Scalable File Service
Volume Backup Service
Cloud Server Backup Service
Data Express Service
Dedicated Distributed Storage Service
Contenedores
Cloud Container Engine
SoftWare Repository for Container
Application Service Mesh
Ubiquitous Cloud Native Service
Cloud Container Instance
Bases de datos
Relational Database Service
Document Database Service
Data Admin Service
Data Replication Service
GeminiDB
GaussDB
Distributed Database Middleware
Database and Application Migration UGO
TaurusDB
Middleware
Distributed Cache Service
API Gateway
Distributed Message Service for Kafka
Distributed Message Service for RabbitMQ
Distributed Message Service for RocketMQ
Cloud Service Engine
EventGrid
Dedicated Cloud
Dedicated Computing Cluster
Aplicaciones empresariales
ROMA Connect
Message & SMS
Domain Name Service
Edge Data Center Management
Meeting
AI
Face Recognition Service
Graph Engine Service
Content Moderation
Image Recognition
Data Lake Factory
Optical Character Recognition
ModelArts
ImageSearch
Conversational Bot Service
Speech Interaction Service
Huawei HiLens
Developer Tools
SDK Developer Guide
API Request Signing Guide
Terraform
Koo Command Line Interface
Distribución de contenido y cómputo de borde
Content Delivery Network
Intelligent EdgeFabric
CloudPond
Soluciones
SAP Cloud
High Performance Computing
Servicios para desarrolladores
ServiceStage
CodeArts
CodeArts PerfTest
CodeArts Req
CodeArts Pipeline
CodeArts Build
CodeArts Deploy
CodeArts Artifact
CodeArts TestPlan
CodeArts Check
Cloud Application Engine
aPaaS MacroVerse
KooPhone
KooDrive

Desarrollo de aplicaciones de Kafka

Actualización más reciente 2023-11-20 GMT+08:00

Kafka es un sistema de publicación y suscripción de mensajes distribuidos. Con características similares a JMS, Kafka procesa datos de streaming activos.

Kafka se aplica a muchos escenarios, como la cola de mensajes, el seguimiento de comportamiento, la supervisión de datos de O&M, la recopilación de registros, el procesamiento de secuencias, el seguimiento de eventos y la persistencia de registros.

Kafka tiene las siguientes características:

  • Alto rendimiento
  • Persistencia de mensajes a los discos
  • Sistema distribuido escalable
  • Alta tolerancia a fallos
  • Soporte para escenarios en línea y fuera de línea

MRS proporciona ejemplos de proyectos de desarrollo de aplicaciones basados en Kafka. Esta práctica proporciona orientación para que obtenga e importe un proyecto de muestra después de crear un clúster MRS y, a continuación, realice la construcción y puesta en marcha localmente. En este proyecto de ejemplo, puede implementar el procesamiento de datos de streaming.

Las directrices para el proyecto de muestra en esta práctica son las siguientes:

  1. Cree dos topics en el cliente Kafka para que sirvan como topic de entrada y salida.
  2. Desarrolle Kafka Streams para contar palabras en cada mensaje leyendo mensajes en el topic de entrada y para generar el resultado en pares clave-valor consumiendo datos en el topic de salida.

Creación de un clúster de MRS

  1. Cree y compre un clúster MRS que contenga Kafka. Para obtener más información, consulte Compra de un clúster personalizado.

    NOTA:

    En esta práctica, se utiliza como ejemplo un clúster MRS 3.1.0, con Hadoop y Kafka instalados y con la autenticación Kerberos deshabilitada.

  2. Después de comprar el clúster, instale el cliente en cualquier nodo del clúster. Para obtener más información, consulte Instalación y uso de cliente de clúster.

    Por ejemplo, instale el cliente en el directorio /opt/client en el nodo de gestión activo.

  3. Después de instalar el cliente, cree el directorio lib en el cliente para almacenar los paquetes JAR relacionados.

    Copie a lib los paquetes JAR de Kafka en el directorio descomprimido durante la instalación del cliente.

    Por ejemplo, si la ruta de descarga del paquete de software cliente es /tmp/FusionInsight-Client en el nodo de gestión activa, ejecute los siguientes comandos:

    mkdir /opt/client/lib

    cd /tmp/FusionInsight-Client/FusionInsight_Cluster_1_Services_ClientConfig

    scp Kafka/install_files/kafka/libs/* /opt/client/lib

Desarrollo de la aplicación

  1. Obtenga el proyecto de muestra de Huawei Mirrors.

    Descargue el código fuente del proyecto Maven y los archivos de configuración del proyecto de ejemplo, y configure las herramientas de desarrollo relacionadas en su PC local. Para obtener más información, consulte Obtención de proyectos de muestra desde Huawei Mirros.

    Seleccione una rama basada en la versión del clúster y descargue el proyecto de muestra de MRS requerido.

    Por ejemplo, el proyecto de muestra adecuado para esta práctica es WordCountDemo, que se puede obtener en https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.0/src/kafka-examples.

  2. Utilice IDEA para importar el proyecto de ejemplo y espere a que el proyecto Maven descargue los paquetes de dependencias.

    Después de configurar los parámetros Maven y SDK en el PC local, el proyecto de ejemplo carga automáticamente paquetes de dependencias relacionados. Para obtener más información, consulte Configuración e importación de proyectos de muestra.

    El proyecto de ejemplo WordCountDemo invoca a las API de Kafka para obtener y ordenar registros de palabras y luego obtener los registros de cada palabra. El fragmento de código es el siguiente:

    ...
        static Properties getStreamsConfig() {
            final Properties props = new Properties();
            KafkaProperties kafkaProc = KafkaProperties.getInstance();
            //Set broker addresses based on site requirements.
            props.put(BOOTSTRAP_SERVERS, kafkaProc.getValues(BOOTSTRAP_SERVERS, "node-group-1kLFk.mrs-rbmq.com:9092"));
            props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
            props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
            props.put(APPLICATION_ID, kafkaProc.getValues(APPLICATION_ID, "streams-wordcount"));
            //Set the protocol type, which can be SASL_PLAINTEXT or PLAINTEXT.
            props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "PLAINTEXT"));
            props.put(CACHE_MAX_BYTES_BUFFERING, 0);
            props.put(DEFAULT_KEY_SERDE, Serdes.String().getClass().getName());
            props.put(DEFAULT_VALUE_SERDE, Serdes.String().getClass().getName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return props;
        }
        static void createWordCountStream(final StreamsBuilder builder) {
            //Receives input records from the input topic.
            final KStream<String, String> source = builder.stream(INPUT_TOPIC_NAME);
            //Aggregates the calculation results of the key-value pairs.
            final KTable<String, Long> counts = source
                    .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(REGEX_STRING)))
                    .groupBy((key, value) -> value)
                    .count();
            //Outputs the key-value pairs from the output topic.
            counts.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));
        }
    ...
    NOTA:
    • Establezca BOOTSTRAP_SERVERS en el nombre del host y el número de puerto del nodo del broker Kafka según los requisitos del sitio. Puede elegir Cluster > Services > Kafka > Instance en FusionInsight Manager para ver la información de la instancia del broker.
    • Establezca SECURITY_PROTOCOL en el protocolo para conectarse a Kafka. En este ejemplo, establezca este parámetro en PLAINTEXT.

  3. Después de confirmar que los parámetros de WordCountDemo.java son correctos, construya el proyecto y empaquetelo en un archivo JAR.

    Para obtener más información sobre cómo crear un archivo JAR, consulte Puesta en marcha de una aplicación de Linux.

    Por ejemplo, el archivo JAR es kafka-demo.jar.

Cargar el paquete JAR y los datos de origen

  1. Cargue el paquete JAR a un directorio, por ejemplo, /opt/client/lib en el nodo cliente.

    NOTA:

    Si no puede acceder directamente al nodo cliente para cargar archivos a través de la red local, cargue el paquete JAR o los datos de origen a OBS, impórtelos a HDFS en la pestaña Files de la consola MRS. Y ejecute el comando hdfs dfs -get en el cliente HDFS para descargarlo al nodo cliente.

Ejecución de un trabajo y visualización del resultado

  1. Inicie sesión en el nodo donde está instalado el cliente de clúster como usuario root.

    cd /opt/client

    source bigdata_env

  2. Cree un topic de entrada y un topic de salida. Asegúrese de que los nombres de los topics son los mismos que los especificados en el código de ejemplo. Establezca la política de limpieza del topic de salida en compact.

    kafka-topics.sh --create --zookeeper IP address of the quorumpeer instance:ZooKeeper client connection port/kafka --replication-factor 1 --partitions 1 --topic Topic name

    Para consultar la dirección IP de la instancia de quorumpeer, inicie sesión en el FusionInsight Manager del clúster, elija Cluster > Services > ZooKeeper y haga clic en la pestaña Instance. Utilice comas (,) para separar varias direcciones IP. Puede obtener el puerto de conexión del cliente ZooKeeper consultando el parámetro de configuración ZooKeeper clientPort. El valor predeterminado es 2181.

    Por ejemplo, ejecute los siguientes comandos:

    kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-input

    kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact

  3. Después de crear los topics, ejecute el siguiente comando para ejecutar la aplicación:

    java -cp .:/opt/client/lib/* com.huawei.bigdata.kafka.example.WordCountDemo

  4. Abra una nueva ventana de cliente y ejecute los siguientes comandos para usar kafka-console-producer.sh para escribir mensajes en el topic de entrada:

    cd /opt/client

    source bigdata_env

    kafka-console-producer.sh --broker-list IP address of the broker instance:Kafka connection port(for example, 192.168.0.13:9092) --topic streams-wordcount-input --producer.config /opt/client/Kafka/kafka/config/producer.properties

  5. Abra una nueva ventana de cliente y ejecute los siguientes comandos para usar kafka-console-consumer.sh para consumir datos del tema de salida y ver el resultado:

    cd /opt/client

    source bigdata_env

    kafka-console-consumer.sh --topic streams-wordcount-output --bootstrap-server IP address of the broker instance:Kafka connection port --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --formatter kafka.tools.DefaultMessageFormatter

    Escriba un mensaje al topic de entrada.

    >This is Kafka Streams test 
    >test starting 
    >now Kafka Streams is running 
    >test end 

    La información que se muestra es la siguiente:

    this    1 
    is      1 
    kafka   1 
    streams 1 
    test    1 
    test    2 
    starting 1 
    now     1 
    kafka   2 
    streams 2 
    is      2 
    running 1 
    test    3 
    end     1

Utilizamos cookies para mejorar nuestro sitio y tu experiencia. Al continuar navegando en nuestro sitio, tú aceptas nuestra política de cookies. Descubre más

Comentarios

Comentarios

Comentarios

0/500

Seleccionar contenido

Enviar el contenido seleccionado con los comentarios