- 最新动态
- 功能总览
- 服务公告
- 产品介绍
- 计费说明
- 快速入门
- 用户指南
- 最佳实践
- 开发指南
- API参考
- SDK参考
-
常见问题
-
实例问题
- 为什么可用区不能选择2个?
- 创建实例时为什么无法查看子网和安全组等信息?
- 如何选择Kafka实例的存储空间?
- Kafka实例的超高IO和高IO如何选择?
- 如何选择Kafka实例存储容量阈值策略?
- Kafka服务端支持版本是多少?
- Kafka实例的ZK地址是什么?
- 创建的Kafka实例是集群模式么?
- Kafka实例是否支持修改访问端口?
- Kafka实例的SSL证书有效期多长?
- 如何将Kafka实例中的数据同步到另一个Kafka实例中?
- Kafka实例的SASL_SSL开关如何修改?
- SASL认证机制如何修改?
- 如何修改安全协议?
- 修改企业项目,是否会导致Kafka重启?
- 100MB/s的带宽怎样开启公网访问?
- Kafka服务和ZK是部署在相同的虚拟机中,还是分开部署?
- Kafka包周期实例支持删除吗?
- Kafka支持哪些加密套件?
- 购买实例时选择的单AZ,怎样可以扩展为多AZ?
- Kafka是否支持跨AZ容灾?已经购买的实例在哪里查看是否为跨AZ?
- Kafka支持磁盘加密吗?
- Kafka实例创建后,能修改VPC和子网吗?
- 有没有Kafka Stream的案例?
- Kafka实例版本可以升级吗?
- 怎样重新绑定公网IP?
- 实例规格变更问题
-
连接问题
- 选择和配置安全组
- Kafka实例是否支持公网访问?
- Kafka实例的连接地址默认有多少个?
- 是否支持跨Region访问?
- Kafka实例是否支持跨VPC访问?
- Kafka实例是否支持不同的子网?
- Kafka是否支持Kerberos认证,如何开启认证?
- Kafka实例是否支持无密码访问?
- 开启公网访问后,在哪查看公网IP地址?
- Kafka支持服务端认证客户端吗?
- 连接开启SASL_SSL的Kafka实例时,ssl truststore文件可以用PEM格式的吗?
- 下载的证书JKS和CRT有什么区别?
- Kafka支持哪个版本的TLS?
- Kafka实例连接数有限制吗?
- 客户端单IP连接的个数为多少?
- Kafka实例的内网连接地址可以修改吗?
- 不同实例中,使用的SSL证书是否一样?
- 为什么不建议使用Sarama客户端收发消息?
- Topic和分区问题
- 消费组问题
- 消息问题
-
Kafka Manager问题
- 登录Kafka Manager的账号是否可以设置为只读账号?
- 登录到Kafka Manager页面,为什么获取不到节点信息?
- Yikes! Insufficient partition balance when creating topic : projectman_project_enterprise_project Try again.
- Kafka Manager能否查询到消息的正文?
- Kafka Manager WebUI的端口能否修改?
- 在Kafka Manager上支持修改Topic的哪些属性?
- Kafka Manager和云监控显示的信息不一致
- Kafka Manager如何修改Topic的分区Leader?
- 实例版本在控制台和Kafka Manager上显示不一致?
- 为什么实例中存在默认名为__trace和__consumer_offsets的Topic?
- 客户端删除消费组后,在Kafka Manager中仍可以看到此消费组?
- 监控告警问题
- Kafka体验版使用说明
-
实例问题
- 故障排除
- 视频帮助
- 文档下载
- 通用参考
链接复制成功!
Logstash对接Kafka生产消费消息
方案概述
应用场景
Logstash是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到指定的存储中。Kafka是一种高吞吐量的分布式发布订阅消息系统,也是Logstash支持的众多输入输出源之一。本章节主要介绍Logstash如何对接Kafka实例。
方案架构
- Kafka实例作为Logstash输出源的示意图如下。
图1 Kafka实例作为Logstash输出源
Logstash从数据库采集数据,然后发送到Kafka实例中进行存储。Kafka实例作为Logstash输出源时,由于Kafka的高吞吐量,可以存储大量数据。
- Kafka实例作为Logstash输入源的示意图如下。
图2 Kafka实例作为Logstash输入源
日志采集客户端将数据发送到Kafka实例中,Logstash根据自身性能从Kafka实例中拉取数据。Kafka实例作为Logstash输入源时,可以防止突发流量对于Logstash的影响,以及解耦日志采集客户端和Logstash,保证系统的稳定性。
约束与限制
Logstash从7.5版本开始支持Kafka Integration Plugin插件,Kafka Integration Plugin插件包含Kafka input Plugin和Kafka output Plugin。Kafka input Plugin用于从Kafka实例的Topic中读取数据,Kafka output Plugin把数据写入到Kafka实例的Topic。Logstash、Kafka Integration Plugin与Kafka客户端的版本对应关系如表1所示。请确保Kafka客户端版本大于或等于Kafka实例的版本。
前提条件
执行实施步骤前,请确保已完成以下操作:
- 下载Logstash。
- 准备一台Windows系统的主机,在主机中安装Java Development Kit 1.8.111或以上版本和Git Bash。
- 创建Kafka实例和Topic,并获取Kafka实例信息。
Kafka实例未开启公网访问和SASL认证时,获取表2所示信息。
表2 Kafka实例信息(未开启公网访问和SASL认证) 参数名
获取途径
内网连接地址
在Kafka实例详情页的“连接信息”区域,获取“内网连接地址”。
Topic名称
在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“Topic管理”,进入Topic列表页面,获取Topic名称。
下文以topic-logstash为例介绍。
Kafka实例未开启公网访问、已开启SASL认证时,获取表3所示信息。
表3 Kafka实例信息(未开启公网访问、已开启SASL认证) 参数名
获取途径
内网连接地址
在Kafka实例详情页的“连接信息”区域,获取“内网连接地址”。
开启的SASL认证机制
在Kafka实例详情页的“连接信息”区域,获取“开启的SASL认证机制”。
启用的安全协议
在Kafka实例详情页的“连接信息”区域,获取“启用的安全协议”。
证书
在Kafka实例详情页的“连接信息”区域,在“SSL证书”所在行,单击“下载”。下载压缩包后解压,获取压缩包中的客户端证书文件:client.jks。
SASL用户名和密码
在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“用户管理”,进入用户列表页面,获取用户名。如果忘记了密码,单击“重置密码”,重新设置密码。
Topic名称
在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“Topic管理”,进入Topic列表页面,获取Topic名称。
下文以topic-logstash为例介绍。
Kafka实例已开启公网访问、未开启SASL认证时,获取表4所示信息。
表4 Kafka实例信息(已开启公网访问、未开启SASL认证) 参数名
获取途径
公网连接地址
在Kafka实例详情页的“连接信息”区域,获取“公网连接地址”。
Topic名称
在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“Topic管理”,进入Topic列表页面,获取Topic名称。
下文以topic-logstash为例介绍。
Kafka实例已开启公网访问和SASL认证时,获取表5所示信息。
表5 Kafka实例信息(已开启公网访问和SASL认证) 参数名
获取途径
公网连接地址
在实例详情页的“连接信息”区域,获取“公网连接地址”
开启的SASL认证机制
在Kafka实例详情页的“连接信息”区域,获取“开启的SASL认证机制”。
启用的安全协议
在Kafka实例详情页的“连接信息”区域,获取“启用的安全协议”。
证书
在Kafka实例详情页的“连接信息”区域,在“SSL证书”所在行,单击“下载”。下载压缩包后解压,获取压缩包中的客户端证书文件:client.jks。
SASL用户名和密码
在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“用户管理”,进入用户列表页面,获取用户名。如果忘记了密码,单击“重置密码”,重新设置密码。
Topic名称
在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“Topic管理”,进入Topic列表页面,获取Topic名称。
下文以topic-logstash为例介绍。
实施步骤(Kafka实例作为Logstash输出源)
- 在Windows主机中,解压Logstash压缩包,进入“config”文件夹,创建“output.conf”配置文件。
图3 创建“output.conf”配置文件
- 在“output.conf”配置文件中,增加如下内容,连接Kafka实例。
input { stdin {} } output { kafka { bootstrap_servers => "ip1:port1,ip2:port2,ip3:port3" topic_id => "topic-logstash" #如果不使用SASL认证,以下参数请注释掉。 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl_mechanism => "PLAIN" sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';" #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 sasl_mechanism => "SCRAM-SHA-512" sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';" #安全协议为“SASL_SSL”时,配置信息如下。 security_protocol => "SASL_SSL" ssl_truststore_location => "C:\\Users\\Desktop\\logstash-8.8.1\\config\\client.jks" ssl_truststore_password => "dms@kafka" ssl_endpoint_identification_algorithm => "" #安全协议为“SASL_PLAINTEXT”时,配置信息如下。 security_protocol => "SASL_PLAINTEXT" } }
参数说明如下:
- bootstrap_servers:前提条件中获取的Kafka实例“内网连接地址”/“公网连接地址”。
- topic_id:前提条件中获取的Topic名称。
- sasl_mechanism:SASL认证机制。
- sasl_jaas_config:SASL jaas的配置文件,根据实际情况修改为前提条件中获取的SASL用户名和密码。
- security_protocol:Kafka实例的安全协议。
- ssl_truststore_location:SSL证书的存放位置。
- ssl_truststore_password:服务器证书密码,不可更改,需要保持为dms@kafka。
- ssl_endpoint_identification_algorithm:证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。
如果需要了解Kafka output Plugin的其他参数,请参见Kafka output Plugin。
- 在Logstash文件夹根目录打开Git Bash,执行以下命令启动Logstash。
./bin/logstash -f ./config/output.conf
返回“Successfully started Logstash API endpoint”时,表示启动成功。
图4 启动Logstash - 在Logstash中,生产消息,如下图所示。
图5 生产消息
- 切换到Kafka控制台,单击实例名称,进入实例详情页。
- 在左侧导航栏单击“消息查询”,进入消息查询页面。
- 在“Topic名称”中选择“topic-logstash”,单击“搜索”,查询消息。
从图6可以看出,Logstash的Kafka output Plugin已经把数据写入到Kafka实例的topic-logstash中。
实施步骤(Kafka实例作为Logstash输入源)
- 在Windows主机中,解压Logstash压缩包,进入“config”文件夹,创建“input.conf”配置文件。
图7 创建“input.conf”配置文件
- 在“input.conf”配置文件中,增加如下内容,连接Kafka实例。
input { kafka { bootstrap_servers => "ip1:port1,ip2:port2,ip3:port3" group_id => "logstash_group" topic_id => "topic-logstash" auto_offset_reset => "earliest" #如果不使用SASL认证,以下参数请注释掉。 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl_mechanism => "PLAIN" sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';" #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 sasl_mechanism => "SCRAM-SHA-512" sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';" #安全协议为“SASL_SSL”时,配置信息如下。 security_protocol => "SASL_SSL" ssl_truststore_location => "C:\\Users\\Desktop\\logstash-8.8.1\\config\\client.jks" ssl_truststore_password => "dms@kafka" ssl_endpoint_identification_algorithm => "" #安全协议为“SASL_PLAINTEXT”时,配置信息如下。 security_protocol => "SASL_PLAINTEXT" } } output { stdout{codec=>rubydebug} }
参数说明如下:
- bootstrap_servers:前提条件中获取的Kafka实例“内网连接地址”/“公网连接地址”。
- group_id:消费组的名称。
- topic_id:前提条件中获取的Topic名称。
- auto_offset_reset:指定消费者的消费策略。latest表示偏移量自动被重置到最晚偏移量,earliest表示偏移量自动被重置到最早偏移量,none表示向消费者抛出异常。本文以earliest为例。
- sasl_mechanism:SASL认证机制。
- sasl_jaas_config:SASL jaas的配置文件,根据实际情况修改为前提条件中获取的SASL用户名和密码。
- security_protocol:Kafka实例的安全协议。
- ssl_truststore_location:SSL证书的存放位置。
- ssl_truststore_password:服务器证书密码,不可更改,需要保持为dms@kafka。
- ssl_endpoint_identification_algorithm:证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。
如果需要了解Kafka input Plugin的其他参数,请参见Kafka input Plugin。
- 在Logstash文件夹根目录打开Git Bash,执行以下命令启动Logstash。
./bin/logstash -f ./config/input.conf
Logstash启动成功后,Kafka input Plugin会自动从Kafka实例的topic-logstash中读取数据,如下图所示。
图8 Logstash从topic-logstash中读取的数据