Updated on 2024-01-26 GMT+08:00

Configuring a Kafka Client in Java

Scenarios

This section describes how to connect to a Java-based Kafka client and how to produce and consume messages.

Prerequisites

  • You have obtained MQS connection information. For details, see Preparations.
  • You have installed the development tool and Java development environment. For details, see Preparations.

Installing the Kafka Client

MQS is developed based on Kafka 1.1.0 and 2.7. View the Kafka version information in the MQS Information area on the Instance Information page on the ROMA Connect console. For details about how to use the Java open-source client, see suggested client versions.

Select the client version based on the Kafka version of the instance. The following uses the 2.7.2 version as an example.

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.7.2</version>
</dependency>

Modifying Configuration Information

The following describes example producer and consumer configuration files. If SASL authentication is enabled for a ROMA Connect instance, you must configure SASL authentication information in the configuration file of the Java client. Otherwise, the connection fails. If SASL authentication is not enabled, comment out the related configuration.

  • Producer configuration file (corresponding to the mqs.sdk.producer.properties file in the production message code)

    The information in bold is subject to different MQSs and must be modified based on site requirements. Other parameters of the client can be added as required.

    #The topic name is in the specific production and consumption code.
    #######################
    #For example, bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094.
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #Send acknowledgment parameters.
    acks=all
    #Sequence mode of the key.
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    #Sequence mode of the value.
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    #Total bytes of memory the producer can use to buffer records waiting to be sent to the server. 
    buffer.memory=33554432
    #Number of retries.
    retries=0
    #######################
    #If SASL authentication is not used, comment out the following parameters:
    #######################
    #Set the username and password.
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="********";
    #SASL authentication mode.
    sasl.mechanism=PLAIN
    #Encryption protocol. Currently, the SASL_SSL protocol is supported.
    security.protocol=SASL_SSL
    #Location of the SSL truststore file.
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    #Password of the SSL truststore file. The value is fixed and cannot be changed. This password is used to access the JKS file generated by Java.
    ssl.truststore.password=dms@kafka
    ssl.endpoint.identification.algorithm=

    The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.

    • bootstrap.servers: MQS connection addresses and ports
    • username and password: username and password used for SASL_SSL authentication
    • ssl.truststore.location: client certificate used for SASL_SSL authentication
  • Consumer configuration file (corresponding to the mqs.sdk.consumer.properties file in the consumption message code)

    The information in bold is subject to different MQSs and must be modified based on site requirements. Other parameters of the client can be added as required.

    #The topic name is in the specific production and consumption code.
    #######################
    #For example, bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094.
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #A character string that uniquely identifies the group to which the consumer process belongs. You can set it as required.
    #If group id is set to the same value, the processes belong to the same consumer group.
    group.id=1
    #Sequence mode of the key.
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #Sequence mode of the value.
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #Offset mode.
    auto.offset.reset=earliest
    #######################
    #If SASL authentication is not used, comment out the following parameters:
    #######################
    #Set the jaas username and password on the console.
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="********";
    #SASL authentication mode.
    sasl.mechanism=PLAIN
    #Encryption protocol. Currently, the SASL_SSL protocol is supported.
    security.protocol=SASL_SSL
    #Location of the SSL truststore file.
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    #Password of the SSL truststore file for accessing the JKS file generated by Java.
    ssl.truststore.password=dms@kafka
    ssl.endpoint.identification.algorithm=

    The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.

    • bootstrap.servers: MQS connection addresses and ports
    • group.id: consumer group name. If the specified consumer group does not exist, the system automatically creates one.
    • username and password: username and password used for SASL_SSL authentication
    • ssl.truststore.location: client certificate used for SASL_SSL authentication

Producing Messages

  • Test code:
    package com.mqs.producer;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.junit.Test;
    
    public class MqsProducerTest {
        @Test
        public void testProducer() throws Exception {
            MqsProducer<String, String> producer = new MqsProducer<String, String>();
            int partiton = 0;
            try {
                for (int i = 0; i < 10; i++) {
                    String key = null;
                    String data = "The msg is " + i;
                    //Enter the name of the topic you created. There are multiple APIs for producing messages. For details, see the Kafka official website or the following production message code.
                    producer.produce("topicName", partiton, key, data, new Callback() {
                        public void onCompletion(RecordMetadata metadata,
                            Exception exception) {
                            if (exception != null) {
                                exception.printStackTrace();
                                return;
                            }
                            System.out.println("produce msg completed");
                        }
                    });
                    System.out.println("produce msg:" + data);
                }
            } catch (Exception e) {
                //TODO: troubleshooting
                e.printStackTrace();
            } finally {
                producer.close();
            }
        }
    }
  • Production message code:
    package com.mqs.producer;
    
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.Enumeration;
    import java.util.List;
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class MqsProducer<K, V> {
        //Introduce configuration information about production messages. For details, see the preceding description.
        public static final String CONFIG_PRODUCER_FILE_NAME = "mqs.sdk.producer.properties";
    
        private Producer<K, V> producer;
    
        MqsProducer(String path)
        {
            Properties props = new Properties();
            try {
                InputStream in = new BufferedInputStream(new FileInputStream(path));
                props.load(in);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            producer = new KafkaProducer<K,V>(props);
        }
        MqsProducer()
        {
            Properties props = new Properties();
            try {
                props = loadFromClasspath(CONFIG_PRODUCER_FILE_NAME);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            producer = new KafkaProducer<K,V>(props);
        }
    
        /**
         * Production messages
         *
         * @param topic        topic object
         * @param partition    partition
         * @param key          message key
         * @param data         message data
         */
        public void produce(String topic, Integer partition, K key, V data)
        {
            produce(topic, partition, key, data, null, (Callback)null);
        }
    
        /**
         * Production messages
         *
         * @param topic        topic object
         * @param partition    partition
         * @param key          message key
         * @param data         message data
         * @param timestamp    timestamp
         */
        public void produce(String topic, Integer partition, K key, V data, Long timestamp)
        {
            produce(topic, partition, key, data, timestamp, (Callback)null);
        }
        /**
         * Production messages
         *
         * @param topic        topic object
         * @param partition    partition
         * @param key          message key
         * @param data         message data
         * @param callback    callback
         */
        public void produce(String topic, Integer partition, K key, V data, Callback callback)
        {
            produce(topic, partition, key, data, null, callback);
        }
    
        public void produce(String topic, V data)
        {
            produce(topic, null, null, data, null, (Callback)null);
        }
    
        /**
         * Production messages
         *
         * @param topic        topic object
         * @param partition    partition
         * @param key          message key
         * @param data         message data
         * @param timestamp    timestamp
         * @param callback    callback
         */
        public void produce(String topic, Integer partition, K key, V data, Long timestamp, Callback callback)
        {
            ProducerRecord<K, V> kafkaRecord =
                    timestamp == null ? new ProducerRecord<K, V>(topic, partition, key, data)
                            : new ProducerRecord<K, V>(topic, partition, timestamp, key, data);
            produce(kafkaRecord, callback);
        }
    
        public void produce(ProducerRecord<K, V> kafkaRecord)
        {
            produce(kafkaRecord, (Callback)null);
        }
    
        public void produce(ProducerRecord<K, V> kafkaRecord, Callback callback)
        {
            producer.send(kafkaRecord, callback);
        }
    
        public void close()
        {
            producer.close();
        }
    
        /**
         * get classloader from thread context if no classloader found in thread
         * context return the classloader which has loaded this class
         *
         * @return classloader
         */
        public static ClassLoader getCurrentClassLoader()
        {
            ClassLoader classLoader = Thread.currentThread()
                    .getContextClassLoader();
            if (classLoader == null)
            {
                classLoader = MqsProducer.class.getClassLoader();
            }
            return classLoader;
        }
    
        /**
         *Load configuration information from classpath.
         *
         * @param configFileName configuration file name
         * @return configuration information
         * @throws IOException
         */
        public static Properties loadFromClasspath(String configFileName) throws IOException
        {
            ClassLoader classLoader = getCurrentClassLoader();
            Properties config = new Properties();
    
            List<URL> properties = new ArrayList<URL>();
            Enumeration<URL> propertyResources = classLoader
                    .getResources(configFileName);
            while (propertyResources.hasMoreElements())
            {
                properties.add(propertyResources.nextElement());
            }
    
            for (URL url:properties)
            {
                InputStream is = null;
                try
                {
                    is = url.openStream();
                    config.load(is);
                }
                finally
                {
                    if (is != null)
                    {
                        is.close();
                        is = null;
                    }
                }
            }
    
            return config;
        }
    }

Consuming Messages

  • Test code:
    package com.mqs.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.junit.Test;
    import java.util.Arrays;
    
    public class MqsConsumerTest {
        @Test
        public void testConsumer() throws Exception {
            MqsConsumer consumer = new MqsConsumer();
            //Enter the name of the topic that consumes messages.
            consumer.consume(Arrays.asList("topicName"));
            try {
                for (int i = 0; i < 10; i++){
                    ConsumerRecords<Object, Object> records = consumer.poll(1000);
                    System.out.println("the numbers of topic:" + records.count());
                    for (ConsumerRecord<Object, Object> record : records)
                    {
                        System.out.println(record.toString());
                    }
                }
            }catch (Exception e)
            {
                //TODO: troubleshooting
                e.printStackTrace();
            }finally {
                consumer.close();
            }
        }
    }
  • Consumption message code:
    package com.mqs.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.*;
    
    public class MqsConsumer {
    
        public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";
    
        private KafkaConsumer<Object, Object> consumer;
    
        MqsConsumer(String path)
        {
            Properties props = new Properties();
            try {
                InputStream in = new BufferedInputStream(new FileInputStream(path));
                props.load(in);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            consumer = new KafkaConsumer<Object, Object>(props);
        }
    
        MqsConsumer()
        {
            Properties props = new Properties();
            try {
                props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            consumer = new KafkaConsumer<Object, Object>(props);
        }
        public void consume(List topics)
        {
            consumer.subscribe(topics);
        }
    
        public ConsumerRecords<Object, Object> poll(long timeout)
        {
            return consumer.poll(timeout);
        }
    
        public void close()
        {
            consumer.close();
        }
    
        /**
         * get classloader from thread context if no classloader found in thread
         * context return the classloader which has loaded this class
         *
         * @return classloader
         */
        public static ClassLoader getCurrentClassLoader()
        {
            ClassLoader classLoader = Thread.currentThread()
                    .getContextClassLoader();
            if (classLoader == null)
            {
                classLoader = MqsConsumer.class.getClassLoader();
            }
            return classLoader;
        }
    
        /**
         *Load configuration information from classpath.
         *
         * @param configFileName configuration file name
         * @return configuration information
         * @throws IOException
         */
        public static Properties loadFromClasspath(String configFileName) throws IOException
        {
            ClassLoader classLoader = getCurrentClassLoader();
            Properties config = new Properties();
    
            List<URL> properties = new ArrayList<URL>();
            Enumeration<URL> propertyResources = classLoader
                    .getResources(configFileName);
            while (propertyResources.hasMoreElements())
            {
                properties.add(propertyResources.nextElement());
            }
    
            
            {
                InputStream is = null;
                try
                {
                    is = url.openStream();
                    config.load(is);
                }
                finally
                {
                    if (is != null)
                    {
                        is.close();
                        is = null;
                    }
                }
            }
    
            return config;
        }
    }