更新时间:2023-11-08 GMT+08:00
分享

Java开发环境搭建

基于收集连接信息的介绍,假设您已经获取了实例连接相关的信息,以及配置好客户端的网络环境。本章节以生产与发送消息的Demo为例,介绍Kafka客户端的环境配置。

开发环境

操作步骤

  1. 下载Demo包

    下载后解压,有如下文件:

    表1 Kafka Demo文件清单

    文件名

    路径

    说明

    DmsConsumer.java

    .\src\main\java\com\dms\consumer

    消费消息的API。

    DmsProducer.java

    .\src\main\java\com\dms\producer

    生产消息的API。

    dms.sdk.consumer.properties

    .\src\main\resources

    消费消息的配置信息。

    dms.sdk.producer.properties

    .\src\main\resources

    生产消息的配置信息。

    client.jks

    .\src\main\resources

    SSL证书,用于SASL方式连接。

    DmsConsumerTest.java

    .\src\test\java\com\dms\consumer

    消费消息的测试代码。

    DmsProducerTest.java

    .\src\test\java\com\dms\producer

    生产消息的测试代码。

    pom.xml

    .\

    maven配置文件,包含Kafka客户端引用。

  2. 打开IntelliJ IDEA,导入Demo。

    Demo是一个Maven构建的Java工程,因此需要配置JDK环境,以及IDEA的Maven插件。
    图1 选择“导入工程”
    图2 选择“Maven”
    图3 选择Java环境

    其他选项可默认或自主选择。然后单击Finish,完成Demo导入。

    导入后Demo工程如下:

  3. 配置Maven路径。

    打开“File > Settings”,找到“Maven home directory”信息项,选择正确的Maven路径,以及Maven所需的settings.xml文件。

  4. 修改Kafka配置信息。

    以生产消息为例,需配置以下信息,其中加粗内容必须修改。

    #以下粗体部分为不同Kafka实例特有的信息,必须修改。客户端其他参数,可以自主添加
    #Topic名称在具体的生产与消费代码中。
    #######################
    #Kafka实例的broker信息,ip:port为实例的连接地址和端口,参考“收集连接信息”章节获取。举例:bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #发送确认参数
    acks=all
    #键的序列化方式
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    #值的序列化方式
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    #producer可以用来缓存数据的内存大小
    buffer.memory=33554432
    #重试次数
    retries=0
    #######################
    #如果不使用SASL认证,以下参数请注释掉。
    #######################
    #设置SASL认证机制、账号和密码。
    #sasl.mechanism为SASL认证机制,username和password为SASL_SSL的用户名和密码,参考“收集连接信息”章节获取。
    #SASL认证机制为“PLAIN”时,配置信息如下。
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="password";
    #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="username" \
        password="password";
    
    #设置Kafka安全协议。security.protocol为安全协议。
    #安全协议为“SASL_SSL”时,配置信息如下。
    security.protocol=SASL_SSL
    #ssl truststore.location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。
    ssl.truststore.location=E:\\temp\\client.jks
    #ssl truststore.password为服务器证书密码,配置此密码是为了访问Java生成的jks文件。
    ssl.truststore.password=dms@kafka
    #ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。
    ssl.endpoint.identification.algorithm=
    #安全协议为“SASL_PLAINTEXT”时,配置信息如下。
    security.protocol=SASL_PLAINTEXT

  5. 在IDEA工具的左下角,打开Terminal窗口,执行mvn test命令体验demo。

    图4 IDEA的Terminal窗口位置

    生产消息会得到以下回显信息:

    -------------------------------------------------------
     T E S T S
    -------------------------------------------------------
    Running com.dms.producer.DmsProducerTest
    produce msg:The msg is 0
    produce msg:The msg is 1
    produce msg:The msg is 2
    produce msg:The msg is 3
    produce msg:The msg is 4
    produce msg:The msg is 5
    produce msg:The msg is 6
    produce msg:The msg is 7
    produce msg:The msg is 8
    produce msg:The msg is 9
    Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 138.877 sec

    消费消息会得到以下回显信息:

    -------------------------------------------------------
     T E S T S
    -------------------------------------------------------
    Running com.dms.consumer.DmsConsumerTest
    the numbers of topic:0
    the numbers of topic:0
    the numbers of topic:6
    ConsumerRecord(topic = topic-0, partition = 2, offset = 0, CreateTime = 1557059377179, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = The msg is 2)
    ConsumerRecord(topic = topic-0, partition = 2, offset = 1, CreateTime = 1557059377195, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = The msg is 5)

分享:

    相关文档

    相关产品