更新时间:2024-08-05 GMT+08:00

使用Kafka客户端SSL加密

前提说明

  • 客户端使用SSL功能前,必须要保证服务端SSL对应服务功能已经开启(服务端参数“ssl.mode.enable”设置为“true”)。
  • SSL功能需要配合API进行使用,可参考Kafka安全使用说明章节。

使用说明

  • Linux客户端使用SSL功能
    1. 修改“客户端安装目录/Kafka/kafka/config/producer.properties”“客户端安装目录/Kafka/kafka/config/consumer.properties”“security.protocol”的值为“SASL_SSL”或者“SSL”
    2. 进入“客户端安装目录/Kafka/kafka/bin”使用shell命令时,根据上一步中配置的协议填写对应的端口,例如使用配置的“security.protocol”“SASL_SSL”,则需要填写SASL_SSL协议端口,默认为21009:

      shkafka-console-producer.sh --broker-list <Kafka集群IP:21009> --topic <Topic名称> --producer.config config/producer.properties

      shkafka-console-consumer.sh --topic <Topic名称> --bootstrap-server <Kafka集群IP:21009> --consumer.config config/consumer.properties

  • Windows客户端代码使用SSL功能
    1. 下载Kafka客户端,解压后在根目录中找到ca.crt证书文件。
    2. 使用ca.crt证书生成客户端的truststore。

      在安装了Java的环境下执行命令:

      keytool -noprompt -import -alias myservercert -file ca.crt -keystore truststore.jks

    3. 将生成的truststore.jks复制至IntelliJ IDEA工程的conf目录下,并在客户端代码中(Producer.java或者Consumer.java的构造方法)添加如下代码:
      //truststore文件地址
      props.put("ssl.truststore.location", System.getProperty("user.dir") + File.separator + "conf" + File.separator + "truststore.jks");
      //truststore文件密码(生成时输入的密码)
      props.put("ssl.truststore.password", "XXXXX");
    4. 按需修改客户端样例工程的“src/main/resources”目录下的“producer.properties”“consumer.properties”中的“security.protocol”的值,同时修改“producer.properties”中的“bootstrap.servers”的值,确保security.protocol协议类型和bootstrap.servers中的端口号匹配。