Java客户端使用说明
操作场景
本文介绍Java版本的Kafka客户端连接指导,并完成客户端连接以及消息生产与消费的相关示例。
引入Kafka客户端
MQS基于Kafka社区版本1.1.0、2.7,您可以在ROMA Connect实例控制台的“实例信息”页面,在“MQS基本信息”下查看Kafka版本信息。Java开源客户端的版本使用请参见客户端版本使用建议。
根据实例的Kafka版本信息使用对应版本的客户端,此处以2.7.2版本客户端为例进行说明。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.7.2</version> </dependency>
修改配置信息
为了方便,下文分生产与消费两个配置文件介绍。如果ROMA Connect实例开启了SASL认证,在Java客户端的配置文件中必须配置涉及SASL认证的相关信息,否则无法连接。如果没有使用SASL认证,请注释掉相关配置。
- 生产消息配置文件(对应生产消息代码中的mqs.sdk.producer.properties文件)
以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加。
#Topic名称在具体的生产与消费代码中。 ####################### #举例:bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094 bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #发送确认参数 acks=all #键的序列化方式 key.serializer=org.apache.kafka.common.serialization.StringSerializer #值的序列化方式 value.serializer=org.apache.kafka.common.serialization.StringSerializer #producer可以用来缓存数据的内存大小 buffer.memory=33554432 #重试次数 retries=0 ####################### #如果不使用SASL认证,以下参数请注释掉。 ####################### #设置用户名和密码 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="********"; #SASL鉴权方式 sasl.mechanism=PLAIN #加密协议,目前支持SASL_SSL协议 security.protocol=SASL_SSL #ssl truststore文件的位置 ssl.truststore.location=E:\\temp\\client.truststore.jks #ssl truststore文件的密码,固定,请勿修改。配置此密码是为了访问Java生成的jks文件。 ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=
示例代码中的参数说明,可参考获取MQS连接信息获取参数值。
- bootstrap.servers:MQS连接地址和端口。
- username和password:开启SASL_SSL认证时所使用的用户名和密码。
- ssl.truststore.location:开启SASL_SSL认证时所使用的客户端证书。
- 消费消息配置文件(对应消费消息代码中的mqs.sdk.consumer.properties文件)
以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加。
#Topic名称在具体的生产与消费代码中。 ####################### #举例:bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094 bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #用来唯一标识consumer进程所在组的字符串,请您自行设定。 #如果设置同样的group id,表示这些processes都是属于同一个consumer group group.id=1 #键的序列化方式 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer #值的序列化方式 value.deserializer=org.apache.kafka.common.serialization.StringDeserializer #偏移量的方式 auto.offset.reset=earliest ####################### #如果不使用SASL认证,以下参数请注释掉。 ####################### #设置jaas账号和密码,通过控制台设置 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="********"; #SASL鉴权方式 sasl.mechanism=PLAIN #加密协议,目前支持SASL_SSL协议 security.protocol=SASL_SSL #ssl truststore文件的位置 ssl.truststore.location=E:\\temp\\client.truststore.jks #ssl truststore文件的密码,配置此密码是为了访问Java生成的jks文件。 ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=
示例代码中的参数说明,可参考获取MQS连接信息获取参数值。
- bootstrap.servers:MQS连接地址和端口。
- group.id:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。
- username和password:开启SASL_SSL认证时所使用的用户名和密码。
- ssl.truststore.location:开启SASL_SSL认证时所使用的客户端证书。
生产消息
- 测试代码:
package com.mqs.producer; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.junit.Test; public class MqsProducerTest { @Test public void testProducer() throws Exception { MqsProducer<String, String> producer = new MqsProducer<String, String>(); int partiton = 0; try { for (int i = 0; i < 10; i++) { String key = null; String data = "The msg is " + i; // 注意填写您创建的Topic名称。另外,生产消息的API有多个,具体参见Kafka官网或者下文的生产消息代码。 producer.produce("topicName", partiton, key, data, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { exception.printStackTrace(); return; } System.out.println("produce msg completed"); } }); System.out.println("produce msg:" + data); } } catch (Exception e) { // TODO: 异常处理 e.printStackTrace(); } finally { producer.close(); } } }
- 生产消息代码:
package com.mqs.producer; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.net.URL; import java.util.ArrayList; import java.util.Enumeration; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class MqsProducer<K, V> { //引入生产消息的配置信息,具体内容参考上文 public static final String CONFIG_PRODUCER_FILE_NAME = "mqs.sdk.producer.properties"; private Producer<K, V> producer; MqsProducer(String path) { Properties props = new Properties(); try { InputStream in = new BufferedInputStream(new FileInputStream(path)); props.load(in); }catch (IOException e) { e.printStackTrace(); return; } producer = new KafkaProducer<K,V>(props); } MqsProducer() { Properties props = new Properties(); try { props = loadFromClasspath(CONFIG_PRODUCER_FILE_NAME); }catch (IOException e) { e.printStackTrace(); return; } producer = new KafkaProducer<K,V>(props); } /** * 生产消息 * * @param topic topic对象 * @param partition partition * @param key 消息key * @param data 消息数据 */ public void produce(String topic, Integer partition, K key, V data) { produce(topic, partition, key, data, null, (Callback)null); } /** * 生产消息 * * @param topic topic对象 * @param partition partition * @param key 消息key * @param data 消息数据 * @param timestamp timestamp */ public void produce(String topic, Integer partition, K key, V data, Long timestamp) { produce(topic, partition, key, data, timestamp, (Callback)null); } /** * 生产消息 * * @param topic topic对象 * @param partition partition * @param key 消息key * @param data 消息数据 * @param callback callback */ public void produce(String topic, Integer partition, K key, V data, Callback callback) { produce(topic, partition, key, data, null, callback); } public void produce(String topic, V data) { produce(topic, null, null, data, null, (Callback)null); } /** * 生产消息 * * @param topic topic对象 * @param partition partition * @param key 消息key * @param data 消息数据 * @param timestamp timestamp * @param callback callback */ public void produce(String topic, Integer partition, K key, V data, Long timestamp, Callback callback) { ProducerRecord<K, V> kafkaRecord = timestamp == null ? new ProducerRecord<K, V>(topic, partition, key, data) : new ProducerRecord<K, V>(topic, partition, timestamp, key, data); produce(kafkaRecord, callback); } public void produce(ProducerRecord<K, V> kafkaRecord) { produce(kafkaRecord, (Callback)null); } public void produce(ProducerRecord<K, V> kafkaRecord, Callback callback) { producer.send(kafkaRecord, callback); } public void close() { producer.close(); } /** * get classloader from thread context if no classloader found in thread * context return the classloader which has loaded this class * * @return classloader */ public static ClassLoader getCurrentClassLoader() { ClassLoader classLoader = Thread.currentThread() .getContextClassLoader(); if (classLoader == null) { classLoader = MqsProducer.class.getClassLoader(); } return classLoader; } /** * 从classpath 加载配置信息 * * @param configFileName 配置文件名称 * @return 配置信息 * @throws IOException */ public static Properties loadFromClasspath(String configFileName) throws IOException { ClassLoader classLoader = getCurrentClassLoader(); Properties config = new Properties(); List<URL> properties = new ArrayList<URL>(); Enumeration<URL> propertyResources = classLoader .getResources(configFileName); while (propertyResources.hasMoreElements()) { properties.add(propertyResources.nextElement()); } for (URL url : properties) { InputStream is = null; try { is = url.openStream(); config.load(is); } finally { if (is != null) { is.close(); is = null; } } } return config; } }
消费消息
- 测试代码:
package com.mqs.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.junit.Test; import java.util.Arrays; public class MqsConsumerTest { @Test public void testConsumer() throws Exception { MqsConsumer consumer = new MqsConsumer(); // 注意填写要消费消息的Topic名称。 consumer.consume(Arrays.asList("topicName")); try { for (int i = 0; i < 10; i++){ ConsumerRecords<Object, Object> records = consumer.poll(1000); System.out.println("the numbers of topic:" + records.count()); for (ConsumerRecord<Object, Object> record : records) { System.out.println(record.toString()); } } }catch (Exception e) { // TODO: 异常处理 e.printStackTrace(); }finally { consumer.close(); } } }
- 消费消息代码:
package com.mqs.consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.net.URL; import java.util.*; public class MqsConsumer { public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties"; private KafkaConsumer<Object, Object> consumer; MqsConsumer(String path) { Properties props = new Properties(); try { InputStream in = new BufferedInputStream(new FileInputStream(path)); props.load(in); }catch (IOException e) { e.printStackTrace(); return; } consumer = new KafkaConsumer<Object, Object>(props); } MqsConsumer() { Properties props = new Properties(); try { props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME); }catch (IOException e) { e.printStackTrace(); return; } consumer = new KafkaConsumer<Object, Object>(props); } public void consume(List topics) { consumer.subscribe(topics); } public ConsumerRecords<Object, Object> poll(long timeout) { return consumer.poll(timeout); } public void close() { consumer.close(); } /** * get classloader from thread context if no classloader found in thread * context return the classloader which has loaded this class * * @return classloader */ public static ClassLoader getCurrentClassLoader() { ClassLoader classLoader = Thread.currentThread() .getContextClassLoader(); if (classLoader == null) { classLoader = MqsConsumer.class.getClassLoader(); } return classLoader; } /** * 从classpath 加载配置信息 * * @param configFileName 配置文件名称 * @return 配置信息 * @throws IOException */ public static Properties loadFromClasspath(String configFileName) throws IOException { ClassLoader classLoader = getCurrentClassLoader(); Properties config = new Properties(); List<URL> properties = new ArrayList<URL>(); Enumeration<URL> propertyResources = classLoader .getResources(configFileName); while (propertyResources.hasMoreElements()) { properties.add(propertyResources.nextElement()); } for (URL url : properties) { InputStream is = null; try { is = url.openStream(); config.load(is); } finally { if (is != null) { is.close(); is = null; } } } return config; } }