更新时间:2024-08-06 GMT+08:00

Java客户端接入示例

本文介绍Maven方式引入Kafka客户端,并完成Kafka实例连接以及消息生产与消费的相关示例。如果您需要在IDE中查看Demo具体表现,请查看Java开发环境搭建

下文所有Kafka的配置信息,如实例连接地址、Topic名称、用户信息等,请参考收集连接信息获取。

Maven中引入Kafka客户端

//Kafka实例基于社区版本1.1.0/2.3.0/2.7/3.x,推荐客户端保持一致。
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0/2.3.0/2.7.2/3.4.0</version>
        </dependency>

准备Kafka配置信息

为了方便,下文分生产与消费两个配置文件介绍。其中涉及SASL认证配置,如果Kafka实例没有开启密文接入,使用的是不加密连接,请注释相关代码;如果Kafka实例开启了密文接入,则必须使用加密方式连接,请设置相关参数。

  • 生产消息配置文件(对应生产消息代码中的dms.sdk.producer.properties文件)

    以下粗体部分为不同Kafka实例特有的信息,必须修改。客户端其他参数,可以自主添加。

    #Topic名称在具体的生产与消费代码中。
    #######################
    #Kafka实例的broker信息,ip:port为实例的连接地址和端口,参考“收集连接信息”章节获取。举例:bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #发送确认参数
    acks=all
    #键的序列化方式
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    #值的序列化方式
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    #producer可以用来缓存数据的内存大小
    buffer.memory=33554432
    #重试次数
    retries=0
    #######################
    #如果不使用密文接入,以下参数请注释掉
    #######################
    #设置SASL认证机制、账号和密码。
    #sasl.mechanism为SASL认证机制,username和password为SASL的用户名和密码,参考“收集连接信息”章节获取。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。
    #SASL认证机制为“PLAIN”时,配置信息如下。
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="password";
    #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="username" \
        password="password";
    
    #设置Kafka安全协议。security.protocol为安全协议。
    #安全协议为“SASL_SSL”时,配置信息如下。
    security.protocol=SASL_SSL
    #ssl truststore.location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。
    ssl.truststore.location=E:\\temp\\client.jks
    #ssl truststore.password为服务器证书密码,配置此密码是为了访问Java生成的jks文件。
    ssl.truststore.password=dms@kafka
    #ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。
    ssl.endpoint.identification.algorithm=
    #安全协议为“SASL_PLAINTEXT”时,配置信息如下。
    security.protocol=SASL_PLAINTEXT
  • 消费消息配置文件(对应消费消息代码中的dms.sdk.consumer.properties文件)

    以下粗体部分为不同Kafka实例特有的信息,必须修改。客户端其他参数,可以自主添加。

    #Topic名称在具体的生产与消费代码中。
    #######################
    #Kafka实例的broker信息,ip:port为实例的连接地址和端口,参考“收集连接信息”章节获取。举例:bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group
    group.id=1
    #键的序列化方式
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #值的序列化方式
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #偏移量的方式
    auto.offset.reset=earliest
    #######################
    #如果不使用密文接入,以下参数请注释掉。
    #######################
    #设置SASL认证机制、账号和密码。
    #sasl.mechanism为SASL认证机制,username和password为SASL的用户名和密码,参考“收集连接信息”章节获取。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。
    #SASL认证机制为“PLAIN”时,配置信息如下。
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="password";
    #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="username" \
        password="password";
    
    #设置Kafka安全协议。security.protocol为安全协议。
    #安全协议为“SASL_SSL”时,配置信息如下。
    security.protocol=SASL_SSL
    #ssl truststore.location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。
    ssl.truststore.location=E:\\temp\\client.jks
    #ssl truststore.password为服务器证书密码,配置此密码是为了访问Java生成的jks文件。
    ssl.truststore.password=dms@kafka
    #ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。
    ssl.endpoint.identification.algorithm=
    #安全协议为“SASL_PLAINTEXT”时,配置信息如下。
    security.protocol=SASL_PLAINTEXT

生产消息

  • 测试代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    package com.dms.producer;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.junit.Test;
    
    public class DmsProducerTest {
        @Test
        public void testProducer() throws Exception {
            DmsProducer<String, String> producer = new DmsProducer<String, String>();
            int partition = 0;
            try {
                for (int i = 0; i < 10; i++) {
                    String key = null;
                    String data = "The msg is " + i;
                    // 注意填写您创建的topic名称。另外,生产消息的API有多个,具体参见Kafka官网或者下文的生产消息代码。
                    producer.produce("topic-0", partition, key, data, new Callback() {
                        public void onCompletion(RecordMetadata metadata,
                            Exception exception) {
                            if (exception != null) {
                                exception.printStackTrace();
                                return;
                            }
                            System.out.println("produce msg completed");
                        }
                    });
                    System.out.println("produce msg:" + data);
                }
            } catch (Exception e) {
                // TODO: 异常处理
                e.printStackTrace();
            } finally {
                producer.close();
            }
        }
    }
    
  • 生产消息代码
      1
      2
      3
      4
      5
      6
      7
      8
      9
     10
     11
     12
     13
     14
     15
     16
     17
     18
     19
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    package com.dms.producer;
    
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.Enumeration;
    import java.util.List;
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class DmsProducer<K, V> {
         //引入生产消息的配置信息,具体内容参考上文
        public static final String CONFIG_PRODUCER_FILE_NAME = "dms.sdk.producer.properties";
    
        private Producer<K, V> producer;
    
        DmsProducer(String path)
        {
            Properties props = new Properties();
            try {
                InputStream in = new BufferedInputStream(new FileInputStream(path));
                props.load(in);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            producer = new KafkaProducer<K,V>(props);
        }
        DmsProducer()
        {
            Properties props = new Properties();
            try {
                props = loadFromClasspath(CONFIG_PRODUCER_FILE_NAME);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            producer = new KafkaProducer<K,V>(props);
        }
    
        /**
         * 生产消息
         *
         * @param topic        topic对象
         * @param partition    partition
         * @param key          消息key
         * @param data         消息数据
         */
        public void produce(String topic, Integer partition, K key, V data)
        {
            produce(topic, partition, key, data, null, (Callback)null);
        }
    
        /**
         * 生产消息
         *
         * @param topic        topic对象
         * @param partition    partition
         * @param key          消息key
         * @param data         消息数据
         * @param timestamp    timestamp
         */
        public void produce(String topic, Integer partition, K key, V data, Long timestamp)
        {
            produce(topic, partition, key, data, timestamp, (Callback)null);
        }
        /**
         * 生产消息
         *
         * @param topic        topic对象
         * @param partition    partition
         * @param key          消息key
         * @param data         消息数据
         * @param callback    callback
         */
        public void produce(String topic, Integer partition, K key, V data, Callback callback)
        {
            produce(topic, partition, key, data, null, callback);
        }
    
        public void produce(String topic, V data)
        {
            produce(topic, null, null, data, null, (Callback)null);
        }
    
        /**
         * 生产消息
         *
         * @param topic        topic对象
         * @param partition    partition
         * @param key          消息key
         * @param data         消息数据
         * @param timestamp    timestamp
         * @param callback    callback
         */
        public void produce(String topic, Integer partition, K key, V data, Long timestamp, Callback callback)
        {
            ProducerRecord<K, V> kafkaRecord =
                    timestamp == null ? new ProducerRecord<K, V>(topic, partition, key, data)
                            : new ProducerRecord<K, V>(topic, partition, timestamp, key, data);
            produce(kafkaRecord, callback);
        }
    
        public void produce(ProducerRecord<K, V> kafkaRecord)
        {
            produce(kafkaRecord, (Callback)null);
        }
    
        public void produce(ProducerRecord<K, V> kafkaRecord, Callback callback)
        {
            producer.send(kafkaRecord, callback);
        }
    
        public void close()
        {
            producer.close();
        }
    
        /**
         * get classloader from thread context if no classloader found in thread
         * context return the classloader which has loaded this class
         *
         * @return classloader
         */
        public static ClassLoader getCurrentClassLoader()
        {
            ClassLoader classLoader = Thread.currentThread()
                    .getContextClassLoader();
            if (classLoader == null)
            {
                classLoader = DmsProducer.class.getClassLoader();
            }
            return classLoader;
        }
    
        /**
         * 从classpath 加载配置信息
         *
         * @param configFileName 配置文件名称
         * @return 配置信息
         * @throws IOException
         */
        public static Properties loadFromClasspath(String configFileName) throws IOException
        {
            ClassLoader classLoader = getCurrentClassLoader();
            Properties config = new Properties();
    
            List<URL> properties = new ArrayList<URL>();
            Enumeration<URL> propertyResources = classLoader
                    .getResources(configFileName);
            while (propertyResources.hasMoreElements())
            {
                properties.add(propertyResources.nextElement());
            }
    
            for (URL url : properties)
            {
                InputStream is = null;
                try
                {
                    is = url.openStream();
                    config.load(is);
                }
                finally
                {
                    if (is != null)
                    {
                        is.close();
                        is = null;
                    }
                }
            }
    
            return config;
        }
    }
    

消费消息

  • 测试代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    package com.dms.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.junit.Test;
    import java.util.Arrays;
    
    public class DmsConsumerTest {
        @Test
        public void testConsumer() throws Exception {
            DmsConsumer consumer = new DmsConsumer();
            consumer.consume(Arrays.asList("topic-0"));
            try {
                for (int i = 0; i < 10; i++){
                    ConsumerRecords<Object, Object> records = consumer.poll(1000);
                    System.out.println("the numbers of topic:" + records.count());
                    for (ConsumerRecord<Object, Object> record : records)
                    {
                        System.out.println(record.toString());
                    }
                }
            }catch (Exception e)
            {
                // TODO: 异常处理
                e.printStackTrace();
            }finally {
                consumer.close();
            }
        }
    }
    
  • 消费消息代码
      1
      2
      3
      4
      5
      6
      7
      8
      9
     10
     11
     12
     13
     14
     15
     16
     17
     18
     19
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    package com.dms.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.*;
    
    public class DmsConsumer {
    
        public static final String CONFIG_CONSUMER_FILE_NAME = "dms.sdk.consumer.properties";
    
        private KafkaConsumer<Object, Object> consumer;
    
        DmsConsumer(String path)
        {
            Properties props = new Properties();
            try {
                InputStream in = new BufferedInputStream(new FileInputStream(path));
                props.load(in);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            consumer = new KafkaConsumer<Object, Object>(props);
        }
    
        DmsConsumer()
        {
            Properties props = new Properties();
            try {
                props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            consumer = new KafkaConsumer<Object, Object>(props);
        }
        public void consume(List topics)
        {
            consumer.subscribe(topics);
        }
    
        public ConsumerRecords<Object, Object> poll(long timeout)
        {
            return consumer.poll(timeout);
        }
    
        public void close()
        {
            consumer.close();
        }
    
        /**
         * get classloader from thread context if no classloader found in thread
         * context return the classloader which has loaded this class
         *
         * @return classloader
         */
        public static ClassLoader getCurrentClassLoader()
        {
            ClassLoader classLoader = Thread.currentThread()
                    .getContextClassLoader();
            if (classLoader == null)
            {
                classLoader = DmsConsumer.class.getClassLoader();
            }
            return classLoader;
        }
    
        /**
         * 从classpath 加载配置信息
         *
         * @param configFileName 配置文件名称
         * @return 配置信息
         * @throws IOException
         */
        public static Properties loadFromClasspath(String configFileName) throws IOException
        {
            ClassLoader classLoader = getCurrentClassLoader();
            Properties config = new Properties();
    
            List<URL> properties = new ArrayList<URL>();
            Enumeration<URL> propertyResources = classLoader
                    .getResources(configFileName);
            while (propertyResources.hasMoreElements())
            {
                properties.add(propertyResources.nextElement());
            }
    
            for (URL url : properties)
            {
                InputStream is = null;
                try
                {
                    is = url.openStream();
                    config.load(is);
                }
                finally
                {
                    if (is != null)
                    {
                        is.close();
                        is = null;
                    }
                }
            }
    
            return config;
        }
    }