更新时间:2024-04-07 GMT+08:00

步骤四:连接实例生产消费消息

本文主要介绍客户端在命令行模式下使用SASL认证连接Kafka实例的操作。

Kafka实例的每个代理允许客户端单IP连接的个数默认为1000个,如果超过了,会出现连接失败问题。您可以通过修改配置参数来修改单IP的连接数。

前提条件

  • 已配置正确的安全组,安全组规则请参考表1
  • 已获取连接Kafka实例的地址。
    图1 使用内网通过同一个VPC访问Kafka实例的连接地址(实例已开启SASL)
  • 已获取(可选)步骤三:创建Topic中创建的Topic名称。
  • 已购买ECS,并完成JDK安装、环境变量配置以及Kafka开源客户端下载,具体操作请参考准备环境

配置生产消费配置文件

  1. 登录Linux系统的ECS。
  2. 在ECS的“/etc/hosts”文件中配置host和IP的映射关系,以便客户端能够快速解析实例的Broker。

    其中,IP地址必须为实例连接地址(从前提条件获取的连接地址),host为每个实例主机的名称(您可以自定义主机的名称,但不能重复)。

    例如:

    10.154.48.120 server01

    10.154.48.121 server02

    10.154.48.122 server03

  3. 下载client.truststore.jks证书。在Kafka控制台单击Kafka实例名称,进入实例详情页面,在“连接信息 > SSL证书”所在行,单击“下载”。

    解压压缩包,获取压缩包中的客户端证书文件:client.truststore.jks。

  4. 在“consumer.properties”和“producer.properties”文件中分别增加如下行(示例以PLAIN机制为例介绍):

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="**********" \
    password="**********";        
    sasl.mechanism=PLAIN
    
    security.protocol=SASL_SSL
    ssl.truststore.location={ssl_truststore_path}
    ssl.truststore.password=dms@kafka
    ssl.endpoint.identification.algorithm=

    参数说明:

    • username和password为创建Kafka实例过程中开启SASL_SSL时填入的用户名和密码。
    • ssl.truststore.location配置为3证书的存放路径。
    • ssl.truststore.password为服务器证书密码,不可更改,需要保持为dms@kafka。
    • ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空

生产消息

进入Kafka客户端文件的“/bin”目录下,执行如下命令进行生产消息。

./kafka-console-producer.sh --broker-list ${连接地址} --topic ${Topic名称} --producer.config ../config/producer.properties

参数说明如下:

  • 连接地址:从前提条件获取的连接地址。
  • Topic名称:Kafka实例下创建的Topic名称。

示例如下,“192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093”为Kafka实例连接地址。

执行完命令后,输入需要生产的消息内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。

[root@ecs-kafka bin]#./kafka-console-producer.sh --broker-list 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093  --topic topic-demo --producer.config ../config/producer.properties
>Hello
>DMS
>Kafka!
>^C[root@ecs-kafka bin]# 

如需停止生产使用Ctrl+C命令退出。

消费消息

执行如下命令消费消息。

./kafka-console-consumer.sh --bootstrap-server ${连接地址} --topic ${Topic名称} --group ${消费组名称} --from-beginning  --consumer.config ../config/consumer.properties

参数说明如下:

  • 连接地址:从前提条件获取的连接地址。
  • Topic名称:Kafka实例下创建的Topic名称。
  • 消费组名称:根据您的业务需求,设定消费组名称。如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败。消费组名称开头包含特殊字符,例如下划线“_”、#号“#”时,监控数据无法展示。

示例如下:

[root@ecs-kafka bin]#  ./kafka-console-consumer.sh --bootstrap-server 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093 --topic topic-demo --group order-test --from-beginning --consumer.config ../config/consumer.properties
Hello
Kafka!
DMS
^CProcessed a total of 3 messages
[root@ecs-kafka bin]# 

如需停止消费使用Ctrl+C命令退出。

后续步骤

您可以通过设置监控指标的告警规格,当实例、节点、队列等有异常时,可以及时接收异常信息。

步骤五:配置必须的监控告警