更新时间:2024-01-23 GMT+08:00

Java客户端使用说明

操作场景

本文介绍Java版本的Kafka客户端连接指导,并完成客户端连接以及消息生产与消费的相关示例。

前提条件

  • 已获取MQS连接信息,具体请参见开发准备
  • 已安装开发工具和Java开发语言环境,具体请参见开发准备

引入Kafka客户端

MQS基于Kafka社区版本1.1.0、2.7,您可以在ROMA Connect实例控制台的“实例信息”页面,在“MQS基本信息”下查看Kafka版本信息。Java开源客户端的版本使用请参见客户端版本使用建议

根据实例的Kafka版本信息使用对应版本的客户端,此处以2.7.2版本客户端为例进行说明。

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.7.2</version>
</dependency>

修改配置信息

为了方便,下文分生产与消费两个配置文件介绍。如果ROMA Connect实例开启了SASL认证,在Java客户端的配置文件中必须配置涉及SASL认证的相关信息,否则无法连接。如果没有使用SASL认证,请注释掉相关配置。

  • 生产消息配置文件(对应生产消息代码中的mqs.sdk.producer.properties文件)

    以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加。

    #Topic名称在具体的生产与消费代码中。
    #######################
    #举例:bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #发送确认参数
    acks=all
    #键的序列化方式
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    #值的序列化方式
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    #producer可以用来缓存数据的内存大小
    buffer.memory=33554432
    #重试次数
    retries=0
    #######################
    #如果不使用SASL认证,以下参数请注释掉。
    #######################
    #设置用户名和密码
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="********";
    #SASL鉴权方式
    sasl.mechanism=PLAIN
    #加密协议,目前支持SASL_SSL协议
    security.protocol=SASL_SSL
    #ssl truststore文件的位置
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    #ssl truststore文件的密码,固定,请勿修改。配置此密码是为了访问Java生成的jks文件。
    ssl.truststore.password=dms@kafka
    ssl.endpoint.identification.algorithm=

    示例代码中的参数说明,可参考获取MQS连接信息获取参数值。

    • bootstrap.servers:MQS连接地址和端口。
    • username和password:开启SASL_SSL认证时所使用的用户名和密码。
    • ssl.truststore.location:开启SASL_SSL认证时所使用的客户端证书。
  • 消费消息配置文件(对应消费消息代码中的mqs.sdk.consumer.properties文件)

    以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加。

    #Topic名称在具体的生产与消费代码中。
    #######################
    #举例:bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #用来唯一标识consumer进程所在组的字符串,请您自行设定。
    #如果设置同样的group id,表示这些processes都是属于同一个consumer group
    group.id=1
    #键的序列化方式
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #值的序列化方式
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #偏移量的方式
    auto.offset.reset=earliest
    #######################
    #如果不使用SASL认证,以下参数请注释掉。
    #######################
    #设置jaas账号和密码,通过控制台设置
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="********";
    #SASL鉴权方式
    sasl.mechanism=PLAIN
    #加密协议,目前支持SASL_SSL协议
    security.protocol=SASL_SSL
    #ssl truststore文件的位置
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    #ssl truststore文件的密码,配置此密码是为了访问Java生成的jks文件。
    ssl.truststore.password=dms@kafka
    ssl.endpoint.identification.algorithm=

    示例代码中的参数说明,可参考获取MQS连接信息获取参数值。

    • bootstrap.servers:MQS连接地址和端口。
    • group.id:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。
    • username和password:开启SASL_SSL认证时所使用的用户名和密码。
    • ssl.truststore.location:开启SASL_SSL认证时所使用的客户端证书。

生产消息

  • 测试代码:
    package com.mqs.producer;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.junit.Test;
    
    public class MqsProducerTest {
        @Test
        public void testProducer() throws Exception {
            MqsProducer<String, String> producer = new MqsProducer<String, String>();
            int partiton = 0;
            try {
                for (int i = 0; i < 10; i++) {
                    String key = null;
                    String data = "The msg is " + i;
                    // 注意填写您创建的Topic名称。另外,生产消息的API有多个,具体参见Kafka官网或者下文的生产消息代码。
                    producer.produce("topicName", partiton, key, data, new Callback() {
                        public void onCompletion(RecordMetadata metadata,
                            Exception exception) {
                            if (exception != null) {
                                exception.printStackTrace();
                                return;
                            }
                            System.out.println("produce msg completed");
                        }
                    });
                    System.out.println("produce msg:" + data);
                }
            } catch (Exception e) {
                // TODO: 异常处理
                e.printStackTrace();
            } finally {
                producer.close();
            }
        }
    }
  • 生产消息代码:
    package com.mqs.producer;
    
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.Enumeration;
    import java.util.List;
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class MqsProducer<K, V> {
         //引入生产消息的配置信息,具体内容参考上文
        public static final String CONFIG_PRODUCER_FILE_NAME = "mqs.sdk.producer.properties";
    
        private Producer<K, V> producer;
    
        MqsProducer(String path)
        {
            Properties props = new Properties();
            try {
                InputStream in = new BufferedInputStream(new FileInputStream(path));
                props.load(in);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            producer = new KafkaProducer<K,V>(props);
        }
        MqsProducer()
        {
            Properties props = new Properties();
            try {
                props = loadFromClasspath(CONFIG_PRODUCER_FILE_NAME);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            producer = new KafkaProducer<K,V>(props);
        }
    
        /**
         * 生产消息
         *
         * @param topic        topic对象
         * @param partition    partition
         * @param key          消息key
         * @param data         消息数据
         */
        public void produce(String topic, Integer partition, K key, V data)
        {
            produce(topic, partition, key, data, null, (Callback)null);
        }
    
        /**
         * 生产消息
         *
         * @param topic        topic对象
         * @param partition    partition
         * @param key          消息key
         * @param data         消息数据
         * @param timestamp    timestamp
         */
        public void produce(String topic, Integer partition, K key, V data, Long timestamp)
        {
            produce(topic, partition, key, data, timestamp, (Callback)null);
        }
        /**
         * 生产消息
         *
         * @param topic        topic对象
         * @param partition    partition
         * @param key          消息key
         * @param data         消息数据
         * @param callback    callback
         */
        public void produce(String topic, Integer partition, K key, V data, Callback callback)
        {
            produce(topic, partition, key, data, null, callback);
        }
    
        public void produce(String topic, V data)
        {
            produce(topic, null, null, data, null, (Callback)null);
        }
    
        /**
         * 生产消息
         *
         * @param topic        topic对象
         * @param partition    partition
         * @param key          消息key
         * @param data         消息数据
         * @param timestamp    timestamp
         * @param callback    callback
         */
        public void produce(String topic, Integer partition, K key, V data, Long timestamp, Callback callback)
        {
            ProducerRecord<K, V> kafkaRecord =
                    timestamp == null ? new ProducerRecord<K, V>(topic, partition, key, data)
                            : new ProducerRecord<K, V>(topic, partition, timestamp, key, data);
            produce(kafkaRecord, callback);
        }
    
        public void produce(ProducerRecord<K, V> kafkaRecord)
        {
            produce(kafkaRecord, (Callback)null);
        }
    
        public void produce(ProducerRecord<K, V> kafkaRecord, Callback callback)
        {
            producer.send(kafkaRecord, callback);
        }
    
        public void close()
        {
            producer.close();
        }
    
        /**
         * get classloader from thread context if no classloader found in thread
         * context return the classloader which has loaded this class
         *
         * @return classloader
         */
        public static ClassLoader getCurrentClassLoader()
        {
            ClassLoader classLoader = Thread.currentThread()
                    .getContextClassLoader();
            if (classLoader == null)
            {
                classLoader = MqsProducer.class.getClassLoader();
            }
            return classLoader;
        }
    
        /**
         * 从classpath 加载配置信息
         *
         * @param configFileName 配置文件名称
         * @return 配置信息
         * @throws IOException
         */
        public static Properties loadFromClasspath(String configFileName) throws IOException
        {
            ClassLoader classLoader = getCurrentClassLoader();
            Properties config = new Properties();
    
            List<URL> properties = new ArrayList<URL>();
            Enumeration<URL> propertyResources = classLoader
                    .getResources(configFileName);
            while (propertyResources.hasMoreElements())
            {
                properties.add(propertyResources.nextElement());
            }
    
            for (URL url : properties)
            {
                InputStream is = null;
                try
                {
                    is = url.openStream();
                    config.load(is);
                }
                finally
                {
                    if (is != null)
                    {
                        is.close();
                        is = null;
                    }
                }
            }
    
            return config;
        }
    }

消费消息

  • 测试代码:
    package com.mqs.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.junit.Test;
    import java.util.Arrays;
    
    public class MqsConsumerTest {
        @Test
        public void testConsumer() throws Exception {
            MqsConsumer consumer = new MqsConsumer();
            // 注意填写要消费消息的Topic名称。
            consumer.consume(Arrays.asList("topicName"));
            try {
                for (int i = 0; i < 10; i++){
                    ConsumerRecords<Object, Object> records = consumer.poll(1000);
                    System.out.println("the numbers of topic:" + records.count());
                    for (ConsumerRecord<Object, Object> record : records)
                    {
                        System.out.println(record.toString());
                    }
                }
            }catch (Exception e)
            {
                // TODO: 异常处理
                e.printStackTrace();
            }finally {
                consumer.close();
            }
        }
    }
  • 消费消息代码:
    package com.mqs.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.*;
    
    public class MqsConsumer {
    
        public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";
    
        private KafkaConsumer<Object, Object> consumer;
    
        MqsConsumer(String path)
        {
            Properties props = new Properties();
            try {
                InputStream in = new BufferedInputStream(new FileInputStream(path));
                props.load(in);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            consumer = new KafkaConsumer<Object, Object>(props);
        }
    
        MqsConsumer()
        {
            Properties props = new Properties();
            try {
                props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            consumer = new KafkaConsumer<Object, Object>(props);
        }
        public void consume(List topics)
        {
            consumer.subscribe(topics);
        }
    
        public ConsumerRecords<Object, Object> poll(long timeout)
        {
            return consumer.poll(timeout);
        }
    
        public void close()
        {
            consumer.close();
        }
    
        /**
         * get classloader from thread context if no classloader found in thread
         * context return the classloader which has loaded this class
         *
         * @return classloader
         */
        public static ClassLoader getCurrentClassLoader()
        {
            ClassLoader classLoader = Thread.currentThread()
                    .getContextClassLoader();
            if (classLoader == null)
            {
                classLoader = MqsConsumer.class.getClassLoader();
            }
            return classLoader;
        }
    
        /**
         * 从classpath 加载配置信息
         *
         * @param configFileName 配置文件名称
         * @return 配置信息
         * @throws IOException
         */
        public static Properties loadFromClasspath(String configFileName) throws IOException
        {
            ClassLoader classLoader = getCurrentClassLoader();
            Properties config = new Properties();
    
            List<URL> properties = new ArrayList<URL>();
            Enumeration<URL> propertyResources = classLoader
                    .getResources(configFileName);
            while (propertyResources.hasMoreElements())
            {
                properties.add(propertyResources.nextElement());
            }
    
            for (URL url : properties)
            {
                InputStream is = null;
                try
                {
                    is = url.openStream();
                    config.load(is);
                }
                finally
                {
                    if (is != null)
                    {
                        is.close();
                        is = null;
                    }
                }
            }
    
            return config;
        }
    }