Configuring Kafka Clients in Java
This section describes how to use Maven to add the Kafka client for MQS as a dependency, connect a client to MQS, and produce and consume messages. For details about the demo, see Setting Up the Java Development Environment.
All configuration information, such as the MQS connection address, topic name, and user information, mentioned in the following parts can be obtained in Collecting Connection Information.
Adding Kafka Clients in Maven
MQS is based on Kafka 1.1.0 and 2.3.0. Use a Kafka client of the same version as your instance. You can view the Kafka version in the MQS Information area on the Instance Information page of the ROMA Connect console.
If the 1.1.0 client is used, set the version parameter to 1.1.0; if the 2.3.0 client is used, set it to 2.3.0. The following example uses the 2.3.0 client.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>
Preparing Configuration Information
The following describes example producer and consumer configuration files. If SASL authentication is enabled for a ROMA Connect instance, you must configure SASL authentication information in the configuration file of the Java client. Otherwise, the connection fails. If SASL authentication is not enabled, comment out the related configuration.
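The sasl.jaas.config entry used in the files below is a single JAAS configuration string with required quoting and a mandatory trailing semicolon. If you assemble it in code, a small helper (hypothetical, shown here only for illustration) avoids quoting mistakes:

```java
public class JaasConfigSketch {
    //Hypothetical helper: formats the sasl.jaas.config value from the
    //username and password created on the console.
    public static String plainJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" "
                + "password=\"" + password + "\";";
    }

    public static void main(String[] args) {
        System.out.println(plainJaasConfig("username", "password"));
    }
}
```

The returned string can be set as the sasl.jaas.config property instead of hard-coding credentials in a file.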
- Producer configuration file (corresponding to the mqs.sdk.producer.properties file in the production message code)
The connection address, authentication information, and other instance-specific values vary between MQS instances and must be modified based on site requirements. Other client parameters can be added as required. All configuration information, such as the MQS connection address, topic name, and user information, can be obtained in Collecting Connection Information.
#The topic name is specified in the production and consumption code, not here.
#######################
#You can obtain the broker information from the console.
#For example, bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094.
bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
#Acknowledgment setting for sends.
acks=all
#Serializer class for the key.
key.serializer=org.apache.kafka.common.serialization.StringSerializer
#Serializer class for the value.
value.serializer=org.apache.kafka.common.serialization.StringSerializer
#Total bytes of memory the producer can use to buffer records waiting to be sent to the server.
buffer.memory=33554432
#Number of retries.
retries=0
#######################
#If SASL authentication is not used, comment out the following parameters:
#######################
#Set the JAAS username and password on the console.
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="username" \
password="password";
#SASL authentication mechanism.
sasl.mechanism=PLAIN
#Encryption protocol. Currently, only the SASL_SSL protocol is supported.
security.protocol=SASL_SSL
#Location of the SSL truststore file.
ssl.truststore.location=E:\\temp\\client.truststore.jks
#Password of the SSL truststore file. The value is fixed and cannot be changed.
ssl.truststore.password=dms@kafka
ssl.endpoint.identification.algorithm=
- Consumer configuration file (corresponding to the mqs.sdk.consumer.properties file in the consumption message code)
The connection address, authentication information, and other instance-specific values vary between MQS instances and must be modified based on site requirements. Other client parameters can be added as required. All configuration information, such as the MQS connection address, topic name, and user information, can be obtained in Collecting Connection Information.
#The topic name is specified in the production and consumption code, not here.
#######################
#You can obtain the broker information from the console.
#For example, bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094.
bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
#A character string that uniquely identifies the group to which the consumer process belongs. You can set it as required.
#Consumers with the same group.id belong to the same consumer group.
group.id=1
#Deserializer class for the key.
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#Deserializer class for the value.
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#Offset reset policy used when no committed offset exists.
auto.offset.reset=earliest
#######################
#If SASL authentication is not used, comment out the following parameters:
#######################
#Set the JAAS username and password on the console.
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="username" \
password="password";
#SASL authentication mechanism.
sasl.mechanism=PLAIN
#Encryption protocol. Currently, only the SASL_SSL protocol is supported.
security.protocol=SASL_SSL
#Location of the SSL truststore file.
ssl.truststore.location=E:\\temp\\client.truststore.jks
#Password of the SSL truststore file. The value is fixed and cannot be changed.
ssl.truststore.password=dms@kafka
ssl.endpoint.identification.algorithm=
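Both configuration files can also be assembled programmatically as java.util.Properties objects and passed straight to the KafkaProducer or KafkaConsumer constructor. The following is a minimal sketch, not part of the SDK: the broker addresses, credentials, and truststore path are placeholders that must be replaced with the values collected for your instance, and the SASL block should be skipped when SASL authentication is disabled.

```java
import java.util.Properties;

public class MqsConfigSketch {
    //Producer settings mirroring mqs.sdk.producer.properties.
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip1:port1,ip2:port2,ip3:port3"); //placeholder
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("buffer.memory", "33554432");
        props.put("retries", "0");
        addSasl(props);
        return props;
    }

    //Consumer settings mirroring mqs.sdk.consumer.properties.
    public static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip1:port1,ip2:port2,ip3:port3"); //placeholder
        //Consumers that share a group.id belong to the same consumer group.
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        addSasl(props);
        return props;
    }

    //SASL_SSL settings; skip this call when SASL authentication is disabled.
    private static void addSasl(Properties props) {
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"username\" password=\"password\";"); //placeholders
        props.put("sasl.mechanism", "PLAIN");
        props.put("security.protocol", "SASL_SSL");
        props.put("ssl.truststore.location", "E:\\temp\\client.truststore.jks");
        props.put("ssl.truststore.password", "dms@kafka");
        props.put("ssl.endpoint.identification.algorithm", "");
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("acks"));
        System.out.println(consumerProps("1").getProperty("auto.offset.reset"));
    }
}
```

A Properties object built this way can replace the file-based constructors used by the MqsProducer and MqsConsumer classes in this section.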
Producing Messages
- Test code:
package com.mqs.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Test;

public class MqsProducerTest {
    @Test
    public void testProducer() throws Exception {
        MqsProducer<String, String> producer = new MqsProducer<String, String>();
        int partition = 0;
        try {
            for (int i = 0; i < 10; i++) {
                String key = null;
                String data = "The msg is " + i;
                //Enter the name of the topic you created. There are multiple APIs for producing
                //messages. For details, see the Kafka official website or the following production message code.
                producer.produce("topic-0", partition, key, data, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            exception.printStackTrace();
                            return;
                        }
                        System.out.println("produce msg completed");
                    }
                });
                System.out.println("produce msg:" + data);
            }
        } catch (Exception e) {
            //TODO: troubleshooting
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
- Production message code:
package com.mqs.producer;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MqsProducer<K, V> {
    //Configuration file for producing messages. For details, see the preceding description.
    public static final String CONFIG_PRODUCER_FILE_NAME = "mqs.sdk.producer.properties";

    private Producer<K, V> producer;

    MqsProducer(String path) {
        Properties props = new Properties();
        try {
            InputStream in = new BufferedInputStream(new FileInputStream(path));
            props.load(in);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        producer = new KafkaProducer<K, V>(props);
    }

    MqsProducer() {
        Properties props = new Properties();
        try {
            props = loadFromClasspath(CONFIG_PRODUCER_FILE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        producer = new KafkaProducer<K, V>(props);
    }

    /**
     * Produce a message.
     *
     * @param topic topic name
     * @param partition partition
     * @param key message key
     * @param data message data
     */
    public void produce(String topic, Integer partition, K key, V data) {
        produce(topic, partition, key, data, null, (Callback) null);
    }

    /**
     * Produce a message.
     *
     * @param topic topic name
     * @param partition partition
     * @param key message key
     * @param data message data
     * @param timestamp timestamp
     */
    public void produce(String topic, Integer partition, K key, V data, Long timestamp) {
        produce(topic, partition, key, data, timestamp, (Callback) null);
    }

    /**
     * Produce a message.
     *
     * @param topic topic name
     * @param partition partition
     * @param key message key
     * @param data message data
     * @param callback callback
     */
    public void produce(String topic, Integer partition, K key, V data, Callback callback) {
        produce(topic, partition, key, data, null, callback);
    }

    public void produce(String topic, V data) {
        produce(topic, null, null, data, null, (Callback) null);
    }

    /**
     * Produce a message.
     *
     * @param topic topic name
     * @param partition partition
     * @param key message key
     * @param data message data
     * @param timestamp timestamp
     * @param callback callback
     */
    public void produce(String topic, Integer partition, K key, V data, Long timestamp, Callback callback) {
        ProducerRecord<K, V> kafkaRecord =
                timestamp == null ? new ProducerRecord<K, V>(topic, partition, key, data)
                                  : new ProducerRecord<K, V>(topic, partition, timestamp, key, data);
        produce(kafkaRecord, callback);
    }

    public void produce(ProducerRecord<K, V> kafkaRecord) {
        produce(kafkaRecord, (Callback) null);
    }

    public void produce(ProducerRecord<K, V> kafkaRecord, Callback callback) {
        producer.send(kafkaRecord, callback);
    }

    public void close() {
        producer.close();
    }

    /**
     * Get the class loader from the thread context. If no class loader is found
     * in the thread context, return the class loader that loaded this class.
     *
     * @return class loader
     */
    public static ClassLoader getCurrentClassLoader() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null) {
            classLoader = MqsProducer.class.getClassLoader();
        }
        return classLoader;
    }

    /**
     * Load configuration information from the classpath.
     *
     * @param configFileName configuration file name
     * @return configuration information
     * @throws IOException
     */
    public static Properties loadFromClasspath(String configFileName) throws IOException {
        ClassLoader classLoader = getCurrentClassLoader();
        Properties config = new Properties();
        List<URL> properties = new ArrayList<URL>();
        Enumeration<URL> propertyResources = classLoader.getResources(configFileName);
        while (propertyResources.hasMoreElements()) {
            properties.add(propertyResources.nextElement());
        }
        for (URL url : properties) {
            InputStream is = null;
            try {
                is = url.openStream();
                config.load(is);
            } finally {
                if (is != null) {
                    is.close();
                    is = null;
                }
            }
        }
        return config;
    }
}
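Note that loadFromClasspath merges every classpath resource with the given file name into a single Properties object; when the same key appears in more than one resource, the value loaded last wins, which is standard java.util.Properties.load behavior. A minimal sketch of that merge behavior, using in-memory streams in place of classpath URLs:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class LoadMergeSketch {
    //Merges two property sources the same way loadFromClasspath merges
    //multiple classpath resources that share one file name.
    public static Properties merged() {
        Properties config = new Properties();
        //Stand-ins for two classpath resources with the same name.
        String first = "acks=all\nretries=0\n";
        String second = "retries=3\n";
        try {
            config.load(new ByteArrayInputStream(first.getBytes(StandardCharsets.ISO_8859_1)));
            config.load(new ByteArrayInputStream(second.getBytes(StandardCharsets.ISO_8859_1)));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return config; //retries is now 3: the value loaded last wins
    }

    public static void main(String[] args) {
        System.out.println(merged().getProperty("retries"));
    }
}
```

If the same properties file exists in several JARs on the classpath, this merge order determines which settings take effect, so keep only one copy when possible.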
Consuming Messages
- Test code:
package com.mqs.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.Test;

import java.util.Arrays;

public class MqsConsumerTest {
    @Test
    public void testConsumer() throws Exception {
        MqsConsumer consumer = new MqsConsumer();
        consumer.consume(Arrays.asList("topic-0"));
        try {
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<Object, Object> records = consumer.poll(1000);
                System.out.println("the number of records in this poll: " + records.count());
                for (ConsumerRecord<Object, Object> record : records) {
                    System.out.println(record.toString());
                }
            }
        } catch (Exception e) {
            //TODO: troubleshooting
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
- Consumption message code:
package com.mqs.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.*;

public class MqsConsumer {
    //Configuration file for consuming messages. For details, see the preceding description.
    public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";

    private KafkaConsumer<Object, Object> consumer;

    MqsConsumer(String path) {
        Properties props = new Properties();
        try {
            InputStream in = new BufferedInputStream(new FileInputStream(path));
            props.load(in);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        consumer = new KafkaConsumer<Object, Object>(props);
    }

    MqsConsumer() {
        Properties props = new Properties();
        try {
            props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        consumer = new KafkaConsumer<Object, Object>(props);
    }

    public void consume(List<String> topics) {
        consumer.subscribe(topics);
    }

    public ConsumerRecords<Object, Object> poll(long timeout) {
        return consumer.poll(timeout);
    }

    public void close() {
        consumer.close();
    }

    /**
     * Get the class loader from the thread context. If no class loader is found
     * in the thread context, return the class loader that loaded this class.
     *
     * @return class loader
     */
    public static ClassLoader getCurrentClassLoader() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null) {
            classLoader = MqsConsumer.class.getClassLoader();
        }
        return classLoader;
    }

    /**
     * Load configuration information from the classpath.
     *
     * @param configFileName configuration file name
     * @return configuration information
     * @throws IOException
     */
    public static Properties loadFromClasspath(String configFileName) throws IOException {
        ClassLoader classLoader = getCurrentClassLoader();
        Properties config = new Properties();
        List<URL> properties = new ArrayList<URL>();
        Enumeration<URL> propertyResources = classLoader.getResources(configFileName);
        while (propertyResources.hasMoreElements()) {
            properties.add(propertyResources.nextElement());
        }
        for (URL url : properties) {
            InputStream is = null;
            try {
                is = url.openStream();
                config.load(is);
            } finally {
                if (is != null) {
                    is.close();
                    is = null;
                }
            }
        }
        return config;
    }
}