使用Logstash将Kafka数据接入Elasticsearch
本文介绍如何使用云搜索服务CSS的Logstash将Kafka中的数据接入到CSS的Elasticsearch中。Logstash作为ELK(Elasticsearch、Logstash、Kibana)日志方案中的关键组件,支持多种数据源与ETL(Extract、Transform、Load)处理方式,能够有效提升日志管理和服务效率。
应用场景
使用CSS Logstash将Kafka数据接入CSS Elasticsearch适用于多种业务场景。
- 实时日志管理:需要实时采集、处理和存储日志数据。
- 数据预处理:对原始日志数据进行清洗、转换和丰富。
- 统一日志管理:搭建统一的日志管理平台,实现日志的统一存储、分析和可视化。
方案架构

- Kafka作为消息中间件,存储待处理的日志数据。
- Logstash消费Kafka中的数据,进行ETL处理后写入Elasticsearch。
- Elasticsearch存储处理后的日志数据,支持高效查询和分析。
- Kibana提供数据可视化界面,便于用户进行数据分析和报表制作。
方案优势
- 高效处理:Logstash支持高吞吐量和低延迟的数据处理。
- 灵活配置:支持多种数据源和目标,配置灵活。
- 可扩展性:支持水平扩展,适用于大规模数据处理场景。
- 实时性:能够实时消费Kafka数据并写入Elasticsearch。
约束限制
Logstash与Kafka、Elasticsearch之间需要保证网络连通。
前提条件
- 在CSS控制台已创建Elasticsearch集群,用于存储、分析数据。集群状态为“运行中”。获取并记录了集群的访问地址,如果是安全集群,还要记录访问集群的账号和密码。
如需新建集群请参见创建Elasticsearch集群。
- 在CSS控制台已创建Logstash集群,用于数据处理。
如需新建集群请参见创建Logstash集群。
- 数据已成功接入Kafka的指定Topic中。获取并记录了Kafka的IP地址和端口号。
如果使用的是分布式消息服务Kafka版则可以参考创建Kafka Topic。
步骤一:在Elasticsearch上创建索引模板
索引模板用于定义Elasticsearch中索引的结构和设置,包括分片数、副本数、字段类型等,确保数据在Elasticsearch中的存储结构一致。
- 登录目标集群的Kibana。
- 登录云搜索服务管理控制台。
- 在左侧导航栏,选择“集群管理 > Elasticsearch”。
- 在集群列表,选择目标集群,单击操作列的“Kibana”,登录Kibana。
- 在Kibana左侧导航栏选择“Dev Tools”,进入操作页面。
- 执行命令创建索引模板。
例如,创建一个索引模板,配置索引默认采用3分片、0副本,索引中定义了@timestamp字段。
PUT _template/filebeat { "index_patterns": ["*topic*"], "settings": { # 定义分片数。 "number_of_shards": 3, # 定义副本数。 "number_of_replicas": 0, "refresh_interval": "5s" }, # 定义字段。 "mappings": { "properties": { "@timestamp": { "type": "date" } } } }
步骤二:测试Logstash与Kafka的网络连通性
测试Logstash与Kafka之间的网络连通性,确保数据能够从Kafka正确传输到Logstash,进而写入Elasticsearch。
- 进入配置中心页面。
- 登录云搜索服务管理控制台。
- 在左侧导航栏,选择“集群管理 > Logstash”。
- 在集群列表,单击目标集群名称,进入集群详情页。
- 选择“配置中心”页签。
- 在配置中心页面,单击“连通性测试”。
- 在连通性测试弹窗中,输入kafka的IP地址和端口号,单击“测试”。
图2 连通性测试
当显示“可用”时,表示集群间网络连通。
步骤三:创建Logstash配置文件
通过创建Logstash配置文件,定义数据的输入、处理和输出规则,确保数据能够从Kafka正确消费、处理后写入Elasticsearch。
- 在Logstash配置中心页面,单击右上角“创建”,编辑配置文件。
- 选择集群模板:展开系统模板,选择“kafka”,单击操作列的“应用”。
- 设置配置文件名称:在“名称”处自定义配置文件名称,例如“kafka-es”。
- 修改配置文件内容:在“配置文件内容”处根据注释修改配置方案,以下仅为配置示例,需要基于业务情况修改。
- 对于多个topic配置到一个pipeline中建议设置多个input解决,不建议配置到一个input中。
- 对于不同的kafka topic需要配置通过的group_id、client_id、type确保消费组隔离。
- consumer_threads消费线程参数配置建议基于topic中partition配置,要求Logstash节点数乘以consumer_threads大于等于partition。
- 由于时区不同,Logstash机器是UTC时间,如果需要在0点实现索引轮转,需要增加ruby脚本+8小时。
input { kafka { # input以json格式读取数据 codec => "json" # topic名称 topics => ["topic-nginx"] # kafka的访问地址 bootstrap_servers => "192.168.0.1:9092,192.168.0.2:9092" # kafka心跳检查参数,保持默认 max_poll_interval_ms => "3000000" session_timeout_ms => "90000" heartbeat_interval_ms => "30000" # logstash节点consumer消费线程,基于topic中patition配置 consumer_threads => 5 max_poll_records => "3000" auto_offset_reset => "latest" # topic消费组和类型配置 group_id => "topic-nginx-elk-hw" client_id => "topic-nginx-elk-hw" type => "topic-nginx" } } input { kafka { # input以文本格式读取数据 codec => "plain" # topic名称 topics => ["topic-gateway"] # kafka连接地址 bootstrap_servers => "192.168.0.1:9092,192.168.0.2:9092" # kafka心跳检查参数,保持默认 max_poll_interval_ms => "3000000" session_timeout_ms => "90000" heartbeat_interval_ms => "30000" # logstash节点consumer消费线程,基于topic中partition配置 consumer_threads => 5 max_poll_records => "3000" auto_offset_reset => "latest" # topic消费组和类型配置 group_id => "topic-gateway-elk-hw" client_id => "topic-gateway-elk-hw" type => "topic-gateway" } } # 对数据的切割与截取信息 filter { mutate { remove_field => ["@version","tags","source","input","prospector","beat"] } # 修正索引的的创建时间,使用索引按照北京时间滚动。 ruby { code => " event.set('[@metadata][localdate]', (event.get('@timestamp').time.localtime + 8 * 60 * 60).strftime('%Y.%m.%d'))" } } # CSS集群的信息 output { elasticsearch { hosts => ["http://192.168.0.4:9200"] index => "%{type}-%{[@metadata][localdate]}" #user => "xxx" #password => "xxx" } }
- “隐藏内容列表”:输入需要隐藏的敏感字串列表,按Enter创建。配置隐藏字符串列表后,在返回的配置内容中,会将所有在列表中的字串隐藏为***(列表最大支持20条,单个字串最大长度512字节)。
- 配置文件编辑完成后,单击“下一页”配置Logstash配置文件的运行参数。
- “pipeline.worker”建议和Logtash节点的CPU核数保持一致。
- 当同一个pipeline中topic个数较少时,建议“pipeline.batch.size”配置大于1000;当同一个pipeline中有多个topic,并且topic数据量不同,建议“pipeline.batch.size”配置小于1000,避免一个topic阻塞整个pipeline。
- “pipeline.batch.delay”可以保持默认值。
- “queue type”选择“memory”。
- 配置完成后,单击“创建”。
步骤四:启动Logstash的配置文件
启动Logstash的配置文件,Logstash连接到Kafka,从Kafka的主题中拉取数据,处理后写入Elasticsearch。
- 在Logstash配置中心页面,勾选新建的配置文件,单击“启动”。
- 在“启动Logstash服务”对话框中,勾选“是否保持常驻”,开启保持常驻,确保Logstash配置文件持续运行,不会因为服务重启、Logstash消费异常或其他原因导致数据处理中断。
保持常驻会在每个节点上面配置一个守护进程,当logstash服务出现故障的时候,会主动拉起并修复,从而保证日志管理系统的高效和稳定运行。
- 单击“确定”,启动配置文件。
在管道列表中可以查看启动的配置文件状态,监控数据迁移情况,确保数据正常消费和写入。
步骤五:在Elasticsearch中创建索引模式
索引模式用于定义如何在Kibana中查询和分析接入Elasticsearch的数据。本文在创建索引模式时指定了时间字段为@timestamp,确保了在Kibana中能够按时间范围查询和分析数据。
- 登录云搜索服务管理控制台。
- 在左侧导航栏,选择“集群管理 > Elasticsearch”。
- 在集群列表,选择目标集群,单击操作列的“Kibana”,登录Kibana。
- 在Kibana左侧导航栏选择“Stack Management”。
- 选择“Index patterns”,单击“Create index pattern”。
- 在创建索引模式页面,配置“Index pattern name”,例如“topic-gateway*”。单击“Next step”配置时间字段,选择步骤一:在Elasticsearch上创建索引模板创建的索引模板“@timestamp”。
图3 index pattern配置界面示例
- 单击“Create index pattern”完成创建,返回索引模式列表可以看到新建的索引模式。
步骤六:查询和分析数据
使用Kibana的可视化功能将接入Elasticsearch的数据制作成报表,便于数据分析和展示。