Configuring Kafka Clients in Java
This section describes how to add the Kafka client dependency in Maven and how to use the client to connect to a Kafka instance and produce and consume messages. To check how the demo project runs in IDEA, see Setting Up the Java Development Environment.
The Kafka instance connection addresses, topic name, and user information used in the following examples are obtained as described in Collecting Connection Information.
Adding Kafka Clients in Maven
<!-- Kafka premium instances are based on Kafka 2.3.0. Use the same client version. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>
Preparing Kafka Configuration Files
The following are example producer and consumer configuration files. If SASL is not enabled for the Kafka instance, comment out the SASL-related lines. If SASL is enabled, set the SASL parameters for encrypted access.
- Producer configuration file (the dms.sdk.producer.properties file in the demo project)
Instance-specific values, such as the broker addresses and the SASL username and password, must be modified for your Kafka instance. Other parameters can also be added.
#The topic name is in the code for producing or consuming messages.
#######################
#Information about Kafka brokers. ip:port are the connection addresses and ports used by the instance. The values can be obtained by referring to the "Collecting Connection Information" section. Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
#Producer acknowledgement
acks=all
#Method of turning the key into bytes
key.serializer=org.apache.kafka.common.serialization.StringSerializer
#Method of turning the value into bytes
value.serializer=org.apache.kafka.common.serialization.StringSerializer
#Memory available to the producer for buffering
buffer.memory=33554432
#Number of retries
retries=0
#######################
#Comment out the following parameters if SASL access is not enabled.
#######################
#Configure the JAAS username and password. username and password are set when you enable Kafka SASL_SSL during instance creation.
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="username" \
    password="password";
#SASL mechanism
sasl.mechanism=PLAIN
#Encryption protocol. Currently, only SASL_SSL is supported.
security.protocol=SASL_SSL
#Location of ssl.truststore
ssl.truststore.location=E:\\temp\\client.truststore.jks
#Password of ssl.truststore
ssl.truststore.password=dms@kafka
#Whether to verify the certificate domain name. This parameter must be left blank, which indicates disabling domain name verification.
ssl.endpoint.identification.algorithm=
- Consumer configuration file (the dms.sdk.consumer.properties file in the demo project)
Instance-specific values, such as the broker addresses and the SASL username and password, must be modified for your Kafka instance. Other parameters can also be added.
#The topic name is in the code for producing or consuming messages.
#######################
#Information about Kafka brokers. ip:port are the connection addresses and ports used by the instance. The values can be obtained by referring to the "Collecting Connection Information" section. Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
#Unique string to identify the group of consumer processes to which the consumer belongs. Configuring the same group.id for different processes indicates that the processes belong to the same consumer group.
group.id=1
#Method of turning bytes back into the key
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#Method of turning bytes back into the value
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#Offset reset policy
auto.offset.reset=earliest
#######################
#Comment out the following parameters if SASL access is not enabled.
#######################
#Configure the JAAS username and password. username and password are set when you enable Kafka SASL_SSL during instance creation.
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="username" \
    password="password";
#SASL mechanism
sasl.mechanism=PLAIN
#Encryption protocol. Currently, only SASL_SSL is supported.
security.protocol=SASL_SSL
#Location of ssl.truststore
ssl.truststore.location=E:\\temp\\client.truststore.jks
#Password of ssl.truststore
ssl.truststore.password=dms@kafka
#Whether to verify the certificate domain name. This parameter must be left blank, which indicates disabling domain name verification.
ssl.endpoint.identification.algorithm=
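As an alternative to commenting out the SASL lines by hand, the same effect can be approximated in code by removing the SASL-related keys after the file is loaded. The following is a minimal sketch using only java.util.Properties. The class name SaslConfigSketch, the load method, and the saslEnabled flag are hypothetical and are not part of the demo project; the list of keys is taken from the SASL block of the configuration files above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the demo project.
public class SaslConfigSketch {
    // Keys listed under "Comment out the following parameters if SASL access is not enabled."
    private static final String[] SASL_KEYS = {
        "sasl.jaas.config",
        "sasl.mechanism",
        "security.protocol",
        "ssl.truststore.location",
        "ssl.truststore.password",
        "ssl.endpoint.identification.algorithm"
    };

    // Parses properties text and drops the SASL keys when SASL is not enabled.
    public static Properties load(String text, boolean saslEnabled) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (!saslEnabled) {
            for (String key : SASL_KEYS) {
                props.remove(key);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
            "bootstrap.servers=ip1:port1,ip2:port2,ip3:port3",
            "acks=all",
            "sasl.mechanism=PLAIN",
            "security.protocol=SASL_SSL");
        Properties plain = load(text, false);
        System.out.println(plain.containsKey("security.protocol")); // false
        System.out.println(plain.getProperty("bootstrap.servers"));
    }
}
```

With the SASL keys removed, the Kafka client falls back to its default security.protocol, which is PLAINTEXT. The resulting Properties object can be passed to the KafkaProducer or KafkaConsumer constructor in the same way as the file-based configuration.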
Producing Messages
- Test code
package com.dms.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Test;

public class DmsProducerTest {
    @Test
    public void testProducer() throws Exception {
        DmsProducer<String, String> producer = new DmsProducer<String, String>();
        int partition = 0;
        try {
            for (int i = 0; i < 10; i++) {
                String key = null;
                String data = "The msg is " + i;
                // Enter the name of the topic you created. There are multiple APIs for producing messages.
                // For details, see the Kafka official website or the message production code.
                producer.produce("topic-0", partition, key, data, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        // Called asynchronously once the broker acknowledges or rejects the record.
                        if (exception != null) {
                            exception.printStackTrace();
                            return;
                        }
                        System.out.println("produce msg completed");
                    }
                });
                System.out.println("produce msg:" + data);
            }
        } catch (Exception e) {
            // TODO: Exception handling
            e.printStackTrace();
        } finally {
            // close() waits for previously sent records to complete.
            producer.close();
        }
    }
}
- Message production code
package com.dms.producer;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DmsProducer<K, V> {
    // Name of the producer configuration file prepared earlier.
    public static final String CONFIG_PRODUCER_FILE_NAME = "dms.sdk.producer.properties";

    private Producer<K, V> producer;

    /**
     * Creates a producer from a configuration file at the given path.
     * If the file cannot be read, the error is printed and the producer is left uninitialized.
     */
    DmsProducer(String path) {
        Properties props = new Properties();
        // try-with-resources closes the stream even if loading fails.
        try (InputStream in = new BufferedInputStream(new FileInputStream(path))) {
            props.load(in);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        producer = new KafkaProducer<K, V>(props);
    }

    /**
     * Creates a producer from the configuration file found on the classpath.
     * If the file cannot be read, the error is printed and the producer is left uninitialized.
     */
    DmsProducer() {
        Properties props = new Properties();
        try {
            props = loadFromClasspath(CONFIG_PRODUCER_FILE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        producer = new KafkaProducer<K, V>(props);
    }

    /**
     * Producing messages
     *
     * @param topic     Topic
     * @param partition partition
     * @param key       Message key
     * @param data      Message data
     */
    public void produce(String topic, Integer partition, K key, V data) {
        produce(topic, partition, key, data, null, (Callback) null);
    }

    /**
     * Producing messages
     *
     * @param topic     Topic
     * @param partition partition
     * @param key       Message key
     * @param data      Message data
     * @param timestamp timestamp
     */
    public void produce(String topic, Integer partition, K key, V data, Long timestamp) {
        produce(topic, partition, key, data, timestamp, (Callback) null);
    }

    /**
     * Producing messages
     *
     * @param topic     Topic
     * @param partition partition
     * @param key       Message key
     * @param data      Message data
     * @param callback  callback
     */
    public void produce(String topic, Integer partition, K key, V data, Callback callback) {
        produce(topic, partition, key, data, null, callback);
    }

    /**
     * Producing messages without a key; the partition is chosen by the client.
     *
     * @param topic Topic
     * @param data  Message data
     */
    public void produce(String topic, V data) {
        produce(topic, null, null, data, null, (Callback) null);
    }

    /**
     * Producing messages
     *
     * @param topic     Topic
     * @param partition partition
     * @param key       Message key
     * @param data      Message data
     * @param timestamp timestamp
     * @param callback  callback
     */
    public void produce(String topic, Integer partition, K key, V data, Long timestamp, Callback callback) {
        ProducerRecord<K, V> kafkaRecord = timestamp == null
                ? new ProducerRecord<K, V>(topic, partition, key, data)
                : new ProducerRecord<K, V>(topic, partition, timestamp, key, data);
        produce(kafkaRecord, callback);
    }

    public void produce(ProducerRecord<K, V> kafkaRecord) {
        produce(kafkaRecord, (Callback) null);
    }

    public void produce(ProducerRecord<K, V> kafkaRecord, Callback callback) {
        // send() is asynchronous; the callback (if any) runs when the send completes.
        producer.send(kafkaRecord, callback);
    }

    public void close() {
        producer.close();
    }

    /**
     * Gets the classloader from the thread context. If no classloader is found in the
     * thread context, returns the classloader that loaded this class.
     *
     * @return classloader
     */
    public static ClassLoader getCurrentClassLoader() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null) {
            classLoader = DmsProducer.class.getClassLoader();
        }
        return classLoader;
    }

    /**
     * Load configuration information from classpath.
     *
     * @param configFileName Configuration file name
     * @return Configuration information
     * @throws IOException if a matching resource cannot be read
     */
    public static Properties loadFromClasspath(String configFileName) throws IOException {
        ClassLoader classLoader = getCurrentClassLoader();
        Properties config = new Properties();

        // Collect every classpath resource with the given name.
        List<URL> properties = new ArrayList<URL>();
        Enumeration<URL> propertyResources = classLoader.getResources(configFileName);
        while (propertyResources.hasMoreElements()) {
            properties.add(propertyResources.nextElement());
        }

        // Load each resource into the same Properties object.
        for (URL url : properties) {
            try (InputStream is = url.openStream()) {
                config.load(is);
            }
        }
        return config;
    }
}
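One consequence of loadFromClasspath is worth spelling out: it loads every classpath resource with the given name into a single Properties object, so if more than one copy of the file is on the classpath, a key defined in several copies takes the value from the copy loaded last. The following sketch reproduces that merge behavior with in-memory strings instead of classpath resources. The class name MergeOrderSketch and the loadAll method are hypothetical and exist only for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical illustration of how repeated Properties.load calls merge.
public class MergeOrderSketch {
    // Loads each source, in order, into one Properties object.
    public static Properties loadAll(String... sources) {
        Properties config = new Properties();
        for (String source : sources) {
            try {
                config.load(new StringReader(source));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return config;
    }

    public static void main(String[] args) {
        // The second source redefines retries, so its value replaces the first.
        Properties merged = loadAll("acks=all\nretries=0\n", "retries=3\n");
        System.out.println(merged.getProperty("acks"));    // prints "all"
        System.out.println(merged.getProperty("retries")); // prints "3"
    }
}
```

Because the order in which the classloader returns matching resources depends on the classpath, it is safest to keep a single copy of each configuration file in the project.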
Consuming Messages
- Test code
package com.dms.consumer;

import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.Test;

public class DmsConsumerTest {
    @Test
    public void testConsumer() throws Exception {
        DmsConsumer consumer = new DmsConsumer();
        // Enter the name of the topic you created.
        consumer.consume(Arrays.asList("topic-0"));
        try {
            for (int i = 0; i < 10; i++) {
                // Wait up to 1000 ms for records on each poll.
                ConsumerRecords<Object, Object> records = consumer.poll(1000);
                System.out.println("number of records polled: " + records.count());
                for (ConsumerRecord<Object, Object> record : records) {
                    System.out.println(record.toString());
                }
            }
        } catch (Exception e) {
            // TODO: Exception handling
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
- Message consumption code
package com.dms.consumer;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DmsConsumer {
    // Name of the consumer configuration file prepared earlier.
    public static final String CONFIG_CONSUMER_FILE_NAME = "dms.sdk.consumer.properties";

    private KafkaConsumer<Object, Object> consumer;

    /**
     * Creates a consumer from a configuration file at the given path.
     * If the file cannot be read, the error is printed and the consumer is left uninitialized.
     */
    DmsConsumer(String path) {
        Properties props = new Properties();
        // try-with-resources closes the stream even if loading fails.
        try (InputStream in = new BufferedInputStream(new FileInputStream(path))) {
            props.load(in);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        consumer = new KafkaConsumer<Object, Object>(props);
    }

    /**
     * Creates a consumer from the configuration file found on the classpath.
     * If the file cannot be read, the error is printed and the consumer is left uninitialized.
     */
    DmsConsumer() {
        Properties props = new Properties();
        try {
            props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        consumer = new KafkaConsumer<Object, Object>(props);
    }

    /**
     * Subscribes to the given topics.
     *
     * @param topics Topic names
     */
    public void consume(List<String> topics) {
        consumer.subscribe(topics);
    }

    /**
     * Fetches records, waiting up to the given timeout.
     *
     * @param timeout Maximum wait time in milliseconds
     * @return Records fetched in this poll (possibly empty)
     */
    public ConsumerRecords<Object, Object> poll(long timeout) {
        // poll(long) is deprecated in Kafka 2.x clients; poll(Duration) is the replacement.
        return consumer.poll(Duration.ofMillis(timeout));
    }

    public void close() {
        consumer.close();
    }

    /**
     * Gets the classloader from the thread context. If no classloader is found in the
     * thread context, returns the classloader that loaded this class.
     *
     * @return classloader
     */
    public static ClassLoader getCurrentClassLoader() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null) {
            classLoader = DmsConsumer.class.getClassLoader();
        }
        return classLoader;
    }

    /**
     * Load configuration information from classpath.
     *
     * @param configFileName Configuration file name
     * @return Configuration information
     * @throws IOException if a matching resource cannot be read
     */
    public static Properties loadFromClasspath(String configFileName) throws IOException {
        ClassLoader classLoader = getCurrentClassLoader();
        Properties config = new Properties();

        // Collect every classpath resource with the given name.
        List<URL> properties = new ArrayList<URL>();
        Enumeration<URL> propertyResources = classLoader.getResources(configFileName);
        while (propertyResources.hasMoreElements()) {
            properties.add(propertyResources.nextElement());
        }

        // Load each resource into the same Properties object.
        for (URL url : properties) {
            try (InputStream is = url.openStream()) {
                config.load(is);
            }
        }
        return config;
    }
}