更新时间:2024-08-06 GMT+08:00
Java客户端接入示例
本文介绍Maven方式引入Kafka客户端,并完成Kafka实例连接以及消息生产与消费的相关示例。如果您需要在IDE中查看Demo具体表现,请查看Java开发环境搭建。
下文所有Kafka的配置信息,如实例连接地址、Topic名称、用户信息等,请参考收集连接信息获取。
Maven中引入Kafka客户端
//Kafka实例基于社区版本1.1.0/2.3.0/2.7/3.x,推荐客户端保持一致。 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.1.0/2.3.0/2.7.2/3.4.0</version> </dependency>
准备Kafka配置信息
为了方便,下文分生产与消费两个配置文件介绍。其中涉及SASL认证配置,如果Kafka实例没有开启密文接入,使用的是不加密连接,请注释相关代码;如果Kafka实例开启了密文接入,则必须使用加密方式连接,请设置相关参数。
- 生产消息配置文件(对应生产消息代码中的dms.sdk.producer.properties文件)
以下粗体部分为不同Kafka实例特有的信息,必须修改。客户端其他参数,可以自主添加。
#Topic名称在具体的生产与消费代码中。 ####################### #Kafka实例的broker信息,ip:port为实例的连接地址和端口,参考“收集连接信息”章节获取。举例:bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #发送确认参数 acks=all #键的序列化方式 key.serializer=org.apache.kafka.common.serialization.StringSerializer #值的序列化方式 value.serializer=org.apache.kafka.common.serialization.StringSerializer #producer可以用来缓存数据的内存大小 buffer.memory=33554432 #重试次数 retries=0 ####################### #如果不使用密文接入,以下参数请注释掉。 ####################### #设置SASL认证机制、账号和密码。 #sasl.mechanism为SASL认证机制,username和password为SASL的用户名和密码,参考“收集连接信息”章节获取。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="username" \ password="password"; #设置Kafka安全协议。security.protocol为安全协议。 #安全协议为“SASL_SSL”时,配置信息如下。 security.protocol=SASL_SSL #ssl truststore.location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。 ssl.truststore.location=E:\\temp\\client.jks #ssl truststore.password为服务器证书密码,配置此密码是为了访问Java生成的jks文件。 ssl.truststore.password=dms@kafka #ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。 ssl.endpoint.identification.algorithm= #安全协议为“SASL_PLAINTEXT”时,配置信息如下。 security.protocol=SASL_PLAINTEXT
- 消费消息配置文件(对应消费消息代码中的dms.sdk.consumer.properties文件)
以下粗体部分为不同Kafka实例特有的信息,必须修改。客户端其他参数,可以自主添加。
#Topic名称在具体的生产与消费代码中。 ####################### #Kafka实例的broker信息,ip:port为实例的连接地址和端口,参考“收集连接信息”章节获取。举例:bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group group.id=1 #键的序列化方式 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer #值的序列化方式 value.deserializer=org.apache.kafka.common.serialization.StringDeserializer #偏移量的方式 auto.offset.reset=earliest ####################### #如果不使用密文接入,以下参数请注释掉。 ####################### #设置SASL认证机制、账号和密码。 #sasl.mechanism为SASL认证机制,username和password为SASL的用户名和密码,参考“收集连接信息”章节获取。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="username" \ password="password"; #设置Kafka安全协议。security.protocol为安全协议。 #安全协议为“SASL_SSL”时,配置信息如下。 security.protocol=SASL_SSL #ssl truststore.location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。 ssl.truststore.location=E:\\temp\\client.jks #ssl truststore.password为服务器证书密码,配置此密码是为了访问Java生成的jks文件。 ssl.truststore.password=dms@kafka #ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。 ssl.endpoint.identification.algorithm= #安全协议为“SASL_PLAINTEXT”时,配置信息如下。 security.protocol=SASL_PLAINTEXT
生产消息
- 测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
package com.dms.producer; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.junit.Test; public class DmsProducerTest { @Test public void testProducer() throws Exception { DmsProducer<String, String> producer = new DmsProducer<String, String>(); int partition = 0; try { for (int i = 0; i < 10; i++) { String key = null; String data = "The msg is " + i; // 注意填写您创建的topic名称。另外,生产消息的API有多个,具体参见Kafka官网或者下文的生产消息代码。 producer.produce("topic-0", partition, key, data, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { exception.printStackTrace(); return; } System.out.println("produce msg completed"); } }); System.out.println("produce msg:" + data); } } catch (Exception e) { // TODO: 异常处理 e.printStackTrace(); } finally { producer.close(); } } }
- 生产消息代码

package com.dms.producer; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.net.URL; import java.util.ArrayList; import java.util.Enumeration; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class DmsProducer<K, V> { //引入生产消息的配置信息,具体内容参考上文 public static final String CONFIG_PRODUCER_FILE_NAME = "dms.sdk.producer.properties"; private Producer<K, V> producer; DmsProducer(String path) { Properties props = new Properties(); try { InputStream in = new BufferedInputStream(new FileInputStream(path)); props.load(in); }catch (IOException e) { e.printStackTrace(); return; } producer = new KafkaProducer<K,V>(props); } DmsProducer() { Properties props = new Properties(); try { props = loadFromClasspath(CONFIG_PRODUCER_FILE_NAME); }catch (IOException e) { e.printStackTrace(); return; } producer = new KafkaProducer<K,V>(props); } /** * 生产消息 * * @param topic topic对象 * @param partition partition * @param key 消息key * @param data 消息数据 */ public void produce(String topic, Integer partition, K key, V data) { produce(topic, partition, key, data, null, (Callback)null); } /** * 生产消息 * * @param topic topic对象 * @param partition partition * @param key 消息key * @param data 消息数据 * @param timestamp timestamp */ public void produce(String topic, Integer partition, K key, V data, Long timestamp) { produce(topic, partition, key, data, timestamp, (Callback)null); } /** * 生产消息 * * @param topic topic对象 * @param partition partition * @param key 消息key * @param data 消息数据 * @param callback callback */ public void produce(String topic, Integer partition, K key, V data, Callback callback) { produce(topic, partition, key, data, null, callback); } public void produce(String topic, V data) { produce(topic, null, null, data, null, (Callback)null); } /** * 生产消息 * * @param topic topic对象 * @param partition partition * @param key 消息key * @param data 消息数据 * @param timestamp timestamp * @param callback callback */ public void produce(String topic, Integer partition, K key, V data, Long timestamp, Callback callback) { ProducerRecord<K, V> kafkaRecord = timestamp == null ? new ProducerRecord<K, V>(topic, partition, key, data) : new ProducerRecord<K, V>(topic, partition, timestamp, key, data); produce(kafkaRecord, callback); } public void produce(ProducerRecord<K, V> kafkaRecord) { produce(kafkaRecord, (Callback)null); } public void produce(ProducerRecord<K, V> kafkaRecord, Callback callback) { producer.send(kafkaRecord, callback); } public void close() { producer.close(); } /** * get classloader from thread context if no classloader found in thread * context return the classloader which has loaded this class * * @return classloader */ public static ClassLoader getCurrentClassLoader() { ClassLoader classLoader = Thread.currentThread() .getContextClassLoader(); if (classLoader == null) { classLoader = DmsProducer.class.getClassLoader(); } return classLoader; } /** * 从classpath 加载配置信息 * * @param configFileName 配置文件名称 * @return 配置信息 * @throws IOException */ public static Properties loadFromClasspath(String configFileName) throws IOException { ClassLoader classLoader = getCurrentClassLoader(); Properties config = new Properties(); List<URL> properties = new ArrayList<URL>(); Enumeration<URL> propertyResources = classLoader .getResources(configFileName); while (propertyResources.hasMoreElements()) { properties.add(propertyResources.nextElement()); } for (URL url : properties) { InputStream is = null; try { is = url.openStream(); config.load(is); } finally { if (is != null) { is.close(); is = null; } } } return config; } }
消费消息
- 测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
package com.dms.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.junit.Test; import java.util.Arrays; public class DmsConsumerTest { @Test public void testConsumer() throws Exception { DmsConsumer consumer = new DmsConsumer(); consumer.consume(Arrays.asList("topic-0")); try { for (int i = 0; i < 10; i++){ ConsumerRecords<Object, Object> records = consumer.poll(1000); System.out.println("the numbers of topic:" + records.count()); for (ConsumerRecord<Object, Object> record : records) { System.out.println(record.toString()); } } }catch (Exception e) { // TODO: 异常处理 e.printStackTrace(); }finally { consumer.close(); } } }
- 消费消息代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
package com.dms.consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.net.URL; import java.util.*; public class DmsConsumer { public static final String CONFIG_CONSUMER_FILE_NAME = "dms.sdk.consumer.properties"; private KafkaConsumer<Object, Object> consumer; DmsConsumer(String path) { Properties props = new Properties(); try { InputStream in = new BufferedInputStream(new FileInputStream(path)); props.load(in); }catch (IOException e) { e.printStackTrace(); return; } consumer = new KafkaConsumer<Object, Object>(props); } DmsConsumer() { Properties props = new Properties(); try { props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME); }catch (IOException e) { e.printStackTrace(); return; } consumer = new KafkaConsumer<Object, Object>(props); } public void consume(List topics) { consumer.subscribe(topics); } public ConsumerRecords<Object, Object> poll(long timeout) { return consumer.poll(timeout); } public void close() { consumer.close(); } /** * get classloader from thread context if no classloader found in thread * context return the classloader which has loaded this class * * @return classloader */ public static ClassLoader getCurrentClassLoader() { ClassLoader classLoader = Thread.currentThread() .getContextClassLoader(); if (classLoader == null) { classLoader = DmsConsumer.class.getClassLoader(); } return classLoader; } /** * 从classpath 加载配置信息 * * @param configFileName 配置文件名称 * @return 配置信息 * @throws IOException */ public static Properties loadFromClasspath(String configFileName) throws IOException { ClassLoader classLoader = getCurrentClassLoader(); Properties config = new Properties(); List<URL> properties = new ArrayList<URL>(); Enumeration<URL> propertyResources = classLoader .getResources(configFileName); while (propertyResources.hasMoreElements()) { properties.add(propertyResources.nextElement()); } for (URL url : properties) { InputStream is = null; try { is = url.openStream(); config.load(is); } finally { if (is != null) { is.close(); is = null; } } } return config; } }
父主题: Java