文档首页/ 分布式消息服务Kafka版/ 用户指南/ 连接实例/ 使用客户端连接Kafka(开启SASL)
更新时间:2024-12-12 GMT+08:00
分享

使用客户端连接Kafka(开启SASL)

本章节介绍如何使用开源的Kafka客户端访问开启密文接入的Kafka实例的方法。密文接入表示客户端访问Kafka实例时,需要进行SASL认证。如果Kafka安全协议为“SASL_SSL”,客户端和Kafka实例进行通信时,数据使用加密传输,安全性更高。

操作视频

本视频演示客户端连接开启SASL_SSL的Kafka实例的操作。

约束与限制

  • 由于安全问题,2021年3月20日前以及当天创建的实例,支持的加密套件为TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256。2021年3月20日后创建的实例,支持的加密套件为TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
  • 2020年7月以及之后购买的实例,Kafka实例的每个代理允许客户端单IP连接的个数默认为1000个,在此之前购买的实例,Kafka实例的每个代理允许客户端单IP连接的个数默认为200个,如果超过了,会出现连接失败问题。您可以通过修改Kafka实例配置参数来修改单IP的连接数,即修改max.connections.per.ip参数值。

前提条件

  • 已打通客户端和Kafka实例之间的网络,具体网络要求请参考连接Kafka网络要求
  • 已配置正确的安全组。

    客户端访问开启密文接入的Kafka实例时,实例需要配置正确的安全组规则,否则会连接失败。安全组规则的配置请参考表2

  • 已获取连接Kafka实例的地址。
    在Kafka控制台的“基本信息 > 连接信息”中获取实例连接地址。Kafka实例连接地址在Kafka控制台存在两种不同的显示,一种为“内网连接地址”/“公网连接地址”,另一种为“内网密文连接地址”/“公网密文连接地址”,具体以控制台显示为准。
    • 如果是使用内网通过同一个VPC访问,Kafka连接地址如下图所示。
      图1 使用内网通过同一个VPC访问Kafka实例的连接地址(内网连接地址)
      图2 使用内网通过同一个VPC访问Kafka实例的连接地址(内网密文连接地址)
    • 如果是公网访问,Kafka连接地址如下图所示。
      图3 公网访问Kafka实例的连接地址(公网连接地址)
      图4 公网访问Kafka实例的连接地址(公网密文连接地址)

  • 已获取开启的SASL认证机制。

    在Kafka实例详情页的“连接信息”区域,查看“开启的SASL认证机制”。如果SCRAM-SHA-512和PLAIN都开启了,根据实际情况选择其中任意一种配置连接。很久前创建的Kafka实例在详情页如果未显示“开启的SASL认证机制”,默认使用PLAIN机制。

    图5 开启的SASL认证机制
  • 已获取启用的安全协议。

    在Kafka实例详情页的“连接信息”区域,查看“启用的安全协议”。很久前创建的Kafka实例在详情页如果未显示“启用的安全协议”,默认使用SASL_SSL协议。

  • 如果Kafka实例未开启自动创建Topic功能,在连接实例前,请先创建Topic,否则会连接失败。
  • 已下载client.jks证书。如果没有,在控制台单击Kafka实例名称,进入实例详情页面,在“连接信息 > SSL证书”所在行,单击“下载”。下载压缩包后解压,获取压缩包中的客户端证书文件:client.jks。
  • 已下载Kafka命令行工具1.1.0版本或者Kafka命令行工具2.3.0版本或者Kafka命令行工具2.7.2版本或者Kafka命令行工具3.4.0版本,确保Kafka实例版本与命令行工具版本相同。
  • 已在服务器中安装Java Development Kit 1.8.111或以上版本,并配置JAVA_HOME与PATH环境变量,具体方法如下:

    使用执行用户在用户家目录下修改“.bash_profile”,添加如下行。其中“/opt/java/jdk1.8.0_151”为JDK的安装路径,请根据实际情况修改。

    export JAVA_HOME=/opt/java/jdk1.8.0_151 
    export PATH=$JAVA_HOME/bin:$PATH

    执行source .bash_profile命令使修改生效。

命令行模式连接实例

以下操作命令以Linux系统为例进行说明。

  1. 在客户端所在主机的“/etc/hosts”文件中配置host和IP的映射关系,以便客户端能够快速解析实例的Broker。

    其中,IP地址必须为实例连接地址(从前提条件获取的连接地址),host为每个实例主机的名称(主机的名称由您自行设置,但不能重复)。

    例如:

    10.154.48.120 server01
    10.154.48.121 server02
    10.154.48.122 server03

  2. 解压Kafka命令行工具。

    进入文件压缩包所在目录,然后执行以下命令解压文件。

    tar -zxf [kafka_tar]

    其中,[kafka_tar]表示命令行工具的压缩包名称。

    例如:

    tar -zxf kafka_2.12-2.7.2.tgz

  3. 根据SASL认证机制,修改Kafka命令行工具配置文件。

    • PLAIN机制:在Kafka命令行工具的“/config”目录中找到“consumer.properties”和“producer.properties”文件,并分别在文件中增加如下内容。
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="**********" \
      password="**********";        
      sasl.mechanism=PLAIN

      参数说明:

      username和password为首次开启密文接入时填入的用户名和密码,或者创建用户时设置的用户名和密码。

    • SCRAM-SHA-512机制:在Kafka命令行工具的“/config”目录中找到“consumer.properties”和“producer.properties”文件,并分别在文件中增加如下内容。
      sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="**********" \
      password="**********";        
      sasl.mechanism=SCRAM-SHA-512

      参数说明:

      username和password为首次开启密文接入时填入的用户名和密码,或者创建用户时设置的用户名和密码。

  4. 根据安全协议,修改Kafka命令行工具配置文件。

    • SASL_SSL:在Kafka命令行工具的“/config”目录中找到“consumer.properties”和“producer.properties”文件,并分别在文件中增加如下内容。
      security.protocol=SASL_SSL
      ssl.truststore.location={ssl_truststore_path}
      ssl.truststore.password=dms@kafka
      ssl.endpoint.identification.algorithm=

      参数说明:

      • ssl.truststore.location配置为client.jks证书的存放路径。注意,Windows系统下证书路径中也必须使用“/”,不能使用Windows系统中复制路径时的“\”,否则客户端获取证书失败。
      • ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka
      • ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空
    • SASL_PLAINTEXT:在Kafka命令行工具的“/config”目录中找到“consumer.properties”和“producer.properties”文件,并分别在文件中增加如下内容。
      security.protocol=SASL_PLAINTEXT

  5. 进入Kafka命令行工具的“/bin”目录下。

    注意,Windows系统下需要进入“/bin/windows”目录下。

  6. 执行如下命令进行生产消息。

    ./kafka-console-producer.sh --broker-list ${连接地址} --topic ${Topic名称} --producer.config ../config/producer.properties

    参数说明如下:

    • 连接地址:从前提条件获取的连接地址。
    • Topic名称:Kafka实例下创建的Topic名称。如果Kafka实例开启了自动创建Topic功能,此参数值可以填写已创建的Topic名称,也可以填写未创建的Topic名称。

    本文以公网访问为例,Kafka实例连接地址为“10.xx.xx.45:9095,10.xx.xx.127:9095,10.xx.xx.103:9095”。

    执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。

    [root@ecs-kafka bin]#./kafka-console-producer.sh --broker-list 10.xx.xx.45:9095,10.xx.xx.127:9095,10.xx.xx.103:9095  --topic topic-demo --producer.config ../config/producer.properties
    >Hello
    >DMS
    >Kafka!
    >^C[root@ecs-kafka bin]# 

    如需停止生产使用Ctrl+C命令退出。

  7. 执行如下命令消费消息。

    ./kafka-console-consumer.sh --bootstrap-server ${连接地址} --topic ${Topic名称} --group ${消费组名称} --from-beginning  --consumer.config ../config/consumer.properties

    参数说明如下:

    • 连接地址:从前提条件获取的连接地址。
    • Topic名称:Kafka实例下创建的Topic名称。
    • 消费组名称:根据您的业务需求,设定消费组名称。如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败。消费组名称开头包含特殊字符,例如#号“#”时,监控数据无法展示。

    示例如下:

    [root@ecs-kafka bin]#  ./kafka-console-consumer.sh --bootstrap-server 10.xx.xx.45:9095,10.xx.xx.127:9095,10.xx.xx.103:9095 --topic topic-demo --group order-test --from-beginning --consumer.config ../config/consumer.properties
    Hello
    DMS
    Kafka!
    ^CProcessed a total of 3 messages
    [root@ecs-kafka bin]# 

    如需停止消费使用Ctrl+C命令退出。

相关文档