Updated on 2023-02-28 GMT+08:00

Configuring Kafka Clients in Java

This section describes how to add the Kafka client for MQS as a Maven dependency, connect the client, and produce and consume messages. For details about the demo project, see Setting Up the Java Development Environment.

All configuration information mentioned in the following parts, such as the MQS connection address, topic name, and user information, can be obtained as described in Collecting Connection Information.

Adding Kafka Clients in Maven

MQS is based on Kafka 1.1.0 or 2.3.0. Use a Kafka client of the same version as your instance. You can view the Kafka version information in the MQS Information area on the Instance Information page of the ROMA Connect console.

Set the version element of the dependency to match the client version: 1.1.0 for the 1.1.0 client, or 2.3.0 for the 2.3.0 client. The following example uses the 2.3.0 client.

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.3.0</version>
</dependency>

Preparing Configuration Information

The following describes example producer and consumer configuration files. If SASL authentication is enabled for a ROMA Connect instance, you must configure SASL authentication information in the configuration file of the Java client. Otherwise, the connection fails. If SASL authentication is not enabled, comment out the related configuration.

  • Producer configuration file (corresponding to the mqs.sdk.producer.properties file in the production message code)
    The information in bold varies by MQS instance and must be modified based on site requirements. Other client parameters can be added as required. All configuration information, such as the MQS connection address, topic name, and user information, can be obtained in Collecting Connection Information.
    #The topic name is set in the production and consumption code.
    #######################
    #You can obtain the broker information from the console.
    #For example, bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094.
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #Acknowledgment level required from the broker before a send is considered complete.
    acks=all
    #Serializer class for the key.
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    #Serializer class for the value.
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    #Total bytes of memory the producer can use to buffer records waiting to be sent to the server. 
    buffer.memory=33554432
    #Number of retries.
    retries=0
    #######################
    #If SASL authentication is not used, comment out the following parameters:
    #######################
    #JAAS username and password, as set on the console.
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="password";
    #SASL authentication mode.
    sasl.mechanism=PLAIN
    #Security protocol. Currently, the SASL_SSL protocol is supported.
    security.protocol=SASL_SSL
    #Location of the SSL truststore file.
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    #Password of the SSL truststore file. The value is fixed and cannot be changed.
    ssl.truststore.password=dms@kafka
    #Leave this value empty to disable server hostname verification.
    ssl.endpoint.identification.algorithm=
  • Consumer configuration file (corresponding to the mqs.sdk.consumer.properties file in the consumption message code)
    The information in bold varies by MQS instance and must be modified based on site requirements. Other client parameters can be added as required. All configuration information, such as the MQS connection address, topic name, and user information, can be obtained in Collecting Connection Information.
    #The topic name is set in the production and consumption code.
    #######################
    #You can obtain the broker information from the console.
    #For example, bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094.
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #A character string that uniquely identifies the group to which the consumer process belongs. You can set it as required.
    #Consumer processes that use the same group.id belong to the same consumer group.
    group.id=1
    #Deserializer class for the key.
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #Deserializer class for the value.
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #Where to start consuming when the group has no committed offset.
    auto.offset.reset=earliest
    #######################
    #If SASL authentication is not used, comment out the following parameters:
    #######################
    #JAAS username and password, as set on the console.
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="password";
    #SASL authentication mode.
    sasl.mechanism=PLAIN
    #Security protocol. Currently, the SASL_SSL protocol is supported.
    security.protocol=SASL_SSL
    #Location of the SSL truststore file.
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    #Password of the SSL truststore file.
    ssl.truststore.password=dms@kafka
    #Leave this value empty to disable server hostname verification.
    ssl.endpoint.identification.algorithm=
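
The configuration files above are standard Java properties files, so two details are easy to miss: doubled backslashes in a Windows path are read back as single backslashes, and the SASL and SSL entries can be removed in code rather than commented out by hand. The following is a minimal sketch of both points, assuming the settings are available as text. The class ClientConfigSketch and its methods parse and withoutSasl are hypothetical names used only for illustration; they are not part of the MQS demo code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the MQS demo code.
class ClientConfigSketch {
    // Parses client settings written in .properties format.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns a copy of the settings without the entries that the
    // configuration files mark as SASL-only (sasl.*, ssl.*, security.protocol).
    static Properties withoutSasl(Properties source) {
        Properties copy = new Properties();
        for (String name : source.stringPropertyNames()) {
            boolean saslRelated = name.startsWith("sasl.")
                    || name.startsWith("ssl.")
                    || name.equals("security.protocol");
            if (!saslRelated) {
                copy.setProperty(name, source.getProperty(name));
            }
        }
        return copy;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "bootstrap.servers=ip1:port1,ip2:port2,ip3:port3",
                "acks=all",
                "sasl.mechanism=PLAIN",
                "security.protocol=SASL_SSL",
                "ssl.truststore.location=E:\\\\temp\\\\client.truststore.jks");
        Properties props = parse(text);
        // The doubled backslashes in the file are read as single backslashes.
        System.out.println(props.getProperty("ssl.truststore.location"));
        // Only the non-SASL entries remain in the filtered copy.
        System.out.println(withoutSasl(props).stringPropertyNames());
    }
}
```

Filtering by key prefix is only one possible approach. Keeping two separate configuration files, one with and one without the SASL block, works equally well.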

Producing Messages

  • Test code:
    package com.mqs.producer;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.junit.Test;
    
    public class MqsProducerTest {
        @Test
        public void testProducer() throws Exception {
            MqsProducer<String, String> producer = new MqsProducer<String, String>();
            int partition = 0;
            try {
                for (int i = 0; i < 10; i++) {
                    String key = null;
                    String data = "The msg is " + i;
                    //Enter the name of the topic you created. Several produce() methods are available. For details, see the Kafka official website or the following production message code.
                    producer.produce("topic-0", partition, key, data, new Callback() {
                        public void onCompletion(RecordMetadata metadata,
                            Exception exception) {
                            if (exception != null) {
                                exception.printStackTrace();
                                return;
                            }
                            System.out.println("produce msg completed");
                        }
                    });
                    System.out.println("produce msg:" + data);
                }
            } catch (Exception e) {
                //TODO: troubleshooting
                e.printStackTrace();
            } finally {
                producer.close();
            }
        }
    }
    
  • Production message code:
    package com.mqs.producer;
    
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.Enumeration;
    import java.util.List;
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class MqsProducer<K, V> {
        //Name of the producer configuration file on the classpath. For details, see Preparing Configuration Information.
        public static final String CONFIG_PRODUCER_FILE_NAME = "mqs.sdk.producer.properties";
    
        private Producer<K, V> producer;
    
        MqsProducer(String path)
        {
            Properties props = new Properties();
            //try-with-resources closes the stream even if loading fails.
            try (InputStream in = new BufferedInputStream(new FileInputStream(path))) {
                props.load(in);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            producer = new KafkaProducer<K,V>(props);
        }
        MqsProducer()
        {
            Properties props = new Properties();
            try {
                props = loadFromClasspath(CONFIG_PRODUCER_FILE_NAME);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            producer = new KafkaProducer<K,V>(props);
        }
    
        /**
         * Produces a message.
         *
         * @param topic        topic name
         * @param partition    partition
         * @param key          message key
         * @param data         message data
         */
        public void produce(String topic, Integer partition, K key, V data)
        {
            produce(topic, partition, key, data, null, (Callback)null);
        }
    
        /**
         * Produces a message with a timestamp.
         *
         * @param topic        topic name
         * @param partition    partition
         * @param key          message key
         * @param data         message data
         * @param timestamp    timestamp
         */
        public void produce(String topic, Integer partition, K key, V data, Long timestamp)
        {
            produce(topic, partition, key, data, timestamp, (Callback)null);
        }
        /**
         * Produces a message and invokes a callback on completion.
         *
         * @param topic        topic name
         * @param partition    partition
         * @param key          message key
         * @param data         message data
         * @param callback    callback
         */
        public void produce(String topic, Integer partition, K key, V data, Callback callback)
        {
            produce(topic, partition, key, data, null, callback);
        }
    
        public void produce(String topic, V data)
        {
            produce(topic, null, null, data, null, (Callback)null);
        }
    
        /**
         * Produces a message with a timestamp and invokes a callback on completion.
         *
         * @param topic        topic name
         * @param partition    partition
         * @param key          message key
         * @param data         message data
         * @param timestamp    timestamp
         * @param callback    callback
         */
        public void produce(String topic, Integer partition, K key, V data, Long timestamp, Callback callback)
        {
            ProducerRecord<K, V> kafkaRecord =
                    timestamp == null ? new ProducerRecord<K, V>(topic, partition, key, data)
                            : new ProducerRecord<K, V>(topic, partition, timestamp, key, data);
            produce(kafkaRecord, callback);
        }
    
        public void produce(ProducerRecord<K, V> kafkaRecord)
        {
            produce(kafkaRecord, (Callback)null);
        }
    
        public void produce(ProducerRecord<K, V> kafkaRecord, Callback callback)
        {
            producer.send(kafkaRecord, callback);
        }
    
        public void close()
        {
            producer.close();
        }
    
        /**
         * Gets the class loader from the thread context. If none is found there,
         * returns the class loader that loaded this class.
         *
         * @return classloader
         */
        public static ClassLoader getCurrentClassLoader()
        {
            ClassLoader classLoader = Thread.currentThread()
                    .getContextClassLoader();
            if (classLoader == null)
            {
                classLoader = MqsProducer.class.getClassLoader();
            }
            return classLoader;
        }
    
        /**
         * Loads configuration information from the classpath.
         *
         * @param configFileName configuration file name
         * @return configuration information
         * @throws IOException if a configuration file cannot be read
         */
        public static Properties loadFromClasspath(String configFileName) throws IOException
        {
            ClassLoader classLoader = getCurrentClassLoader();
            Properties config = new Properties();
    
            List<URL> properties = new ArrayList<URL>();
            Enumeration<URL> propertyResources = classLoader
                    .getResources(configFileName);
            while (propertyResources.hasMoreElements())
            {
                properties.add(propertyResources.nextElement());
            }
    
            for (URL url:properties)
            {
                InputStream is = null;
                try
                {
                    is = url.openStream();
                    config.load(is);
                }
                finally
                {
                    if (is != null)
                    {
                        is.close();
                        is = null;
                    }
                }
            }
    
            return config;
        }
    }
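
In the production message code above, loadFromClasspath returns an empty Properties object when no file with the given name is on the classpath, so a missing configuration file only surfaces later, when the Kafka client is created. The following is a minimal sketch of a loader that reports the missing file immediately. The class FailFastConfigLoader is a hypothetical name used for illustration, and unlike the demo code it reads only the first matching resource rather than merging all of them.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical variant of the loader for illustration; not part of the MQS demo code.
class FailFastConfigLoader {
    // Loads one .properties resource from the classpath and throws
    // if it is missing, instead of returning an empty Properties object.
    static Properties load(String configFileName) {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null) {
            classLoader = FailFastConfigLoader.class.getClassLoader();
        }
        Properties config = new Properties();
        // try-with-resources closes the stream on every path.
        try (InputStream in = classLoader.getResourceAsStream(configFileName)) {
            if (in == null) {
                throw new IllegalStateException(
                        "Configuration file not found on classpath: " + configFileName);
            }
            config.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return config;
    }

    public static void main(String[] args) {
        try {
            load("mqs.sdk.producer.properties");
            System.out.println("configuration loaded");
        } catch (IllegalStateException e) {
            // Reached when the file is not on the classpath.
            System.out.println(e.getMessage());
        }
    }
}
```

Throwing an unchecked exception here is a design choice for the sketch. A caller that prefers the demo's behavior can catch it and fall back to the constructor that takes a file path.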
    

Consuming Messages

  • Test code:
    package com.mqs.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.junit.Test;
    import java.util.Arrays;
    
    public class MqsConsumerTest {
        @Test
        public void testConsumer() throws Exception {
            MqsConsumer consumer = new MqsConsumer();
            consumer.consume(Arrays.asList("topic-0"));
            try {
                for (int i = 0; i < 10; i++){
                    ConsumerRecords<Object, Object> records = consumer.poll(1000);
                    System.out.println("the number of records:" + records.count());
                    for (ConsumerRecord<Object, Object> record : records)
                    {
                        System.out.println(record.toString());
                    }
                }
            }catch (Exception e)
            {
                //TODO: troubleshooting
                e.printStackTrace();
            }finally {
                consumer.close();
            }
        }
    }
    
  • Consumption message code:
    package com.mqs.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.*;
    
    public class MqsConsumer {
    
        public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";
    
        private KafkaConsumer<Object, Object> consumer;
    
        MqsConsumer(String path)
        {
            Properties props = new Properties();
            //try-with-resources closes the stream even if loading fails.
            try (InputStream in = new BufferedInputStream(new FileInputStream(path))) {
                props.load(in);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            consumer = new KafkaConsumer<Object, Object>(props);
        }
    
        MqsConsumer()
        {
            Properties props = new Properties();
            try {
                props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            consumer = new KafkaConsumer<Object, Object>(props);
        }
        //Subscribes the consumer to the given topics.
        public void consume(List<String> topics)
        {
            consumer.subscribe(topics);
        }
    
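        //poll(long) is available in both the 1.1.0 and 2.3.0 clients. It is deprecated since Kafka 2.0 in favor of poll(java.time.Duration).
        public ConsumerRecords<Object, Object> poll(long timeout)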
        {
            return consumer.poll(timeout);
        }
    
        public void close()
        {
            consumer.close();
        }
    
        /**
         * Gets the class loader from the thread context. If none is found there,
         * returns the class loader that loaded this class.
         *
         * @return classloader
         */
        public static ClassLoader getCurrentClassLoader()
        {
            ClassLoader classLoader = Thread.currentThread()
                    .getContextClassLoader();
            if (classLoader == null)
            {
                classLoader = MqsConsumer.class.getClassLoader();
            }
            return classLoader;
        }
    
        /**
         * Loads configuration information from the classpath.
         *
         * @param configFileName configuration file name
         * @return configuration information
         * @throws IOException if a configuration file cannot be read
         */
        public static Properties loadFromClasspath(String configFileName) throws IOException
        {
            ClassLoader classLoader = getCurrentClassLoader();
            Properties config = new Properties();
    
            List<URL> properties = new ArrayList<URL>();
            Enumeration<URL> propertyResources = classLoader
                    .getResources(configFileName);
            while (propertyResources.hasMoreElements())
            {
                properties.add(propertyResources.nextElement());
            }
    
            for (URL url:properties)
            {
                InputStream is = null;
                try
                {
                    is = url.openStream();
                    config.load(is);
                }
                finally
                {
                    if (is != null)
                    {
                        is.close();
                        is = null;
                    }
                }
            }
    
            return config;
        }
    }