Java开发环境搭建
基于收集连接信息的介绍,假设您已经获取了实例连接相关的信息,以及配置好客户端的网络环境。本章节以生产与发送消息的Demo为例,介绍Kafka客户端的环境配置。
开发环境准备
- Maven:
获取并安装Apache Maven 3.0.3及以上版本,可至Maven官方下载页面下载。
- JDK:
获取并安装Java Development Kit 1.8.111及以上版本,可至Oracle官方下载页面下载。
安装后注意配置Java的环境变量。
- 获取并安装2018.3.5或以上版本的IntelliJ IDEA,可至IntelliJ IDEA官方网站下载。
操作步骤
- 下载Demo包。
在ROMA Connect实例控制台的“消息集成 MQS > Topic管理”页面,单击右上角的“用户指南 > 下载Kafka客户端 Java Demo包”下载Demo。
解压后,有如下文件:
表1 Kafka Demo文件清单 文件名
路径
说明
MqsConsumer.java
.\src\main\java\com\mqs\consumer
消费消息的API。
MqsProducer.java
.\src\main\java\com\mqs\producer
生产消息的API。
mqs.sdk.consumer.properties
.\src\main\resources
消费消息的配置信息。
mqs.sdk.producer.properties
.\src\main\resources
生产消息的配置信息。
client.truststore.jks
.\src\main\resources
SSL证书,用于SASL方式连接。
MqsConsumerTest.java
.\src\test\java\com\mqs\consumer
消费消息的测试代码。
MqsProducerTest.java
.\src\test\java\com\mqs\producer
生产消息的测试代码。
pom.xml
.\
maven配置文件,包含Kafka客户端引用。
- 打开IntelliJ IDEA,导入Demo。
Demo是一个Maven构建的Java工程,因此需要配置JDK环境,以及IDEA的Maven插件。图1 选择“导入工程”
图2 选择“Maven”
图3 选择Java环境
其他选项可默认或自主选择。然后单击Finish,完成Demo导入。
导入后Demo工程如下:
- 配置Maven路径。
打开“File > Settings”,找到“Maven home directory”信息项,选择正确的Maven路径,以及Maven所需的settings.xml文件。
- 修改客户端配置信息。
以生产消息为例,需配置以下信息,其中加粗内容必须修改。
#以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加 #topic名称在具体的生产与消费代码中。 ####################### #broker信息请从控制台界面获取。 #举例:bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094 bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #发送确认参数 acks=all #键的序列化方式 key.serializer=org.apache.kafka.common.serialization.StringSerializer #值的序列化方式 value.serializer=org.apache.kafka.common.serialization.StringSerializer #producer可以用来缓存数据的内存大小 buffer.memory=33554432 #重试次数 retries=0 ####################### #如果不使用SASL认证,以下参数请注释掉。 ####################### #设置jaas用户和密码,通过控制台设置 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; #SASL鉴权方式 sasl.mechanism=PLAIN #加密协议,目前支持SASL_SSL协议 security.protocol=SASL_SSL #ssl truststore文件的位置,证书文件在demo包的\src\main\resources路径下。 ssl.truststore.location=E:\\temp\\client.truststore.jks #ssl truststore文件的密码 ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=
- 打开IDEA的Terminal窗口,执行mvn test命令体验demo。
Terminal窗口默认在IDEA工具的左下角:
图4 IDEA的Terminal窗口位置
生产消息会得到以下回显信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
------------------------------------------------------- T E S T S ------------------------------------------------------- Running com.mqs.producer.MqsProducerTest produce msg:The msg is 0 produce msg:The msg is 1 produce msg:The msg is 2 produce msg:The msg is 3 produce msg:The msg is 4 produce msg:The msg is 5 produce msg:The msg is 6 produce msg:The msg is 7 produce msg:The msg is 8 produce msg:The msg is 9 Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 138.877 sec
消费消息会得到以下回显信息:
1 2 3 4 5 6 7 8 9
------------------------------------------------------- T E S T S ------------------------------------------------------- Running com.mqs.consumer.MqsConsumerTest the numbers of topic:0 the numbers of topic:0 the numbers of topic:6 ConsumerRecord(topic = topic-0, partition = 2, offset = 0, CreateTime = 1557059377179, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = The msg is 2) ConsumerRecord(topic = topic-0, partition = 2, offset = 1, CreateTime = 1557059377195, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = The msg is 5)