MQS基于Kafka社区版本1.1.0和2.3.0,请使用相同版本的客户端。您可以在ROMA Connect实例控制台的“实例信息”页面,在“MQS基本信息”下查看Kafka版本信息。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency>
为了方便,下文分生产与消费两个配置文件介绍。如果ROMA Connect实例开启了SASL认证,在Java客户端的配置文件中必须配置涉及SASL认证的相关信息,否则无法连接。如果没有使用SASL认证,请注释掉相关配置。
- 生产消息配置文件(对应生产消息代码中的mqs.sdk.producer.properties文件)
#topic名称在具体的生产与消费代码中。 ####################### #broker信息请从控制台界面获取。 #举例:bootstrap.servers=,, bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #发送确认参数 acks=all #键的序列化方式 key.serializer=org.apache.kafka.common.serialization.StringSerializer #值的序列化方式 value.serializer=org.apache.kafka.common.serialization.StringSerializer #producer可以用来缓存数据的内存大小 buffer.memory=33554432 #重试次数 retries=0 ####################### #如果不使用SASL认证,以下参数请注释掉。 ####################### #设置jaas用户和密码,通过控制台设置 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; #SASL鉴权方式 sasl.mechanism=PLAIN #加密协议,目前支持SASL_SSL协议 security.protocol=SASL_SSL #ssl truststore文件的位置 ssl.truststore.location=E:\\temp\\client.truststore.jks #ssl truststore文件的密码,固定,请勿修改 ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=
- 消费消息配置文件(对应消费消息代码中的mqs.sdk.consumer.properties文件)
以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加。所有配置信息,如MQS连接地址、Topic名称、用户信息等,请参考 收集连接信息获取。
#topic名称在具体的生产与消费代码中。 ####################### #broker信息请从控制台界面获取。 #举例:bootstrap.servers=,, bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #用来唯一标识consumer进程所在组的字符串,请您自行设定。 #如果设置同样的group id,表示这些processes都是属于同一个consumer group group.id=1 #键的序列化方式 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer #值的序列化方式 value.deserializer=org.apache.kafka.common.serialization.StringDeserializer #偏移量的方式 auto.offset.reset=earliest ####################### #如果不使用SASL认证,以下参数请注释掉。 ####################### #设置jaas帐号和密码,通过控制台设置 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; #SASL鉴权方式 sasl.mechanism=PLAIN #加密协议,目前支持SASL_SSL协议 security.protocol=SASL_SSL #ssl truststore文件的位置 ssl.truststore.location=E:\\temp\\client.truststore.jks #ssl truststore文件的密码 ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=
- 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
package com.mqs.producer; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.junit.Test; public class MqsProducerTest { @Test public void testProducer() throws Exception { MqsProducer<String, String> producer = new MqsProducer<String, String>(); int partiton = 0; try { for (int i = 0; i < 10; i++) { String key = null; String data = "The msg is " + i; // 注意填写您创建的topic名称。另外,生产消息的API有多个,具体参见Kafka官网或者下文的生产消息代码。 producer.produce("topic-0", partiton, key, data, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { exception.printStackTrace(); return; } System.out.println("produce msg completed"); } }); System.out.println("produce msg:" + data); } } catch (Exception e) { // TODO: 异常处理 e.printStackTrace(); } finally { producer.close(); } } }
- 生产消息代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
package com.mqs.producer; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.net.URL; import java.util.ArrayList; import java.util.Enumeration; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class MqsProducer<K, V> { //引入生产消息的配置信息,具体内容参考上文 public static final String CONFIG_PRODUCER_FILE_NAME = "mqs.sdk.producer.properties"; private Producer<K, V> producer; MqsProducer(String path) { Properties props = new Properties(); try { InputStream in = new BufferedInputStream(new FileInputStream(path)); props.load(in); }catch (IOException e) { e.printStackTrace(); return; } producer = new KafkaProducer<K,V>(props); } MqsProducer() { Properties props = new Properties(); try { props = loadFromClasspath(CONFIG_PRODUCER_FILE_NAME); }catch (IOException e) { e.printStackTrace(); return; } producer = new KafkaProducer<K,V>(props); } /** * 生产消息 * * @param topic topic对象 * @param partition partition * @param key 消息key * @param data 消息数据 */ public void produce(String topic, Integer partition, K key, V data) { produce(topic, partition, key, data, null, (Callback)null); } /** * 生产消息 * * @param topic topic对象 * @param partition partition * @param key 消息key * @param data 消息数据 * @param timestamp timestamp */ public void produce(String topic, Integer partition, K key, V data, Long timestamp) { produce(topic, partition, key, data, timestamp, (Callback)null); } /** * 生产消息 * * @param topic topic对象 * @param partition partition * @param key 消息key * @param data 消息数据 * @param callback callback */ public void produce(String topic, Integer partition, K key, V data, Callback callback) { produce(topic, partition, key, data, null, callback); } public void produce(String topic, V data) { produce(topic, null, null, data, null, (Callback)null); } /** * 生产消息 * * @param topic topic对象 * @param partition partition * @param key 消息key * @param data 消息数据 * @param timestamp timestamp * @param callback callback */ public void produce(String topic, Integer partition, K key, V data, Long timestamp, Callback callback) { ProducerRecord<K, V> kafkaRecord = timestamp == null ? new ProducerRecord<K, V>(topic, partition, key, data) : new ProducerRecord<K, V>(topic, partition, timestamp, key, data); produce(kafkaRecord, callback); } public void produce(ProducerRecord<K, V> kafkaRecord) { produce(kafkaRecord, (Callback)null); } public void produce(ProducerRecord<K, V> kafkaRecord, Callback callback) { producer.send(kafkaRecord, callback); } public void close() { producer.close(); } /** * get classloader from thread context if no classloader found in thread * context return the classloader which has loaded this class * * @return classloader */ public static ClassLoader getCurrentClassLoader() { ClassLoader classLoader = Thread.currentThread() .getContextClassLoader(); if (classLoader == null) { classLoader = MqsProducer.class.getClassLoader(); } return classLoader; } /** * 从classpath 加载配置信息 * * @param configFileName 配置文件名称 * @return 配置信息 * @throws IOException */ public static Properties loadFromClasspath(String configFileName) throws IOException { ClassLoader classLoader = getCurrentClassLoader(); Properties config = new Properties(); List<URL> properties = new ArrayList<URL>(); Enumeration<URL> propertyResources = classLoader .getResources(configFileName); while (propertyResources.hasMoreElements()) { properties.add(propertyResources.nextElement()); } for (URL url : properties) { InputStream is = null; try { is = url.openStream(); config.load(is); } finally { if (is != null) { is.close(); is = null; } } } return config; } }
- 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
package com.mqs.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.junit.Test; import java.util.Arrays; public class MqsConsumerTest { @Test public void testConsumer() throws Exception { MqsConsumer consumer = new MqsConsumer(); consumer.consume(Arrays.asList("topic-0")); try { for (int i = 0; i < 10; i++){ ConsumerRecords<Object, Object> records = consumer.poll(1000); System.out.println("the numbers of topic:" + records.count()); for (ConsumerRecord<Object, Object> record : records) { System.out.println(record.toString()); } } }catch (Exception e) { // TODO: 异常处理 e.printStackTrace(); }finally { consumer.close(); } } }
- 消费消息代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
package com.mqs.consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.net.URL; import java.util.*; public class MqsConsumer { public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties"; private KafkaConsumer<Object, Object> consumer; MqsConsumer(String path) { Properties props = new Properties(); try { InputStream in = new BufferedInputStream(new FileInputStream(path)); props.load(in); }catch (IOException e) { e.printStackTrace(); return; } consumer = new KafkaConsumer<Object, Object>(props); } MqsConsumer() { Properties props = new Properties(); try { props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME); }catch (IOException e) { e.printStackTrace(); return; } consumer = new KafkaConsumer<Object, Object>(props); } public void consume(List topics) { consumer.subscribe(topics); } public ConsumerRecords<Object, Object> poll(long timeout) { return consumer.poll(timeout); } public void close() { consumer.close(); } /** * get classloader from thread context if no classloader found in thread * context return the classloader which has loaded this class * * @return classloader */ public static ClassLoader getCurrentClassLoader() { ClassLoader classLoader = Thread.currentThread() .getContextClassLoader(); if (classLoader == null) { classLoader = MqsConsumer.class.getClassLoader(); } return classLoader; } /** * 从classpath 加载配置信息 * * @param configFileName 配置文件名称 * @return 配置信息 * @throws IOException */ public static Properties loadFromClasspath(String configFileName) throws IOException { ClassLoader classLoader = getCurrentClassLoader(); Properties config = new Properties(); List<URL> properties = new ArrayList<URL>(); Enumeration<URL> propertyResources = classLoader .getResources(configFileName); while (propertyResources.hasMoreElements()) { properties.add(propertyResources.nextElement()); } for (URL url : properties) { InputStream is = null; try { is = url.openStream(); config.load(is); } finally { if (is != null) { is.close(); is = null; } } } return config; } }
