更新时间:2023-04-23 GMT+08:00

Java开发环境搭建

基于收集连接信息的介绍,假设您已经获取了实例连接相关的信息,以及配置好客户端的网络环境。本章节以生产与发送消息的Demo为例,介绍Kafka客户端的环境配置。

开发环境准备

操作步骤

  1. 下载Demo包。

    在ROMA Connect实例控制台的“消息集成 MQS > Topic管理”页面,单击右上角的“用户指南 > 下载Kafka客户端 Java Demo包”下载Demo。

    解压后,有如下文件:

    表1 Kafka Demo文件清单

    文件名

    路径

    说明

    MqsConsumer.java

    .\src\main\java\com\mqs\consumer

    消费消息的API。

    MqsProducer.java

    .\src\main\java\com\mqs\producer

    生产消息的API。

    mqs.sdk.consumer.properties

    .\src\main\resources

    消费消息的配置信息。

    mqs.sdk.producer.properties

    .\src\main\resources

    生产消息的配置信息。

    client.truststore.jks

    .\src\main\resources

    SSL证书,用于SASL方式连接。

    MqsConsumerTest.java

    .\src\test\java\com\mqs\consumer

    消费消息的测试代码。

    MqsProducerTest.java

    .\src\test\java\com\mqs\producer

    生产消息的测试代码。

    pom.xml

    .\

    maven配置文件,包含Kafka客户端引用。

  2. 打开IntelliJ IDEA,导入Demo。

    Demo是一个Maven构建的Java工程,因此需要配置JDK环境,以及IDEA的Maven插件。
    图1 选择“导入工程”
    图2 选择“Maven”
    图3 选择Java环境

    其他选项可默认或自主选择。然后单击Finish,完成Demo导入。

    导入后Demo工程如下:

  3. 配置Maven路径。

    打开“File > Settings”,找到“Maven home directory”信息项,选择正确的Maven路径,以及Maven所需的settings.xml文件。

  4. 修改客户端配置信息。

    以生产消息为例,需配置以下信息,其中加粗内容必须修改。

    #以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加
    #topic名称在具体的生产与消费代码中。
    #######################
    #broker信息请从控制台界面获取。
    #举例:bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #发送确认参数
    acks=all
    #键的序列化方式
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    #值的序列化方式
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    #producer可以用来缓存数据的内存大小
    buffer.memory=33554432
    #重试次数
    retries=0
    #######################
    #如果不使用SASL认证,以下参数请注释掉。
    #######################
    #设置jaas用户和密码,通过控制台设置
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="password";
    #SASL鉴权方式
    sasl.mechanism=PLAIN
    #加密协议,目前支持SASL_SSL协议
    security.protocol=SASL_SSL
    #ssl truststore文件的位置,证书文件在demo包的\src\main\resources路径下。
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    #ssl truststore文件的密码
    ssl.truststore.password=dms@kafka
    ssl.endpoint.identification.algorithm=

  5. 打开IDEA的Terminal窗口,执行mvn test命令体验demo。

    Terminal窗口默认在IDEA工具的左下角

    图4 IDEA的Terminal窗口位置

    生产消息会得到以下回显信息:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    -------------------------------------------------------
     T E S T S
    -------------------------------------------------------
    Running com.mqs.producer.MqsProducerTest
    produce msg:The msg is 0
    produce msg:The msg is 1
    produce msg:The msg is 2
    produce msg:The msg is 3
    produce msg:The msg is 4
    produce msg:The msg is 5
    produce msg:The msg is 6
    produce msg:The msg is 7
    produce msg:The msg is 8
    produce msg:The msg is 9
    Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 138.877 sec
    

    消费消息会得到以下回显信息:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    -------------------------------------------------------
     T E S T S
    -------------------------------------------------------
    Running com.mqs.consumer.MqsConsumerTest
    the numbers of topic:0
    the numbers of topic:0
    the numbers of topic:6
    ConsumerRecord(topic = topic-0, partition = 2, offset = 0, CreateTime = 1557059377179, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = The msg is 2)
    ConsumerRecord(topic = topic-0, partition = 2, offset = 1, CreateTime = 1557059377195, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = The msg is 5)