Updated on 2024-05-15 GMT+08:00

Configuring Kafka Clients in Java

This section describes how to add the Kafka client dependency in Maven and use the client to connect to a Kafka instance and produce and consume messages. To check how the demo project runs in IDEA, see Setting Up the Java Development Environment.

The Kafka instance connection addresses, topic name, and user information used in the following examples are available in Collecting Connection Information.

Adding Kafka Clients in Maven

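        <!-- Kafka instances are based on Kafka 1.1.0, 2.3.0, or 2.7. Use the client version that matches your instance: 1.1.0, 2.3.0, or 2.7.2. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <!-- 2.7.2 is shown as an example. Replace it with 1.1.0 or 2.3.0 for instances of those versions. -->
            <version>2.7.2</version>
        </dependency>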

Preparing Kafka Configuration Files

The following describes example producer and consumer configuration files. If SASL is not enabled for the Kafka instance, comment out lines regarding SASL. If SASL has been enabled, set SASL configurations for encrypted access.
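The commenting-out step can also be approximated in code once a configuration file has been loaded. The following is a minimal sketch, not part of the demo project: the class name `SaslSettings` and the method `withoutSasl` are hypothetical, and it assumes that every security-related key in the files starts with `sasl.` or `ssl.` or is `security.protocol`, as in the sample files in this section.

```java
import java.util.Properties;

// Hypothetical helper, not part of the demo project.
final class SaslSettings {

    // Returns a copy of props without the SASL/SSL entries used in the sample files.
    static Properties withoutSasl(Properties props) {
        Properties copy = new Properties();
        for (String name : props.stringPropertyNames()) {
            boolean securityKey = name.startsWith("sasl.")
                    || name.startsWith("ssl.")
                    || name.equals("security.protocol");
            if (!securityKey) {
                copy.setProperty(name, props.getProperty(name));
            }
        }
        return copy;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "ip1:port1");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("security.protocol", "SASL_SSL");
        // Only the broker address remains: prints [bootstrap.servers]
        System.out.println(withoutSasl(props).stringPropertyNames());
    }
}
```

The helper returns a copy rather than modifying its argument, so the loaded configuration stays available for comparison or logging.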

  • Producer configuration file (the dms.sdk.producer.properties file in the demo project)

    The information in bold is specific to different Kafka instances and must be modified. Other parameters can also be added.

    #The topic name is set in the production and consumption code.
    #######################
    #Information about Kafka brokers. ip:port are the connection addresses and ports used by the instance. The values can be obtained by referring to the "Collecting Connection Information" section. Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #Producer acknowledgement
    acks=all
    #Method of turning the key into bytes
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    #Method of turning the value into bytes
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    #Memory available to the producer for buffering
    buffer.memory=33554432
    #Number of retries
    retries=0
    #######################
    #Comment out the following parameters if SASL access is not enabled.
    #######################
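    # Set the SASL authentication mechanism, username, and password.
    # sasl.mechanism indicates the SASL authentication mechanism. username and password indicate the username and password of SASL_SSL. Obtain them by referring to "Collecting Connection Information."
    # Keep only the block that matches the mechanism enabled for the instance and comment out the other. If both stay active, the later value overrides the earlier one.
    # If the SASL mechanism is PLAIN, the configuration is as follows: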
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="password";
    # If the SASL mechanism is SCRAM-SHA-512, the configuration is as follows:
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="username" \
        password="password";
    
    # Set security.protocol. Keep only the setting that matches the instance and comment out the other.
    # If the security protocol is SASL_SSL, the configuration is as follows:
    security.protocol=SASL_SSL
    # ssl.truststore.location is the path for storing the SSL certificate. The following code uses the path format in Windows as an example. Change the path format based on the actual running environment.
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    # ssl.truststore.password is the password of the server certificate. This password is used for accessing the JKS file generated by Java.
    ssl.truststore.password=dms@kafka
    # ssl.endpoint.identification.algorithm indicates whether to verify the certificate domain name. This parameter must be left blank, which indicates disabling domain name verification.
    ssl.endpoint.identification.algorithm=
    # If the security protocol is SASL_PLAINTEXT, the configuration is as follows:
    security.protocol=SASL_PLAINTEXT

  • Consumer configuration file (the dms.sdk.consumer.properties file in the demo project)

    The information in bold is specific to different Kafka instances and must be modified. Other parameters can also be added.

    #The topic name is set in the production and consumption code.
    #######################
    #Information about Kafka brokers. ip:port are the connection addresses and ports used by the instance. The values can be obtained by referring to the "Collecting Connection Information" section. Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #Unique string to identify the group of consumer processes to which the consumer belongs. Configuring the same group.id for different processes indicates that the processes belong to the same consumer group.
    group.id=1
    #Method of turning bytes back into the key
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #Method of turning bytes back into the value
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #Offset reset policy
    auto.offset.reset=earliest
    #######################
    #Comment out the following parameters if SASL access is not enabled.
    #######################
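    # Set the SASL authentication mechanism, username, and password.
    # sasl.mechanism indicates the SASL authentication mechanism. username and password indicate the username and password of SASL_SSL. Obtain them by referring to "Collecting Connection Information."
    # Keep only the block that matches the mechanism enabled for the instance and comment out the other. If both stay active, the later value overrides the earlier one.
    # If the SASL mechanism is PLAIN, the configuration is as follows: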
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="password";
    # If the SASL mechanism is SCRAM-SHA-512, the configuration is as follows:
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="username" \
        password="password";
    
    # Set security.protocol. Keep only the setting that matches the instance and comment out the other.
    # If the security protocol is SASL_SSL, the configuration is as follows:
    security.protocol=SASL_SSL
    # ssl.truststore.location is the path for storing the SSL certificate. The following code uses the path format in Windows as an example. Change the path format based on the actual running environment.
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    # ssl.truststore.password is the password of the server certificate. This password is used for accessing the JKS file generated by Java.
    ssl.truststore.password=dms@kafka
    # ssl.endpoint.identification.algorithm indicates whether to verify the certificate domain name. This parameter must be left blank, which indicates disabling domain name verification.
    ssl.endpoint.identification.algorithm=
    # If the security protocol is SASL_PLAINTEXT, the configuration is as follows:
    security.protocol=SASL_PLAINTEXT
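
Two behaviors of `java.util.Properties`, which the demo code uses to load these files, are worth keeping in mind when editing them. First, if both alternatives for a key such as `sasl.mechanism` or `security.protocol` are left active, the later line silently replaces the earlier one. Second, a backslash is an escape character, which is why the Windows truststore path is written with doubled backslashes. The following is a minimal sketch that illustrates both on in-memory text; the class name `PropertiesPitfalls` and the method `parse` are hypothetical and not part of the demo project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the demo project.
final class PropertiesPitfalls {

    // Parses properties text with the same key=value and escape rules as a .properties file.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Both alternatives left active: the later assignment wins.
        Properties duplicated = parse(String.join("\n",
                "sasl.mechanism=PLAIN",
                "sasl.mechanism=SCRAM-SHA-512",
                "security.protocol=SASL_SSL",
                "security.protocol=SASL_PLAINTEXT"));
        System.out.println(duplicated.getProperty("sasl.mechanism"));    // SCRAM-SHA-512
        System.out.println(duplicated.getProperty("security.protocol")); // SASL_PLAINTEXT

        // Doubled backslashes in the file text load as single backslashes.
        Properties escaped = parse("ssl.truststore.location=E:\\\\temp\\\\client.truststore.jks");
        System.out.println(escaped.getProperty("ssl.truststore.location")); // E:\temp\client.truststore.jks

        // Single backslashes are consumed as escapes, so "\t" becomes a tab and the path is corrupted.
        Properties broken = parse("ssl.truststore.location=E:\\temp\\client.truststore.jks");
        System.out.println(broken.getProperty("ssl.truststore.location").contains("\t")); // true
    }
}
```

On Linux or macOS, a path with forward slashes needs no escaping.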

Producing Messages

  • Test code
    package com.dms.producer;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.junit.Test;
    
    public class DmsProducerTest {
        @Test
        public void testProducer() throws Exception {
            DmsProducer<String, String> producer = new DmsProducer<String, String>();
            int partition = 0;
            try {
                for (int i = 0; i < 10; i++) {
                    String key = null;
                    String data = "The msg is " + i;
                    //Enter the name of the topic you created. There are multiple APIs for producing messages. For details, see the Kafka official website or the following code.
                    producer.produce("topic-0", partition, key, data, new Callback() {
                        public void onCompletion(RecordMetadata metadata,
                            Exception exception) {
                            if (exception != null) {
                                exception.printStackTrace();
                                return;
                            }
                            System.out.println("produce msg completed");
                        }
                    });
                    System.out.println("produce msg:" + data);
                }
            } catch (Exception e) {
                // TODO: Exception handling
                e.printStackTrace();
            } finally {
                producer.close();
            }
        }
    }
    
  • Message production code
    package com.dms.producer;
    
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.Enumeration;
    import java.util.List;
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class DmsProducer<K, V> {
         //Add the producer configurations that have been specified earlier.
        public static final String CONFIG_PRODUCER_FILE_NAME = "dms.sdk.producer.properties";
    
        private Producer<K, V> producer;
    
        DmsProducer(String path)
        {
            Properties props = new Properties();
            // try-with-resources closes the stream once the properties are loaded
            try (InputStream in = new BufferedInputStream(new FileInputStream(path))) {
                props.load(in);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            producer = new KafkaProducer<K,V>(props);
        }
        DmsProducer()
        {
            Properties props = new Properties();
            try {
                props = loadFromClasspath(CONFIG_PRODUCER_FILE_NAME);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            producer = new KafkaProducer<K,V>(props);
        }
    
        /**
         * Producing messages
         *
         * @param topic        Topic
         * @param partition    partition
         * @param key          Message key
         * @param data         Message data
         */
        public void produce(String topic, Integer partition, K key, V data)
        {
            produce(topic, partition, key, data, null, (Callback)null);
        }
    
        /**
         * Producing messages
         *
         * @param topic        Topic
         * @param partition    partition
         * @param key          Message key
         * @param data         Message data
         * @param timestamp    timestamp
         */
        public void produce(String topic, Integer partition, K key, V data, Long timestamp)
        {
            produce(topic, partition, key, data, timestamp, (Callback)null);
        }
        /**
         * Producing messages
         *
         * @param topic        Topic
         * @param partition    partition
         * @param key          Message key
         * @param data         Message data
         * @param callback    callback
         */
        public void produce(String topic, Integer partition, K key, V data, Callback callback)
        {
            produce(topic, partition, key, data, null, callback);
        }
    
        public void produce(String topic, V data)
        {
            produce(topic, null, null, data, null, (Callback)null);
        }
    
        /**
         * Producing messages
         *
         * @param topic        Topic
         * @param partition    partition
         * @param key          Message key
         * @param data         Message data
         * @param timestamp    timestamp
         * @param callback    callback
         */
        public void produce(String topic, Integer partition, K key, V data, Long timestamp, Callback callback)
        {
            ProducerRecord<K, V> kafkaRecord =
                    timestamp == null ? new ProducerRecord<K, V>(topic, partition, key, data)
                            : new ProducerRecord<K, V>(topic, partition, timestamp, key, data);
            produce(kafkaRecord, callback);
        }
    
        public void produce(ProducerRecord<K, V> kafkaRecord)
        {
            produce(kafkaRecord, (Callback)null);
        }
    
        public void produce(ProducerRecord<K, V> kafkaRecord, Callback callback)
        {
            producer.send(kafkaRecord, callback);
        }
    
        public void close()
        {
            producer.close();
        }
    
        /**
         * Gets the class loader from the thread context. If none is found there,
         * returns the class loader that loaded this class.
         *
         * @return classloader
         */
        public static ClassLoader getCurrentClassLoader()
        {
            ClassLoader classLoader = Thread.currentThread()
                    .getContextClassLoader();
            if (classLoader == null)
            {
                classLoader = DmsProducer.class.getClassLoader();
            }
            return classLoader;
        }
    
        /**
         * Load configuration information from classpath.
         *
         * @param configFileName Configuration file name
         * @return Configuration information
         * @throws IOException
         */
        public static Properties loadFromClasspath(String configFileName) throws IOException
        {
            ClassLoader classLoader = getCurrentClassLoader();
            Properties config = new Properties();
    
            List<URL> properties = new ArrayList<URL>();
            Enumeration<URL> propertyResources = classLoader
                    .getResources(configFileName);
            while (propertyResources.hasMoreElements())
            {
                properties.add(propertyResources.nextElement());
            }
    
            for (URL url : properties)
            {
                InputStream is = null;
                try
                {
                    is = url.openStream();
                    config.load(is);
                }
                finally
                {
                    if (is != null)
                    {
                        is.close();
                        is = null;
                    }
                }
            }
    
            return config;
        }
    }
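
In the code above, `loadFromClasspath` returns an empty `Properties` object when the configuration file is not found on the classpath, so a missing file only surfaces later as a configuration error from the Kafka client. A check before the producer is created can report the problem more directly. The following is a minimal sketch under that assumption; `ProducerConfigCheck`, `missingKeys`, and `requireComplete` are hypothetical names, and the list of required keys is taken from the sample producer file rather than from the Kafka client itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of the demo project.
final class ProducerConfigCheck {

    // Keys that the sample producer configuration file always sets.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "bootstrap.servers", "key.serializer", "value.serializer");

    // Returns the required keys that are absent or blank in props, in declaration order.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<String>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Throws an IllegalStateException that names every missing key.
    static void requireComplete(Properties props) {
        List<String> missing = missingKeys(props);
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Missing producer settings: " + missing);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "ip1:port1");
        // prints [key.serializer, value.serializer]
        System.out.println(missingKeys(props));
    }
}
```

Calling `requireComplete(props)` just before `new KafkaProducer<K,V>(props)` would be one place to use it.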
    

Consuming Messages

  • Test code
    package com.dms.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.junit.Test;
    import java.util.Arrays;
    
    public class DmsConsumerTest {
        @Test
        public void testConsumer() throws Exception {
            DmsConsumer consumer = new DmsConsumer();
            consumer.consume(Arrays.asList("topic-0"));
            try {
                for (int i = 0; i < 10; i++){
                    ConsumerRecords<Object, Object> records = consumer.poll(1000);
                    System.out.println("the number of records:" + records.count());
                    for (ConsumerRecord<Object, Object> record : records)
                    {
                        System.out.println(record.toString());
                    }
                }
            }catch (Exception e)
            {
                // TODO: Exception handling
                e.printStackTrace();
            }finally {
                consumer.close();
            }
        }
    }
    
  • Message consumption code
    package com.dms.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.*;
    
    public class DmsConsumer {
    
        public static final String CONFIG_CONSUMER_FILE_NAME = "dms.sdk.consumer.properties";
    
        private KafkaConsumer<Object, Object> consumer;
    
        DmsConsumer(String path)
        {
            Properties props = new Properties();
            // try-with-resources closes the stream once the properties are loaded
            try (InputStream in = new BufferedInputStream(new FileInputStream(path))) {
                props.load(in);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            consumer = new KafkaConsumer<Object, Object>(props);
        }
    
        DmsConsumer()
        {
            Properties props = new Properties();
            try {
                props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            consumer = new KafkaConsumer<Object, Object>(props);
        }
        public void consume(List<String> topics)
        {
            consumer.subscribe(topics);
        }
    
        public ConsumerRecords<Object, Object> poll(long timeout)
        {
            return consumer.poll(timeout);
        }
    
        public void close()
        {
            consumer.close();
        }
    
        /**
         * Gets the class loader from the thread context. If none is found there,
         * returns the class loader that loaded this class.
         *
         * @return classloader
         */
        public static ClassLoader getCurrentClassLoader()
        {
            ClassLoader classLoader = Thread.currentThread()
                    .getContextClassLoader();
            if (classLoader == null)
            {
                classLoader = DmsConsumer.class.getClassLoader();
            }
            return classLoader;
        }
    
        /**
         * Load configuration information from classpath.
         *
         * @param configFileName Configuration file name
         * @return Configuration information
         * @throws IOException
         */
        public static Properties loadFromClasspath(String configFileName) throws IOException
        {
            ClassLoader classLoader = getCurrentClassLoader();
            Properties config = new Properties();
    
            List<URL> properties = new ArrayList<URL>();
            Enumeration<URL> propertyResources = classLoader
                    .getResources(configFileName);
            while (propertyResources.hasMoreElements())
            {
                properties.add(propertyResources.nextElement());
            }
    
            for (URL url : properties)
            {
                InputStream is = null;
                try
                {
                    is = url.openStream();
                    config.load(is);
                }
                finally
                {
                    if (is != null)
                    {
                        is.close();
                        is = null;
                    }
                }
            }
    
            return config;
        }
    }
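
The path-based constructors of DmsConsumer and DmsProducer print the stack trace and return when the file cannot be read, which leaves the client field null, so the first later call fails with a NullPointerException far from the real cause. One possible alternative is to fail immediately with an exception that names the file. The following is a minimal sketch, not the demo project's code; `FailFastConfigLoader` and `loadFromFile` are hypothetical names.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper, not part of the demo project.
final class FailFastConfigLoader {

    // Loads a properties file, or throws UncheckedIOException naming the path if it cannot be read.
    static Properties loadFromFile(String path) {
        Properties props = new Properties();
        // try-with-resources closes the stream whether or not load() succeeds
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot read configuration file " + path, e);
        }
        return props;
    }

    public static void main(String[] args) {
        try {
            loadFromFile("no-such-dir/dms.sdk.consumer.properties");
        } catch (UncheckedIOException e) {
            // The message includes the path that was passed in.
            System.out.println(e.getMessage());
        }
    }
}
```

A constructor could pass the returned `Properties` straight to `new KafkaConsumer<Object, Object>(props)`, so a client object is never left half-initialized.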