使用客户端连接Kafka(关闭SASL)
本章节介绍如何使用开源的Kafka客户端访问关闭SASL的Kafka实例的方法。关闭SASL表示客户端访问Kafka实例时,无需进行认证,数据通过明文传输,性能更好。
约束与限制
2020年7月以及之后购买的实例,Kafka实例的每个代理允许客户端单IP连接的个数默认为1000个,在此之前购买的实例,Kafka实例的每个代理允许客户端单IP连接的个数默认为200个,如果超过了,会出现连接失败问题。您可以通过修改Kafka实例配置参数来修改单IP的连接数,即修改max.connections.per.ip参数值。
前提条件
- 已打通客户端和Kafka实例之间的网络,具体网络要求请参考连接Kafka网络要求。
- 已配置正确的安全组。
客户端访问关闭SASL的Kafka实例前,Kafka实例需要配置正确的安全组规则,否则会连接失败。安全组规则的配置请参考表2。
- 已获取连接Kafka实例的地址。
在Kafka控制台的“基本信息 > 连接信息”中获取实例连接地址。Kafka实例连接地址在Kafka控制台存在两种不同的显示,一种为“内网连接地址”/“公网连接地址”,另一种为“内网明文连接地址”/“公网明文连接地址”,具体以控制台显示为准。
- 如果是使用内网通过同一个VPC访问,Kafka连接地址如下图所示。
图1 使用内网通过同一个VPC访问Kafka实例的连接地址(内网连接地址)
图2 使用内网通过同一个VPC访问Kafka实例的连接地址(内网明文连接地址)
- 如果是公网访问,Kafka连接地址如下图所示。
图3 公网访问Kafka实例的连接地址(公网连接地址)
图4 公网访问Kafka实例的连接地址(公网明文连接地址)
- 如果是使用内网通过同一个VPC访问,Kafka连接地址如下图所示。
- 如果Kafka实例未开启自动创建Topic功能,在连接实例前,请先创建Topic,否则会连接失败。
- 已下载Kafka命令行工具1.1.0版本或者Kafka命令行工具2.3.0版本或者Kafka命令行工具2.7.2版本或者Kafka命令行工具3.4.0版本,确保Kafka实例版本与命令行工具版本相同。
- 已在服务器中安装Java Development Kit 1.8.111或以上版本,并配置JAVA_HOME与PATH环境变量,具体方法如下:
使用执行用户在用户家目录下修改“.bash_profile”,添加如下行。其中“/opt/java/jdk1.8.0_151”为JDK的安装路径,请根据实际情况修改。
export JAVA_HOME=/opt/java/jdk1.8.0_151 export PATH=$JAVA_HOME/bin:$PATH
执行source .bash_profile命令使修改生效。
命令行模式连接实例
以下操作命令以Linux系统为例进行说明:
- 解压Kafka命令行工具。
进入文件压缩包所在目录,然后执行以下命令解压文件。
tar -zxf [kafka_tar]
其中,[kafka_tar]表示命令行工具的压缩包名称。
例如:
tar -zxf kafka_2.12-2.7.2.tgz
- 进入Kafka命令行工具的“/bin”目录下。
注意,Windows系统下需要进入“/bin/windows”目录下。
- 执行如下命令进行生产消息。
./kafka-console-producer.sh --broker-list ${连接地址} --topic ${Topic名称}
参数说明如下:
- 连接地址:从前提条件中获取的连接地址。
- Topic名称:Kafka实例下创建的Topic名称。如果Kafka实例开启了自动创建Topic功能,此参数值可以填写已创建的Topic名称,也可以填写未创建的Topic名称。
本文以公网连接为例,获取的Kafka实例公网连接地址为“10.xx.xx.45:9094,10.xx.xx.127:9094,10.xx.xx.103:9094”。执行完命令后输入内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。
[root@ecs-kafka bin]# ./kafka-console-producer.sh --broker-list 10.xx.xx.45:9094,10.xx.xx.127:9094,10.xx.xx.103:9094 --topic topic-demo >Hello >DMS >Kafka! >^C[root@ecs-kafka bin]#
如需停止生产使用Ctrl+C命令退出。
- 执行如下命令消费消息。
./kafka-console-consumer.sh --bootstrap-server ${连接地址} --topic ${Topic名称} --group ${消费组名称} --from-beginning
参数说明如下:
- 连接地址:从前提条件中获取的连接地址。
- Topic名称:Kafka实例下创建的Topic名称。
- 消费组名称:根据您的业务需求,设定消费组名称。如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败。消费组名称开头包含特殊字符,例如#号“#”时,监控数据无法展示。
示例如下:
[root@ecs-kafka bin]# ./kafka-console-consumer.sh --bootstrap-server 10.xx.xx.45:9094,10.xx.xx.127:9094,10.xx.xx.103:9094 --topic topic-demo --group order-test --from-beginning Kafka! DMS Hello ^CProcessed a total of 3 messages [root@ecs-kafka bin]#
如需停止消费使用Ctrl+C命令退出。