使用Kafka客户端SSL加密
前提说明
- 客户端使用SSL功能前,必须要保证服务端SSL对应服务功能已经开启(服务端参数“ssl.mode.enable”设置为“true”)。
- SSL功能需要配合API进行使用,可参考Kafka安全使用说明章节。
使用说明
- Linux客户端使用SSL功能
- 修改“客户端安装目录/Kafka/kafka/config/producer.properties”和“客户端安装目录/Kafka/kafka/config/consumer.properties”中“security.protocol”的值为“SASL_SSL”或者“SSL”。
- 使用shell命令时,根据上一步中配置的协议填写对应的端口,例如使用配置的“security.protocol”为“SASL_SSL”,则需要填写SASL_SSL协议端口,默认为21009:
bin/kafka-console-producer.sh --broker-list <Kafka集群IP:21009> --topic <Topic名称> --producer.config config/producer.properties
bin/kafka-console-consumer.sh --topic <Topic名称> --bootstrap-server <Kafka集群IP:21009> --consumer.config config/consumer.properties
- Windows客户端代码使用SSL功能
- 下载Kafka客户端,解压后在根目录中找到ca.crt证书文件。
- 使用ca.crt证书生成客户端的truststore。
keytool -noprompt -import -alias myservercert -file ca.crt -keystore truststore.jks。
- 将生成的truststore.jks复制至IntelliJ IDEA工程的conf目录下,并在客户端代码中(Producer.java或者Consumer.java的构造方法)添加如下代码:
//truststore文件地址 props.put("ssl.truststore.location", System.getProperty("user.dir") + File.separator + "conf" + File.separator + "truststore.jks"); //truststore文件密码(生成时输入的密码) props.put("ssl.truststore.password", "XXXXX");
- 按需修改客户端样例工程的“src/main/resources”目录下的“producer.properties”和“consumer.properties”中的“security.protocol”的值,同时修改“producer.properties”中的“bootstrap.servers”的值,确保security.protocol协议类型和bootstrap.servers中的端口号匹配。