更新时间:2023-04-23 GMT+08:00

Java客户端使用说明

本文介绍Maven方式下为MQS引入Kafka客户端,并完成客户端连接以及消息生产与消费的相关示例。如果您需要在查看Demo具体表现,请查看Java开发环境搭建

下文所有配置信息,如MQS连接地址、Topic名称、用户信息等,请参考收集连接信息获取。

Maven中引入Kafka客户端

MQS基于Kafka社区版本1.1.0和2.3.0,请使用相同版本的客户端。您可以在ROMA Connect实例控制台的“实例信息”页面,在“MQS基本信息”下查看Kafka版本信息。

若使用1.1.0版本客户端,则version参数设置为1.1.0,若使用2.3.0版本客户端,则version参数设置为2.3.0,此处以2.3.0版本客户端为例进行说明。

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.3.0</version>
</dependency>

准备配置信息

为了方便,下文分生产与消费两个配置文件介绍。如果ROMA Connect实例开启了SASL认证,在Java客户端的配置文件中必须配置涉及SASL认证的相关信息,否则无法连接。如果没有使用SASL认证,请注释掉相关配置。

  • 生产消息配置文件(对应生产消息代码中的mqs.sdk.producer.properties文件)
    以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加。所有配置信息,如MQS连接地址、Topic名称、用户信息等,请参考收集连接信息获取。
    #topic名称在具体的生产与消费代码中。
    #######################
    #broker信息请从控制台界面获取。
    #举例:bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #发送确认参数
    acks=all
    #键的序列化方式
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    #值的序列化方式
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    #producer可以用来缓存数据的内存大小
    buffer.memory=33554432
    #重试次数
    retries=0
    #######################
    #如果不使用SASL认证,以下参数请注释掉。
    #######################
    #设置jaas用户和密码,通过控制台设置
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="password";
    #SASL鉴权方式
    sasl.mechanism=PLAIN
    #加密协议,目前支持SASL_SSL协议
    security.protocol=SASL_SSL
    #ssl truststore文件的位置
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    #ssl truststore文件的密码,固定,请勿修改
    ssl.truststore.password=dms@kafka
    ssl.endpoint.identification.algorithm=
  • 消费消息配置文件(对应消费消息代码中的mqs.sdk.consumer.properties文件)
    以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加。所有配置信息,如MQS连接地址、Topic名称、用户信息等,请参考 收集连接信息获取。
    #topic名称在具体的生产与消费代码中。
    #######################
    #broker信息请从控制台界面获取。
    #举例:bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #用来唯一标识consumer进程所在组的字符串,请您自行设定。
    #如果设置同样的group id,表示这些processes都是属于同一个consumer group
    group.id=1
    #键的序列化方式
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #值的序列化方式
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #偏移量的方式
    auto.offset.reset=earliest
    #######################
    #如果不使用SASL认证,以下参数请注释掉。
    #######################
    #设置jaas帐号和密码,通过控制台设置
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="password";
    #SASL鉴权方式
    sasl.mechanism=PLAIN
    #加密协议,目前支持SASL_SSL协议
    security.protocol=SASL_SSL
    #ssl truststore文件的位置
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    #ssl truststore文件的密码
    ssl.truststore.password=dms@kafka
    ssl.endpoint.identification.algorithm=

生产消息

  • 测试代码:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    package com.mqs.producer;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.junit.Test;
    
    public class MqsProducerTest {
        @Test
        public void testProducer() throws Exception {
            MqsProducer<String, String> producer = new MqsProducer<String, String>();
            int partiton = 0;
            try {
                for (int i = 0; i < 10; i++) {
                    String key = null;
                    String data = "The msg is " + i;
                    // 注意填写您创建的topic名称。另外,生产消息的API有多个,具体参见Kafka官网或者下文的生产消息代码。
                    producer.produce("topic-0", partiton, key, data, new Callback() {
                        public void onCompletion(RecordMetadata metadata,
                            Exception exception) {
                            if (exception != null) {
                                exception.printStackTrace();
                                return;
                            }
                            System.out.println("produce msg completed");
                        }
                    });
                    System.out.println("produce msg:" + data);
                }
            } catch (Exception e) {
                // TODO: 异常处理
                e.printStackTrace();
            } finally {
                producer.close();
            }
        }
    }
    
  • 生产消息代码:
      1
      2
      3
      4
      5
      6
      7
      8
      9
     10
     11
     12
     13
     14
     15
     16
     17
     18
     19
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    package com.mqs.producer;
    
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.Enumeration;
    import java.util.List;
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class MqsProducer<K, V> {
         //引入生产消息的配置信息,具体内容参考上文
        public static final String CONFIG_PRODUCER_FILE_NAME = "mqs.sdk.producer.properties";
    
        private Producer<K, V> producer;
    
        MqsProducer(String path)
        {
            Properties props = new Properties();
            try {
                InputStream in = new BufferedInputStream(new FileInputStream(path));
                props.load(in);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            producer = new KafkaProducer<K,V>(props);
        }
        MqsProducer()
        {
            Properties props = new Properties();
            try {
                props = loadFromClasspath(CONFIG_PRODUCER_FILE_NAME);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            producer = new KafkaProducer<K,V>(props);
        }
    
        /**
         * 生产消息
         *
         * @param topic        topic对象
         * @param partition    partition
         * @param key          消息key
         * @param data         消息数据
         */
        public void produce(String topic, Integer partition, K key, V data)
        {
            produce(topic, partition, key, data, null, (Callback)null);
        }
    
        /**
         * 生产消息
         *
         * @param topic        topic对象
         * @param partition    partition
         * @param key          消息key
         * @param data         消息数据
         * @param timestamp    timestamp
         */
        public void produce(String topic, Integer partition, K key, V data, Long timestamp)
        {
            produce(topic, partition, key, data, timestamp, (Callback)null);
        }
        /**
         * 生产消息
         *
         * @param topic        topic对象
         * @param partition    partition
         * @param key          消息key
         * @param data         消息数据
         * @param callback    callback
         */
        public void produce(String topic, Integer partition, K key, V data, Callback callback)
        {
            produce(topic, partition, key, data, null, callback);
        }
    
        public void produce(String topic, V data)
        {
            produce(topic, null, null, data, null, (Callback)null);
        }
    
        /**
         * 生产消息
         *
         * @param topic        topic对象
         * @param partition    partition
         * @param key          消息key
         * @param data         消息数据
         * @param timestamp    timestamp
         * @param callback    callback
         */
        public void produce(String topic, Integer partition, K key, V data, Long timestamp, Callback callback)
        {
            ProducerRecord<K, V> kafkaRecord =
                    timestamp == null ? new ProducerRecord<K, V>(topic, partition, key, data)
                            : new ProducerRecord<K, V>(topic, partition, timestamp, key, data);
            produce(kafkaRecord, callback);
        }
    
        public void produce(ProducerRecord<K, V> kafkaRecord)
        {
            produce(kafkaRecord, (Callback)null);
        }
    
        public void produce(ProducerRecord<K, V> kafkaRecord, Callback callback)
        {
            producer.send(kafkaRecord, callback);
        }
    
        public void close()
        {
            producer.close();
        }
    
        /**
         * get classloader from thread context if no classloader found in thread
         * context return the classloader which has loaded this class
         *
         * @return classloader
         */
        public static ClassLoader getCurrentClassLoader()
        {
            ClassLoader classLoader = Thread.currentThread()
                    .getContextClassLoader();
            if (classLoader == null)
            {
                classLoader = MqsProducer.class.getClassLoader();
            }
            return classLoader;
        }
    
        /**
         * 从classpath 加载配置信息
         *
         * @param configFileName 配置文件名称
         * @return 配置信息
         * @throws IOException
         */
        public static Properties loadFromClasspath(String configFileName) throws IOException
        {
            ClassLoader classLoader = getCurrentClassLoader();
            Properties config = new Properties();
    
            List<URL> properties = new ArrayList<URL>();
            Enumeration<URL> propertyResources = classLoader
                    .getResources(configFileName);
            while (propertyResources.hasMoreElements())
            {
                properties.add(propertyResources.nextElement());
            }
    
            for (URL url : properties)
            {
                InputStream is = null;
                try
                {
                    is = url.openStream();
                    config.load(is);
                }
                finally
                {
                    if (is != null)
                    {
                        is.close();
                        is = null;
                    }
                }
            }
    
            return config;
        }
    }
    

消费消息

  • 测试代码:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    package com.mqs.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.junit.Test;
    import java.util.Arrays;
    
    public class MqsConsumerTest {
        @Test
        public void testConsumer() throws Exception {
            MqsConsumer consumer = new MqsConsumer();
            consumer.consume(Arrays.asList("topic-0"));
            try {
                for (int i = 0; i < 10; i++){
                    ConsumerRecords<Object, Object> records = consumer.poll(1000);
                    System.out.println("the numbers of topic:" + records.count());
                    for (ConsumerRecord<Object, Object> record : records)
                    {
                        System.out.println(record.toString());
                    }
                }
            }catch (Exception e)
            {
                // TODO: 异常处理
                e.printStackTrace();
            }finally {
                consumer.close();
            }
        }
    }
    
  • 消费消息代码:
      1
      2
      3
      4
      5
      6
      7
      8
      9
     10
     11
     12
     13
     14
     15
     16
     17
     18
     19
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    package com.mqs.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.*;
    
    public class MqsConsumer {
    
        public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";
    
        private KafkaConsumer<Object, Object> consumer;
    
        MqsConsumer(String path)
        {
            Properties props = new Properties();
            try {
                InputStream in = new BufferedInputStream(new FileInputStream(path));
                props.load(in);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            consumer = new KafkaConsumer<Object, Object>(props);
        }
    
        MqsConsumer()
        {
            Properties props = new Properties();
            try {
                props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            consumer = new KafkaConsumer<Object, Object>(props);
        }
        public void consume(List topics)
        {
            consumer.subscribe(topics);
        }
    
        public ConsumerRecords<Object, Object> poll(long timeout)
        {
            return consumer.poll(timeout);
        }
    
        public void close()
        {
            consumer.close();
        }
    
        /**
         * get classloader from thread context if no classloader found in thread
         * context return the classloader which has loaded this class
         *
         * @return classloader
         */
        public static ClassLoader getCurrentClassLoader()
        {
            ClassLoader classLoader = Thread.currentThread()
                    .getContextClassLoader();
            if (classLoader == null)
            {
                classLoader = MqsConsumer.class.getClassLoader();
            }
            return classLoader;
        }
    
        /**
         * 从classpath 加载配置信息
         *
         * @param configFileName 配置文件名称
         * @return 配置信息
         * @throws IOException
         */
        public static Properties loadFromClasspath(String configFileName) throws IOException
        {
            ClassLoader classLoader = getCurrentClassLoader();
            Properties config = new Properties();
    
            List<URL> properties = new ArrayList<URL>();
            Enumeration<URL> propertyResources = classLoader
                    .getResources(configFileName);
            while (propertyResources.hasMoreElements())
            {
                properties.add(propertyResources.nextElement());
            }
    
            for (URL url : properties)
            {
                InputStream is = null;
                try
                {
                    is = url.openStream();
                    config.load(is);
                }
                finally
                {
                    if (is != null)
                    {
                        is.close();
                        is = null;
                    }
                }
            }
    
            return config;
        }
    }