更新时间:2022-02-21 GMT+08:00

Java开发环境搭建

基于收集连接信息的介绍,假设您已经获取了实例连接相关的信息,以及配置好客户端的网络环境。本章节以生产与发送消息的Demo为例,介绍Kafka客户端的环境配置。

开发环境

操作步骤

  1. 下载Demo包

    下载后解压,有如下文件:

    表1 Kafka Demo文件清单

    文件名

    路径

    说明

    DmsConsumer.java

    .\src\main\java\com\dms\consumer

    消费消息的API。

    DmsProducer.java

    .\src\main\java\com\dms\producer

    生产消息的API。

    dms.sdk.consumer.properties

    .\src\main\resources

    消费消息的配置信息。

    dms.sdk.producer.properties

    .\src\main\resources

    生产消息的配置信息。

    client.truststore.jks

    .\src\main\resources

    SSL证书,用于SASL方式连接。

    DmsConsumerTest.java

    .\src\test\java\com\dms\consumer

    消费消息的测试代码。

    DmsProducerTest.java

    .\src\test\java\com\dms\producer

    生产消息的测试代码。

    pom.xml

    .\

    maven配置文件,包含Kafka客户端引用。

  2. 打开IntelliJ IDEA,导入Demo。

    Demo是一个Maven构建的Java工程,因此需要配置JDK环境,以及IDEA的Maven插件。
    图1 选择“导入工程”
    图2 选择“Maven”
    图3 选择Java环境

    其他选项可默认或自主选择。然后单击Finish,完成Demo导入。

    导入后Demo工程如下:

  3. 配置Maven路径。

    打开“File > Settings”,找到“Maven home directory”信息项,选择正确的Maven路径,以及Maven所需的settings.xml文件。

  4. 修改Kafka配置信息。

    以生产消息为例,需配置以下信息,其中加粗内容必须修改。

    #以下粗体部分为不同Kafka实例特有的信息,必须修改。客户端其他参数,可以自主添加
    #topic名称在具体的生产与消费代码中。
    #######################
    #Kafka实例的broker信息,ip:port为实例的连接地址和端口,参考“收集连接信息”章节获取。举例:bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #发送确认参数
    acks=all
    #键的序列化方式
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    #值的序列化方式
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    #producer可以用来缓存数据的内存大小
    buffer.memory=33554432
    #重试次数
    retries=0
    #######################
    #如果不使用SASL认证,以下参数请注释掉。
    #######################
    #设置jaas帐号和密码,usernamepassword为创建Kafka实例过程中开启SASL_SSL时填入的用户名和密码。
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="password";
    #SASL鉴权方式
    sasl.mechanism=PLAIN
    #加密协议,目前支持SASL_SSL协议
    security.protocol=SASL_SSL
    #ssl truststore文件的位置
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    #ssl truststore文件的密码
    ssl.truststore.password=dms@kafka

  5. 在IDEA工具的左下角,打开Terminal窗口,执行mvn test命令体验demo。

    图4 IDEA的Terminal窗口位置

    生产消息会得到以下回显信息:

    -------------------------------------------------------
     T E S T S
    -------------------------------------------------------
    Running com.dms.producer.DmsProducerTest
    produce msg:The msg is 0
    produce msg:The msg is 1
    produce msg:The msg is 2
    produce msg:The msg is 3
    produce msg:The msg is 4
    produce msg:The msg is 5
    produce msg:The msg is 6
    produce msg:The msg is 7
    produce msg:The msg is 8
    produce msg:The msg is 9
    Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 138.877 sec

    消费消息会得到以下回显信息:

    -------------------------------------------------------
     T E S T S
    -------------------------------------------------------
    Running com.dms.consumer.DmsConsumerTest
    the numbers of topic:0
    the numbers of topic:0
    the numbers of topic:6
    ConsumerRecord(topic = topic-0, partition = 2, offset = 0, CreateTime = 1557059377179, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = The msg is 2)
    ConsumerRecord(topic = topic-0, partition = 2, offset = 1, CreateTime = 1557059377195, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = The msg is 5)