文档首页/ 分布式消息服务Kafka版/ 最佳实践/ Logstash对接Kafka生产消费消息
更新时间:2024-10-14 GMT+08:00
分享

Logstash对接Kafka生产消费消息

方案概述

应用场景

Logstash是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到指定的存储中。Kafka是一种高吞吐量的分布式发布订阅消息系统,也是Logstash支持的众多输入输出源之一。本章节主要介绍Logstash如何对接Kafka实例。

方案架构

  • Kafka实例作为Logstash输入源的示意图如下。
    图1 Kafka实例作为Logstash输入源

    日志采集客户端将数据发送到Kafka实例中,Logstash根据自身性能从Kafka实例中拉取数据。Kafka实例作为Logstash输入源时,可以防止突发流量对于Logstash的影响,以及解耦日志采集客户端和Logstash,保证系统的稳定性。

  • Kafka实例作为Logstash输出源的示意图如下。
    图2 Kafka实例作为Logstash输出源

    Logstash从数据库采集数据,然后发送到Kafka实例中进行存储。Kafka实例作为Logstash输出源时,由于Kafka的高吞吐量,可以存储大量数据。

约束与限制

Logstash从7.5版本开始支持Kafka Integration Plugin插件,Kafka Integration Plugin插件包含Kafka input Plugin和Kafka output Plugin。Kafka input Plugin用于从Kafka实例的Topic中读取数据,Kafka output Plugin把数据写入到Kafka实例的Topic。Logstash、Kafka Integration Plugin与Kafka客户端的版本对应关系如表1所示。请确保Kafka客户端版本大于或等于Kafka实例的版本

表1 版本对应关系

Logstash版本

Kafka Integration Plugin版本

Kafka客户端版本

8.3~8.8

10.12.0

2.8.1

8.0~8.2

10.9.0~10.10.0

2.5.1

7.12~7.17

10.7.4~10.9.0

2.5.1

7.8~7.11

10.2.0~10.7.1

2.4

7.6~7.7

10.0.1

2.3.0

7.5

10.0.0

2.1.0

前提条件

执行实施步骤前,请确保已完成以下操作:

  • 下载Logstash
  • 准备一台Windows系统的主机,在主机中安装Java Development Kit 1.8.111或以上版本和Git Bash。
  • 创建Kafka实例Topic,并获取Kafka实例信息。

    Kafka实例未开启公网访问和SASL认证时,获取表2所示信息。

    表2 Kafka实例信息(未开启公网访问和SASL认证)

    参数名

    获取途径

    内网连接地址

    在Kafka实例详情页的“连接信息”区域,获取“内网连接地址”。

    Topic名称

    在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“Topic管理”,进入Topic列表页面,获取Topic名称。

    下文以topic-logstash为例介绍。

    Kafka实例未开启公网访问、已开启SASL认证时,获取表3所示信息。

    表3 Kafka实例信息(未开启公网访问、已开启SASL认证)

    参数名

    获取途径

    内网连接地址

    在Kafka实例详情页的“连接信息”区域,获取“内网连接地址”。

    开启的SASL认证机制

    在Kafka实例详情页的“连接信息”区域,获取“开启的SASL认证机制”。

    启用的安全协议

    在Kafka实例详情页的“连接信息”区域,获取“启用的安全协议”。

    证书

    在Kafka实例详情页的“连接信息”区域,在“SSL证书”所在行,单击“下载”。下载压缩包后解压,获取压缩包中的客户端证书文件:client.jks。

    SASL用户名和密码

    在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“用户管理”,进入用户列表页面,获取用户名。如果忘记了密码,单击“重置密码”,重新设置密码。

    Topic名称

    在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“Topic管理”,进入Topic列表页面,获取Topic名称。

    下文以topic-logstash为例介绍。

    Kafka实例已开启公网访问、未开启SASL认证时,获取表4所示信息。

    表4 Kafka实例信息(已开启公网访问、未开启SASL认证)

    参数名

    获取途径

    公网连接地址

    在Kafka实例详情页的“连接信息”区域,获取“公网连接地址”。

    Topic名称

    在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“Topic管理”,进入Topic列表页面,获取Topic名称。

    下文以topic-logstash为例介绍。

    Kafka实例已开启公网访问和SASL认证时,获取表5所示信息。

    表5 Kafka实例信息(已开启公网访问和SASL认证)

    参数名

    获取途径

    公网连接地址

    在实例详情页的“连接信息”区域,获取“公网连接地址”

    开启的SASL认证机制

    在Kafka实例详情页的“连接信息”区域,获取“开启的SASL认证机制”。

    启用的安全协议

    在Kafka实例详情页的“连接信息”区域,获取“启用的安全协议”。

    证书

    在Kafka实例详情页的“连接信息”区域,在“SSL证书”所在行,单击“下载”。下载压缩包后解压,获取压缩包中的客户端证书文件:client.jks。

    SASL用户名和密码

    在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“用户管理”,进入用户列表页面,获取用户名。如果忘记了密码,单击“重置密码”,重新设置密码。

    Topic名称

    在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“Topic管理”,进入Topic列表页面,获取Topic名称。

    下文以topic-logstash为例介绍。

实施步骤(Kafka实例作为Logstash输出源)

  1. 在Windows主机中,解压Logstash压缩包,进入“config”文件夹,创建“output.conf”配置文件。

    图3 创建“output.conf”配置文件

  2. 在“output.conf”配置文件中,增加如下内容,连接Kafka实例。

    input {
        stdin {}
    }
    output {
     kafka {
    	 bootstrap_servers => "ip1:port1,ip2:port2,ip3:port3"
    	 topic_id => "topic-logstash"
    	
    	 #如果不使用SASL认证,以下参数请注释掉。
    	 #SASL认证机制为“PLAIN”时,配置信息如下。
    	 sasl_mechanism => "PLAIN"
    	 sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';"
    	
    	 #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。
    	 sasl_mechanism => "SCRAM-SHA-512"
    	 sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';"
    		
    	 #安全协议为“SASL_SSL”时,配置信息如下。
    	 security_protocol => "SASL_SSL"
    	 ssl_truststore_location => "C:\\Users\\Desktop\\logstash-8.8.1\\config\\client.jks"
    	 ssl_truststore_password => "dms@kafka"
    	 ssl_endpoint_identification_algorithm => ""
    	
    	 #安全协议为“SASL_PLAINTEXT”时,配置信息如下。
    	 security_protocol => "SASL_PLAINTEXT"
    	 }
    }

    参数说明如下:

    • bootstrap_servers:前提条件中获取的Kafka实例“内网连接地址”/“公网连接地址”。
    • topic_id:前提条件中获取的Topic名称。
    • sasl_mechanism:SASL认证机制。
    • sasl_jaas_config:SASL jaas的配置文件,根据实际情况修改为前提条件中获取的SASL用户名和密码。
    • security_protocol:Kafka实例的安全协议。
    • ssl_truststore_location:SSL证书的存放位置。
    • ssl_truststore_password:服务器证书密码,不可更改需要保持为dms@kafka
    • ssl_endpoint_identification_algorithm:证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空

    如果需要了解Kafka output Plugin的其他参数,请参见Kafka output Plugin

  3. 在Logstash文件夹根目录打开Git Bash,执行以下命令启动Logstash。

    ./bin/logstash -f ./config/output.conf

    返回“Successfully started Logstash API endpoint”时,表示启动成功。

    图4 启动Logstash

  4. 在Logstash中,生产消息,如下图所示。

    图5 生产消息

  5. 切换到Kafka控制台,单击实例名称,进入实例详情页。
  6. 在左侧导航栏单击“消息查询”,进入消息查询页面。
  7. 在“Topic名称”中选择“topic-logstash”,单击“搜索”,查询消息。

    图6 查询消息

    图6可以看出,Logstash的Kafka output Plugin已经把数据写入到Kafka实例的topic-logstash中。

实施步骤(Kafka实例作为Logstash输入源)

  1. 在Windows主机中,解压Logstash压缩包,进入“config”文件夹,创建“input.conf”配置文件。

    图7 创建“input.conf”配置文件

  2. 在“input.conf”配置文件中,增加如下内容,连接Kafka实例。

    input {
     kafka {
    	 bootstrap_servers => "ip1:port1,ip2:port2,ip3:port3"
    	 group_id => "logstash_group"
    	 topic_id => "topic-logstash"
    	 auto_offset_reset => "earliest"
    	
    	 #如果不使用SASL认证,以下参数请注释掉。
    	 #SASL认证机制为“PLAIN”时,配置信息如下。
    	 sasl_mechanism => "PLAIN"
    	 sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';"
    	
    	 #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。
    	 sasl_mechanism => "SCRAM-SHA-512"
    	 sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';"
    		
    	 #安全协议为“SASL_SSL”时,配置信息如下。
    	 security_protocol => "SASL_SSL"
    	 ssl_truststore_location => "C:\\Users\\Desktop\\logstash-8.8.1\\config\\client.jks"
    	 ssl_truststore_password => "dms@kafka"
    	 ssl_endpoint_identification_algorithm => ""
    	
    	 #安全协议为“SASL_PLAINTEXT”时,配置信息如下。
    	 security_protocol => "SASL_PLAINTEXT"
    	 }
    }
    output {
     stdout{codec=>rubydebug}
    }

    参数说明如下:

    • bootstrap_servers:前提条件中获取的Kafka实例“内网连接地址”/“公网连接地址”。
    • group_id:消费组的名称。
    • topics:前提条件中获取的Topic名称。
    • auto_offset_reset:指定消费者的消费策略。latest表示偏移量自动被重置到最晚偏移量,earliest表示偏移量自动被重置到最早偏移量,none表示向消费者抛出异常。本文以earliest为例。
    • sasl_mechanism:SASL认证机制。
    • sasl_jaas_config:SASL jaas的配置文件,根据实际情况修改为前提条件中获取的SASL用户名和密码。
    • security_protocol:Kafka实例的安全协议。
    • ssl_truststore_location:SSL证书的存放位置。
    • ssl_truststore_password:服务器证书密码,不可更改,需要保持为dms@kafka
    • ssl_endpoint_identification_algorithm:证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空

    如果需要了解Kafka input Plugin的其他参数,请参见Kafka input Plugin

  3. 在Logstash文件夹根目录打开Git Bash,执行以下命令启动Logstash。

    ./bin/logstash -f ./config/input.conf

    Logstash启动成功后,Kafka input Plugin会自动从Kafka实例的topic-logstash中读取数据,如下图所示。

    图8 Logstash从topic-logstash中读取的数据

相关文档