使用客户端连接Kafka(密文接入)
本章节介绍如何使用开源的Kafka客户端访问开启SASL的Kafka实例的方法。开启SASL表示客户端访问Kafka实例时,需要进行SASL认证且数据加密传输,安全性更高。
约束与限制
- 由于安全问题,支持的加密套件为TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256和TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256。
- Kafka实例的每个代理允许客户端单IP连接的个数默认为1000个,如果超过了,会出现连接失败问题。您可以通过修改Kafka实例配置参数来修改单IP的连接数,即修改max.connections.per.ip参数值。
前提条件
- 已连接客户端和Kafka实例之间的网络,具体网络要求请参考连接Kafka网络要求。
- 已配置正确的安全组。
客户端访问开启密文接入的Kafka实例时,实例需要配置正确的安全组规则,否则会连接失败。安全组规则的配置请参考表2。
- 已获取连接Kafka实例的地址。
在Kafka控制台的“概览 > 连接信息”中获取实例连接地址。
使用内网通过同一个VPC访问,Kafka连接地址如下图所示。
图1 使用内网通过同一个VPC访问Kafka实例的连接地址 - 已获取开启的SASL认证机制。
在Kafka实例详情页的“连接信息”区域,查看“开启的SASL认证机制”。很久前创建的Kafka实例在详情页如果未显示“开启的SASL认证机制”,默认使用PLAIN机制。
图2 开启的SASL认证机制 - 已获取启用的安全协议。
在Kafka实例详情页的“连接信息”区域,查看“启用的安全协议”。很久前创建的Kafka实例在详情页如果未显示“启用的安全协议”,默认使用SASL_SSL协议。
- 如果Kafka实例未开启自动创建Topic功能,在连接实例前,请先创建Topic,否则会连接失败。
- 已下载client.truststore.jks证书。如果没有,在控制台单击Kafka实例名称,进入实例详情页面,在“连接信息 > SSL证书”所在行,单击“下载”。下载压缩包后解压,获取压缩包中的客户端证书文件:client.truststore.jks。
- 已下载对应版本的Kafka命令行工具,确保Kafka实例版本与命令行工具版本相同。
- 已在服务器中安装Java Development Kit 1.8.111或以上版本,并配置JAVA_HOME与PATH环境变量,具体方法如下:
使用执行用户在用户家目录下修改“.bash_profile”,添加如下行。其中“/opt/java/jdk1.8.0_151”为JDK的安装路径,请根据实际情况修改。
export JAVA_HOME=/opt/java/jdk1.8.0_151 export PATH=$JAVA_HOME/bin:$PATH
执行source .bash_profile命令使修改生效。
命令行模式连接实例
以下操作命令以Linux系统为例进行说明。
- 在客户端所在主机的“/etc/hosts”文件中配置host和IP的映射关系,以便客户端能够快速解析实例的Broker。
- 执行以下命令,编辑“/etc/hosts”文件。
vim /etc/hosts
- 在键盘上按“i”,在“/etc/hosts”文件中增加host和IP的映射关系。
ip1 主机名称1 ip2 主机名称2 ip3 主机名称3
表1 “/etc/hosts”文件参数说明 参数名称
说明
IP地址
Kafka实例的连接地址,从前提条件中获取。
主机名称
每个实例主机的名称。
主机的名称由您自行设置,但不能重复。
例如:
10.154.48.120 server01 10.154.48.121 server02 10.154.48.122 server03
- 按“Esc”,然后输入以下命令,按“Enter”,保存并退出“/etc/hosts”文件。
:wq
- 执行以下命令,编辑“/etc/hosts”文件。
- 解压Kafka命令行工具。
进入文件压缩包所在目录,然后执行以下命令解压文件。
tar -zxf {kafka_tar}
其中,{kafka_tar}表示命令行工具的压缩包名称。
例如:
tar -zxf kafka_2.12-2.7.2.tgz
- 修改Kafka命令行工具配置文件。
配置文件包括生产者配置文件“producer.properties”和消费者配置文件“consumer.properties”,需要分别修改。
修改“producer.properties”文件:
- 进入Kafka命令行工具的“/config”目录。
cd {kafka_tar}/config
- 编辑“producer.properties”文件。
vim producer.properties
- 在键盘上按“i”,然后在“producer.properties”文件中添加如下内容。
- Kafka实例包含两种认证机制,根据前提条件中获取的SASL认证机制,将对应认证机制的内容添加到配置文件中。如果SCRAM-SHA-512和PLAIN都开启了,根据实际情况选择其中任意一种配置连接。
- PLAIN机制:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=PLAIN
表2 SASL认证机制参数说明 参数名称
说明
username
首次开启密文接入时设置的用户名,或者创建用户时设置的用户名。
password
首次开启密文接入时设置的密码,或者创建用户时设置的密码。
- SCRAM-SHA-512机制:
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=SCRAM-SHA-512
表3 SASL认证机制参数说明 参数名称
说明
username
首次开启密文接入时设置的用户名,或者创建用户时设置的用户名。
password
首次开启密文接入时设置的密码,或者创建用户时设置的密码。
- PLAIN机制:
- Kafka实例包含两种安全协议,根据前提条件中获取的安全协议,将对应安全协议的内容添加到配置文件中。
- SASL_SSL:
security.protocol=SASL_SSL ssl.truststore.location={ssl_truststore_path} ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=
表4 SASL_SSL参数说明 参数名称
说明
ssl.truststore.location
client.truststore.jks证书的存放路径。
注意,Windows系统下证书路径中也必须使用“/”,不能使用Windows系统中复制路径时的“\”,否则客户端获取证书失败。
ssl.truststore.password
Kafka客户端证书密码。
默认为dms@kafka,不可更改。
ssl.endpoint.identification.algorithm
证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。
- SASL_PLAINTEXT:
security.protocol=SASL_PLAINTEXT
- SASL_SSL:
- Kafka实例包含两种认证机制,根据前提条件中获取的SASL认证机制,将对应认证机制的内容添加到配置文件中。如果SCRAM-SHA-512和PLAIN都开启了,根据实际情况选择其中任意一种配置连接。
- 按“Esc”,然后输入以下命令,按“Enter”,保存并退出“producer.properties”文件。
:wq
修改“consumer.properties”文件:
- 编辑“consumer.properties”文件。
vim consumer.properties
- 在键盘上按“i”,然后在“consumer.properties”文件中添加如下内容。
- Kafka实例包含两种认证机制,根据前提条件中获取的SASL认证机制,将对应认证机制的内容添加到配置文件中。如果SCRAM-SHA-512和PLAIN都开启了,根据实际情况选择其中任意一种配置连接。
- PLAIN机制:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=PLAIN
表5 SASL认证机制参数说明 参数名称
说明
username
首次开启密文接入时设置的用户名,或者创建用户时设置的用户名。
password
首次开启密文接入时设置的密码,或者创建用户时设置的密码。
- SCRAM-SHA-512机制:
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=SCRAM-SHA-512
表6 SASL认证机制参数说明 参数名称
说明
username
首次开启密文接入时设置的用户名,或者创建用户时设置的用户名。
password
首次开启密文接入时设置的密码,或者创建用户时设置的密码。
- PLAIN机制:
- Kafka实例包含两种安全协议,根据前提条件中获取的安全协议,将对应安全协议的内容添加到配置文件中。
- SASL_SSL:
security.protocol=SASL_SSL ssl.truststore.location={ssl_truststore_path} ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=
表7 SASL_SSL参数说明 参数名称
说明
ssl.truststore.location
client.truststore.jks证书的存放路径。
注意,Windows系统下证书路径中也必须使用“/”,不能使用Windows系统中复制路径时的“\”,否则客户端获取证书失败。
ssl.truststore.password
Kafka客户端证书密码。
默认为dms@kafka,不可更改。
ssl.endpoint.identification.algorithm
证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。
- SASL_PLAINTEXT:
security.protocol=SASL_PLAINTEXT
- SASL_SSL:
- Kafka实例包含两种认证机制,根据前提条件中获取的SASL认证机制,将对应认证机制的内容添加到配置文件中。如果SCRAM-SHA-512和PLAIN都开启了,根据实际情况选择其中任意一种配置连接。
- 按“Esc”,然后输入以下命令,按“Enter”,保存并退出“consumer.properties”文件。
:wq
- 进入Kafka命令行工具的“/config”目录。
- 进入Kafka命令行工具的“/bin”目录下。
注意,Windows系统下需要进入“/bin/windows”目录下。
cd ../bin
- 执行如下命令进行生产消息。
./kafka-console-producer.sh --broker-list {连接地址} --topic {Topic名称} --producer.config ../config/producer.properties
表8 生产消息参数说明 参数名称
说明
连接地址
Kafka实例的连接地址。
从前提条件中获取连接地址。
Topic名称
Kafka实例下创建的Topic名称。
如果Kafka实例开启了自动创建Topic功能,此参数值可以填写已创建的Topic名称,也可以填写未创建的Topic名称。
如下示例,Kafka实例连接地址为“10.xx.xx.45:9093,10.xx.xx.127:9093,10.xx.xx.103:9093”。
执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。
[root@ecs-kafka bin]#./kafka-console-producer.sh --broker-list 10.xx.xx.45:9093,10.xx.xx.127:9093,10.xx.xx.103:9093 --topic topic-demo --producer.config ../config/producer.properties >Hello >DMS >Kafka! >^C[root@ecs-kafka bin]#
如需停止生产使用Ctrl+C命令退出。
- 执行如下命令消费消息。
./kafka-console-consumer.sh --bootstrap-server {连接地址} --topic {Topic名称} --group {消费组名称} --from-beginning --consumer.config ../config/consumer.properties
表9 消费消息参数说明 参数名称
说明
连接地址
Kafka实例的连接地址。
从前提条件中获取连接地址。
Topic名称
Kafka实例下创建的Topic名称。
消费组名称
根据您的业务需求,设定消费组名称。消费组名称开头包含特殊字符,例如#号“#”时,监控数据无法展示。
警告:配置文件(即consumer.properties)中已经指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败。
查看并修改配置文件中消费组名称的方法参考6.a。
查看并修改配置文件中消费组名称的方法如下:
- 进入Kafka命令行工具的“/config”目录下。
cd ../config
- 编辑“consumer.properties”文件。
vim consumer.properties
- 在键盘上按“i”,查看并修改消费组名称。
“group.id”表示消费组的名称,您可以根据实际情况修改。
# consumer group id group.id=test-consumer-group
- 按“Esc”,然后输入以下命令,按“Enter”,保存并退出“consumer.properties”文件。
:wq
消费消息示例如下:
[root@ecs-kafka bin]# ./kafka-console-consumer.sh --bootstrap-server 10.xx.xx.45:9093,10.xx.xx.127:9093,10.xx.xx.103:9093 --topic topic-demo --group order-test --from-beginning --consumer.config ../config/consumer.properties Hello DMS Kafka! ^CProcessed a total of 3 messages [root@ecs-kafka bin]#
如需停止消费使用Ctrl+C命令退出。
- 进入Kafka命令行工具的“/config”目录下。