Logstash对接Kafka生产消费消息
方案概述
应用场景
Logstash是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到指定的存储中。Kafka是一种高吞吐量的分布式发布订阅消息系统,也是Logstash支持的众多输入输出源之一。本章节主要介绍Logstash如何对接Kafka实例。
方案架构
- Kafka实例作为Logstash输入源的示意图如下。
图1 Kafka实例作为Logstash输入源
日志采集客户端将数据发送到Kafka实例中,Logstash根据自身性能从Kafka实例中拉取数据。Kafka实例作为Logstash输入源时,可以防止突发流量对于Logstash的影响,以及解耦日志采集客户端和Logstash,保证系统的稳定性。
- Kafka实例作为Logstash输出源的示意图如下。
图2 Kafka实例作为Logstash输出源
Logstash从数据库采集数据,然后发送到Kafka实例中进行存储。Kafka实例作为Logstash输出源时,由于Kafka的高吞吐量,可以存储大量数据。
约束与限制
Logstash从7.5版本开始支持Kafka Integration Plugin插件,Kafka Integration Plugin插件包含Kafka input Plugin和Kafka output Plugin。Kafka input Plugin用于从Kafka实例的Topic中读取数据,Kafka output Plugin把数据写入到Kafka实例的Topic。Logstash、Kafka Integration Plugin与Kafka客户端的版本对应关系如表1所示。请确保Kafka客户端版本大于或等于Kafka实例的版本。
前提条件
执行实施步骤前,请确保已完成以下操作:
- 下载Logstash。
- 准备一台Windows系统的主机,在主机中安装Java Development Kit 1.8.111或以上版本和Git Bash。
- 创建Kafka实例和Topic,并获取Kafka实例信息。
Kafka实例未开启公网访问和SASL认证时,获取表2所示信息。
表2 Kafka实例信息(未开启公网访问和SASL认证) 参数名
获取途径
内网连接地址
在Kafka实例详情页的“连接信息”区域,获取“内网连接地址”。
Topic名称
在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“Topic管理”,进入Topic列表页面,获取Topic名称。
下文以topic-logstash为例介绍。
Kafka实例未开启公网访问、已开启SASL认证时,获取表3所示信息。
表3 Kafka实例信息(未开启公网访问、已开启SASL认证) 参数名
获取途径
内网连接地址
在Kafka实例详情页的“连接信息”区域,获取“内网连接地址”。
开启的SASL认证机制
在Kafka实例详情页的“连接信息”区域,获取“开启的SASL认证机制”。
启用的安全协议
在Kafka实例详情页的“连接信息”区域,获取“启用的安全协议”。
证书
在Kafka实例详情页的“连接信息”区域,在“SSL证书”所在行,单击“下载”。下载压缩包后解压,获取压缩包中的客户端证书文件:client.jks。
SASL用户名和密码
在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“用户管理”,进入用户列表页面,获取用户名。如果忘记了密码,单击“重置密码”,重新设置密码。
Topic名称
在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“Topic管理”,进入Topic列表页面,获取Topic名称。
下文以topic-logstash为例介绍。
Kafka实例已开启公网访问、未开启SASL认证时,获取表4所示信息。
表4 Kafka实例信息(已开启公网访问、未开启SASL认证) 参数名
获取途径
公网连接地址
在Kafka实例详情页的“连接信息”区域,获取“公网连接地址”。
Topic名称
在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“Topic管理”,进入Topic列表页面,获取Topic名称。
下文以topic-logstash为例介绍。
Kafka实例已开启公网访问和SASL认证时,获取表5所示信息。
表5 Kafka实例信息(已开启公网访问和SASL认证) 参数名
获取途径
公网连接地址
在实例详情页的“连接信息”区域,获取“公网连接地址”
开启的SASL认证机制
在Kafka实例详情页的“连接信息”区域,获取“开启的SASL认证机制”。
启用的安全协议
在Kafka实例详情页的“连接信息”区域,获取“启用的安全协议”。
证书
在Kafka实例详情页的“连接信息”区域,在“SSL证书”所在行,单击“下载”。下载压缩包后解压,获取压缩包中的客户端证书文件:client.jks。
SASL用户名和密码
在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“用户管理”,进入用户列表页面,获取用户名。如果忘记了密码,单击“重置密码”,重新设置密码。
Topic名称
在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“Topic管理”,进入Topic列表页面,获取Topic名称。
下文以topic-logstash为例介绍。
实施步骤(Kafka实例作为Logstash输出源)
- 在Windows主机中,解压Logstash压缩包,进入“config”文件夹,创建“output.conf”配置文件。
图3 创建“output.conf”配置文件
- 在“output.conf”配置文件中,增加如下内容,连接Kafka实例。
input { stdin {} } output { kafka { bootstrap_servers => "ip1:port1,ip2:port2,ip3:port3" topic_id => "topic-logstash" #如果不使用SASL认证,以下参数请注释掉。 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl_mechanism => "PLAIN" sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';" #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 sasl_mechanism => "SCRAM-SHA-512" sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';" #安全协议为“SASL_SSL”时,配置信息如下。 security_protocol => "SASL_SSL" ssl_truststore_location => "C:\\Users\\Desktop\\logstash-8.8.1\\config\\client.jks" ssl_truststore_password => "dms@kafka" ssl_endpoint_identification_algorithm => "" #安全协议为“SASL_PLAINTEXT”时,配置信息如下。 security_protocol => "SASL_PLAINTEXT" } }
参数说明如下:
- bootstrap_servers:前提条件中获取的Kafka实例“内网连接地址”/“公网连接地址”。
- topic_id:前提条件中获取的Topic名称。
- sasl_mechanism:SASL认证机制。
- sasl_jaas_config:SASL jaas的配置文件,根据实际情况修改为前提条件中获取的SASL用户名和密码。
- security_protocol:Kafka实例的安全协议。
- ssl_truststore_location:SSL证书的存放位置。
- ssl_truststore_password:服务器证书密码,不可更改,需要保持为dms@kafka。
- ssl_endpoint_identification_algorithm:证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。
如果需要了解Kafka output Plugin的其他参数,请参见Kafka output Plugin。
- 在Logstash文件夹根目录打开Git Bash,执行以下命令启动Logstash。
./bin/logstash -f ./config/output.conf
返回“Successfully started Logstash API endpoint”时,表示启动成功。
图4 启动Logstash
- 在Logstash中,生产消息,如下图所示。
图5 生产消息
- 切换到Kafka控制台,单击实例名称,进入实例详情页。
- 在左侧导航栏单击“消息查询”,进入消息查询页面。
- 在“Topic名称”中选择“topic-logstash”,单击“搜索”,查询消息。
从图6可以看出,Logstash的Kafka output Plugin已经把数据写入到Kafka实例的topic-logstash中。
实施步骤(Kafka实例作为Logstash输入源)
- 在Windows主机中,解压Logstash压缩包,进入“config”文件夹,创建“input.conf”配置文件。
图7 创建“input.conf”配置文件
- 在“input.conf”配置文件中,增加如下内容,连接Kafka实例。
input { kafka { bootstrap_servers => "ip1:port1,ip2:port2,ip3:port3" group_id => "logstash_group" topic_id => "topic-logstash" auto_offset_reset => "earliest" #如果不使用SASL认证,以下参数请注释掉。 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl_mechanism => "PLAIN" sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';" #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 sasl_mechanism => "SCRAM-SHA-512" sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';" #安全协议为“SASL_SSL”时,配置信息如下。 security_protocol => "SASL_SSL" ssl_truststore_location => "C:\\Users\\Desktop\\logstash-8.8.1\\config\\client.jks" ssl_truststore_password => "dms@kafka" ssl_endpoint_identification_algorithm => "" #安全协议为“SASL_PLAINTEXT”时,配置信息如下。 security_protocol => "SASL_PLAINTEXT" } } output { stdout{codec=>rubydebug} }
参数说明如下:
- bootstrap_servers:前提条件中获取的Kafka实例“内网连接地址”/“公网连接地址”。
- group_id:消费组的名称。
- topics:前提条件中获取的Topic名称。
- auto_offset_reset:指定消费者的消费策略。latest表示偏移量自动被重置到最晚偏移量,earliest表示偏移量自动被重置到最早偏移量,none表示向消费者抛出异常。本文以earliest为例。
- sasl_mechanism:SASL认证机制。
- sasl_jaas_config:SASL jaas的配置文件,根据实际情况修改为前提条件中获取的SASL用户名和密码。
- security_protocol:Kafka实例的安全协议。
- ssl_truststore_location:SSL证书的存放位置。
- ssl_truststore_password:服务器证书密码,不可更改,需要保持为dms@kafka。
- ssl_endpoint_identification_algorithm:证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。
如果需要了解Kafka input Plugin的其他参数,请参见Kafka input Plugin。
- 在Logstash文件夹根目录打开Git Bash,执行以下命令启动Logstash。
./bin/logstash -f ./config/input.conf
Logstash启动成功后,Kafka input Plugin会自动从Kafka实例的topic-logstash中读取数据,如下图所示。
图8 Logstash从topic-logstash中读取的数据