Updated on 2024-01-26 GMT+08:00

Configuring a Kafka Client in Java

Scenarios

This section describes how to connect a Java Kafka client to MQS and how to produce and consume messages.

Prerequisites

  • You have obtained MQS connection information. For details, see Preparations.
  • You have installed the development tool and Java development environment. For details, see Preparations.

Installing the Kafka Client

MQS is developed based on Kafka 1.1.0 and 2.7. You can view the Kafka version in the MQS Information area on the Instance Information page of the ROMA Connect console. For details about the recommended versions of the open-source Java client, see suggested client versions.

Select a client version that matches the Kafka version of the instance. The following Maven dependency uses version 2.7.2 as an example.

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.7.2</version>
</dependency>

Modifying Configuration Information

The following describes example producer and consumer configuration files. If SASL authentication is enabled for a ROMA Connect instance, you must configure SASL authentication information in the configuration file of the Java client. Otherwise, the connection fails. If SASL authentication is not enabled, comment out the related configuration.

  • Producer configuration file (corresponding to the mqs.sdk.producer.properties file in the production message code)

    The connection information below varies with the MQS instance and must be modified based on site requirements. Other client parameters can be added as required.

    #The topic name is specified in the production and consumption code.
    #######################
    #For example, bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094.
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #Send acknowledgment parameters.
    acks=all
    #Serializer class for the message key.
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    #Serializer class for the message value.
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    #Total bytes of memory the producer can use to buffer records waiting to be sent to the server. 
    buffer.memory=33554432
    #Number of retries.
    retries=0
    #######################
    #If SASL authentication is not used, comment out the following parameters:
    #######################
    #Set the username and password.
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="********";
    #SASL authentication mode.
    sasl.mechanism=PLAIN
    #Encryption protocol. Currently, the SASL_SSL protocol is supported.
    security.protocol=SASL_SSL
    #Location of the SSL truststore file.
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    #Password of the SSL truststore file. The value is fixed and cannot be changed. This password is used to access the JKS file generated by Java.
    ssl.truststore.password=dms@kafka
    ssl.endpoint.identification.algorithm=

    The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.

    • bootstrap.servers: MQS connection addresses and ports
    • username and password: username and password used for SASL_SSL authentication
    • ssl.truststore.location: client certificate used for SASL_SSL authentication
  • Consumer configuration file (corresponding to the mqs.sdk.consumer.properties file in the consumption message code)

    The connection information below varies with the MQS instance and must be modified based on site requirements. Other client parameters can be added as required.

    #The topic name is specified in the production and consumption code.
    #######################
    #For example, bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094.
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #A character string that uniquely identifies the group to which the consumer process belongs. You can set it as required.
    #Consumers that use the same group ID belong to the same consumer group.
    group.id=1
    #Deserializer class for the message key.
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #Deserializer class for the message value.
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #Offset reset policy used when no committed offset exists.
    auto.offset.reset=earliest
    #######################
    #If SASL authentication is not used, comment out the following parameters:
    #######################
    #Username and password set on the console for SASL authentication.
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="********";
    #SASL authentication mode.
    sasl.mechanism=PLAIN
    #Encryption protocol. Currently, the SASL_SSL protocol is supported.
    security.protocol=SASL_SSL
    #Location of the SSL truststore file.
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    #Password of the SSL truststore file for accessing the JKS file generated by Java.
    ssl.truststore.password=dms@kafka
    ssl.endpoint.identification.algorithm=

    The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.

    • bootstrap.servers: MQS connection addresses and ports
    • group.id: consumer group name. If the specified consumer group does not exist, the system automatically creates one.
    • username and password: username and password used for SASL_SSL authentication
    • ssl.truststore.location: client certificate used for SASL_SSL authentication
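The same settings can also be assembled in code instead of a properties file. The sketch below builds an equivalent producer configuration with java.util.Properties; the broker addresses, username, password, and truststore path are placeholder arguments that must be replaced with your own MQS values.

```java
import java.util.Properties;

public class MqsProducerConfig {
    // Build a producer configuration equivalent to mqs.sdk.producer.properties.
    // All arguments are placeholders to be replaced with real MQS values.
    public static Properties producerProps(String bootstrapServers,
                                           String username,
                                           String password,
                                           String truststorePath) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("buffer.memory", "33554432");
        props.put("retries", "0");
        // SASL_SSL settings; omit this part if SASL authentication is disabled.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";");
        props.put("sasl.mechanism", "PLAIN");
        props.put("security.protocol", "SASL_SSL");
        props.put("ssl.truststore.location", truststorePath);
        props.put("ssl.truststore.password", "dms@kafka");  // fixed truststore password
        props.put("ssl.endpoint.identification.algorithm", "");
        return props;
    }
}
```

The returned Properties object can be passed directly to new KafkaProducer<>(props) instead of loading a file; the consumer configuration can be built the same way with the deserializer and group.id settings shown above.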

Producing Messages

  • Test code:
    package com.mqs.producer;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.junit.Test;
    
    public class MqsProducerTest {
        @Test
        public void testProducer() throws Exception {
            MqsProducer<String, String> producer = new MqsProducer<String, String>();
            int partition = 0;
            try {
                for (int i = 0; i < 10; i++) {
                    String key = null;
                    String data = "The msg is " + i;
                    //Enter the name of the topic you created. There are multiple APIs for producing messages. For details, see the Kafka official website or the following production message code.
                    producer.produce("topicName", partition, key, data, new Callback() {
                        public void onCompletion(RecordMetadata metadata,
                            Exception exception) {
                            if (exception != null) {
                                exception.printStackTrace();
                                return;
                            }
                            System.out.println("produce msg completed");
                        }
                    });
                    System.out.println("produce msg:" + data);
                }
            } catch (Exception e) {
                //TODO: troubleshooting
                e.printStackTrace();
            } finally {
                producer.close();
            }
        }
    }
  • Production message code:
    package com.mqs.producer;
    
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.Enumeration;
    import java.util.List;
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class MqsProducer<K, V> {
        //Configuration file for producing messages. For details, see the preceding description.
        public static final String CONFIG_PRODUCER_FILE_NAME = "mqs.sdk.producer.properties";
    
        private Producer<K, V> producer;
    
        MqsProducer(String path)
        {
            Properties props = new Properties();
            try {
                InputStream in = new BufferedInputStream(new FileInputStream(path));
                props.load(in);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            producer = new KafkaProducer<K,V>(props);
        }
        MqsProducer()
        {
            Properties props = new Properties();
            try {
                props = loadFromClasspath(CONFIG_PRODUCER_FILE_NAME);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            producer = new KafkaProducer<K,V>(props);
        }
    
        /**
         * Produce a message.
         *
         * @param topic        topic object
         * @param partition    partition
         * @param key          message key
         * @param data         message data
         */
        public void produce(String topic, Integer partition, K key, V data)
        {
            produce(topic, partition, key, data, null, (Callback)null);
        }
    
        /**
         * Produce a message.
         *
         * @param topic        topic object
         * @param partition    partition
         * @param key          message key
         * @param data         message data
         * @param timestamp    timestamp
         */
        public void produce(String topic, Integer partition, K key, V data, Long timestamp)
        {
            produce(topic, partition, key, data, timestamp, (Callback)null);
        }
        /**
         * Produce a message.
         *
         * @param topic        topic object
         * @param partition    partition
         * @param key          message key
         * @param data         message data
         * @param callback    callback
         */
        public void produce(String topic, Integer partition, K key, V data, Callback callback)
        {
            produce(topic, partition, key, data, null, callback);
        }
    
        public void produce(String topic, V data)
        {
            produce(topic, null, null, data, null, (Callback)null);
        }
    
        /**
         * Produce a message.
         *
         * @param topic        topic object
         * @param partition    partition
         * @param key          message key
         * @param data         message data
         * @param timestamp    timestamp
         * @param callback    callback
         */
        public void produce(String topic, Integer partition, K key, V data, Long timestamp, Callback callback)
        {
            ProducerRecord<K, V> kafkaRecord =
                    timestamp == null ? new ProducerRecord<K, V>(topic, partition, key, data)
                            : new ProducerRecord<K, V>(topic, partition, timestamp, key, data);
            produce(kafkaRecord, callback);
        }
    
        public void produce(ProducerRecord<K, V> kafkaRecord)
        {
            produce(kafkaRecord, (Callback)null);
        }
    
        public void produce(ProducerRecord<K, V> kafkaRecord, Callback callback)
        {
            producer.send(kafkaRecord, callback);
        }
    
        public void close()
        {
            producer.close();
        }
    
        /**
         * Get the classloader from the thread context; if none is found,
         * return the classloader that loaded this class.
         *
         * @return classloader
         */
        public static ClassLoader getCurrentClassLoader()
        {
            ClassLoader classLoader = Thread.currentThread()
                    .getContextClassLoader();
            if (classLoader == null)
            {
                classLoader = MqsProducer.class.getClassLoader();
            }
            return classLoader;
        }
    
        /**
         * Load configuration information from the classpath.
         *
         * @param configFileName configuration file name
         * @return configuration information
         * @throws IOException
         */
        public static Properties loadFromClasspath(String configFileName) throws IOException
        {
            ClassLoader classLoader = getCurrentClassLoader();
            Properties config = new Properties();
    
            List<URL> properties = new ArrayList<URL>();
            Enumeration<URL> propertyResources = classLoader
                    .getResources(configFileName);
            while (propertyResources.hasMoreElements())
            {
                properties.add(propertyResources.nextElement());
            }
    
            for (URL url:properties)
            {
                InputStream is = null;
                try
                {
                    is = url.openStream();
                    config.load(is);
                }
                finally
                {
                    if (is != null)
                    {
                        is.close();
                        is = null;
                    }
                }
            }
    
            return config;
        }
    }

Consuming Messages

  • Test code:
    package com.mqs.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.junit.Test;
    import java.util.Arrays;
    
    public class MqsConsumerTest {
        @Test
        public void testConsumer() throws Exception {
            MqsConsumer consumer = new MqsConsumer();
            //Enter the name of the topic from which messages are consumed.
            consumer.consume(Arrays.asList("topicName"));
            try {
                for (int i = 0; i < 10; i++){
                    ConsumerRecords<Object, Object> records = consumer.poll(1000);
                    System.out.println("the number of records polled: " + records.count());
                    for (ConsumerRecord<Object, Object> record : records)
                    {
                        System.out.println(record.toString());
                    }
                }
            }catch (Exception e)
            {
                //TODO: troubleshooting
                e.printStackTrace();
            }finally {
                consumer.close();
            }
        }
    }
  • Consumption message code:
    package com.mqs.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.*;
    
    public class MqsConsumer {
    
        public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";
    
        private KafkaConsumer<Object, Object> consumer;
    
        MqsConsumer(String path)
        {
            Properties props = new Properties();
            try {
                InputStream in = new BufferedInputStream(new FileInputStream(path));
                props.load(in);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            consumer = new KafkaConsumer<Object, Object>(props);
        }
    
        MqsConsumer()
        {
            Properties props = new Properties();
            try {
                props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            consumer = new KafkaConsumer<Object, Object>(props);
        }
        public void consume(List<String> topics)
        {
            consumer.subscribe(topics);
        }
    
        public ConsumerRecords<Object, Object> poll(long timeout)
        {
            return consumer.poll(timeout);
        }
    
        public void close()
        {
            consumer.close();
        }
    
        /**
         * Get the classloader from the thread context; if none is found,
         * return the classloader that loaded this class.
         *
         * @return classloader
         */
        public static ClassLoader getCurrentClassLoader()
        {
            ClassLoader classLoader = Thread.currentThread()
                    .getContextClassLoader();
            if (classLoader == null)
            {
                classLoader = MqsConsumer.class.getClassLoader();
            }
            return classLoader;
        }
    
        /**
         * Load configuration information from the classpath.
         *
         * @param configFileName configuration file name
         * @return configuration information
         * @throws IOException
         */
        public static Properties loadFromClasspath(String configFileName) throws IOException
        {
            ClassLoader classLoader = getCurrentClassLoader();
            Properties config = new Properties();
    
            List<URL> properties = new ArrayList<URL>();
            Enumeration<URL> propertyResources = classLoader
                    .getResources(configFileName);
            while (propertyResources.hasMoreElements())
            {
                properties.add(propertyResources.nextElement());
            }
    
            
        for (URL url:properties)
        {
                InputStream is = null;
                try
                {
                    is = url.openStream();
                    config.load(is);
                }
                finally
                {
                    if (is != null)
                    {
                        is.close();
                        is = null;
                    }
                }
            }
    
            return config;
        }
    }
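The test code above polls a fixed number of times. In a long-running service, the usual pattern is to poll in a loop until shutdown is requested and only then close the consumer. The broker-free sketch below shows that control flow; the java.util.function.Supplier stands in for MqsConsumer.poll (a hypothetical substitution so the pattern can be shown without a broker), and the record handling is a placeholder.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class PollLoopSketch {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private int processed = 0;

    // Poll repeatedly until stop() is called; 'poll' stands in for
    // MqsConsumer.poll(timeout) in the real client code.
    public int run(Supplier<List<String>> poll) {
        while (running.get()) {
            for (String record : poll.get()) {
                processed++;  // replace with real record handling
            }
        }
        return processed;    // in real code, call consumer.close() here
    }

    // Request shutdown, typically from another thread or a shutdown hook.
    public void stop() {
        running.set(false);
    }
}
```

With the real client, stop() would be invoked from a JVM shutdown hook so that the consumer leaves its group cleanly instead of timing out.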
