文档首页/ 云搜索服务 CSS/ 最佳实践/ 使用Elasticsearch、自建Logstash和Kibana构建日志管理平台
更新时间:2024-09-14 GMT+08:00

使用Elasticsearch、自建Logstash和Kibana构建日志管理平台

使用CSS服务的Elasticsearch集群搭建的统一日志管理平台可以实时地、统一地、方便地管理日志,让日志驱动运维、运营等,提升服务管理效率。

应用场景

本文以Elasticsearch、Filebeat、Logstash和Kibana为例,搭建一个统一日志管理平台。使用Filebeat采集ECS中的日志,发送到Logstash进行数据处理,再存储到Elasticsearch中,最后通过Kibana进行日志的可视化查询与分析。该方案可以用于以下场景:
  • 日志管理:集中管理应用程序和系统日志,快速定位问题。
  • 安全监控:检测和响应安全威胁,进行入侵检测和异常行为分析。
  • 业务分析:分析用户行为,优化产品和服务。
  • 性能监控:监控系统和应用程序性能,实时发现瓶颈。

方案架构

ELKB(Elasticsearch、Logstash、Kibana、Beats)提供了一整套日志场景解决方案,是目前主流的一种日志系统。
  • Elasticsearch是一个开源分布式的搜索和分析引擎,用于存储、搜索和分析大量数据。
  • Logstash是一个服务器端的数据管道,负责收集、解析和丰富数据后,将其发送到Elasticsearch。
  • Kibana为Elasticsearch提供一个开源的数据分析和可视化平台,用于对Elasticsearch中的数据进行搜索、查看和交互。
  • Beats:轻量级的数据收集器(如Filebeat、Metricbeat等),安装在服务器上,负责收集和转发数据到Logstash。

使用Elasticsearch和Logstash构建日志管理平台的方案架构如图1所示。

图1 ELKB架构
  1. 数据收集
    • Beats作为数据收集器,负责从各种源收集数据并发送到Logstash。
    • Logstash可以独立收集数据,或从Beats接收数据,对数据进行过滤、转换和增强。
  2. 数据处理

    Logstash在将数据发送到Elasticsearch之前,对数据进行必要的处理,如解析结构化日志、过滤无用信息等。

  3. 数据存储

    Elasticsearch作为核心存储组件,Elasticsearch索引和存储来自Logstash的数据,提供快速搜索和数据检索功能。

  4. 数据分析与可视化

    使用Kibana对Elasticsearch中的数据进行分析和可视化,创建仪表板和报告,以直观展示数据。

ELKB系统中各组件的版本兼容性请参见https://www.elastic.co/support/matrix#matrix_compatibility

方案优势

  • 实时性:提供实时数据收集和分析能力。
  • 灵活性:支持各种数据源和灵活的数据处理流程。
  • 易用性:用户界面友好,简化了数据操作和可视化过程。
  • 扩展性:水平扩展能力强,可以处理PB级别的数据。

前提条件

操作步骤

  1. 登录ECS,部署并配置Filebeat。

    1. 下载Filebeat,版本建议选择7.6.2。下载地址:https://www.elastic.co/downloads/past-releases#filebeat-oss
    2. 配置Filebeat的配置文件“filebeat.yml”
      例如,采集“/root/”目录下以“log”结尾的所有文件,配置文件“filebeat.yml”中的内容如下:
      filebeat.inputs:
      - type: log
        enabled: true
        # 采集的日志文件路径。
        paths:
          - /root/*.log
      
      filebeat.config.modules:
        path: ${path.config}/modules.d/*.yml
        reload.enabled: false
      # Logstash的hosts信息
      output.logstash:
        hosts: ["192.168.0.126:5044"]
      
      processors:

  2. 部署并配置自建Logstash。

    为了获得更优的性能,自建Logstash中的JVM参数建议配置为“ECS/docker”内存的一半。

    1. 下载Logstash,版本建议选择7.6.2。下载地址:https://www.elastic.co/downloads/past-releases#logstash-oss
    2. 确保Logstash与CSS集群的网络互通,在虚拟机上执行命令curl http:// {ip}:{port}测试虚拟机和ELasticsearch集群的连通性,返回200,表示已经连通。
    3. 配置Logstash的配置文件“logstash-sample.conf”

      配置文件“logstash-sample.conf”中的内容如下:

      input {
        beats {
          port => 5044
        }
      }
      # 对数据的切割与截取信息,
      filter {
          grok {
              match => {
                  "message" => '\[%{GREEDYDATA:timemaybe}\] \[%{WORD:level}\] %{GREEDYDATA:content}'
              }
          }
          mutate {
            remove_field => ["@version","tags","source","input","prospector","beat"]
          }
      }
      # CSS集群的信息
      output {
        elasticsearch {
          hosts => ["http://192.168.0.4:9200"]
          index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
          #user => "xxx"
          #password => "xxx"
        }
      }

      Logstash的“filter”进行模式配置时,可以借助Grok Debugger(https://grokdebugger.com/)。

  3. 配置Elasticsearch集群的索引模板。

    1. 登录云搜索服务管理控制台。
    2. 在左侧导航栏中,选择“集群管理 > Elasticsearch”
    3. 在集群列表,选择待操作的集群,单击操作列“Kibana”,登录Kibana。
    4. 单击左侧导航栏的“Dev Tools”进入操作页面。
    5. 创建一个索引模板令。

      例如,创建一个索引模板,配置索引默认采用3分片、0副本,索引中定义了“@timestamp”“content”“host.name”“level”“log.file.path”“message”“timemaybe”等字段。

      PUT _template/filebeat
      {
        "index_patterns": ["filebeat*"],
        "settings": {
          # 定义分片数。
          "number_of_shards": 3,
          # 定义副本数。
          "number_of_replicas": 0,
          "refresh_interval": "5s"
        },
        # 定义字段。
        "mappings": {
              "properties": {
                "@timestamp": {
                  "type": "date"
                },
                "content": {
                  "type": "text"
                },
                "host": {
                  "properties": {
                    "name": {
                      "type": "text"
                    }
                  }
                },
                "level": {
                  "type": "keyword"
                },
                "log": {
                  "properties": {
                    "file": {
                      "properties": {
                        "path": {
                          "type": "text"
                        }
                      }
                    }
                  }
                },
                "message": {
                  "type": "text"
                },
                "timemaybe": {
                  "type": "date",
                  "format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis||EEE MMM dd HH:mm:ss zzz yyyy"
                }
              }
          }
      }

  4. 在ECS上准备测试数据。

    执行如下命令,生成测试数据,并将数据写到“/root/tmp.log”中:

    bash -c 'while true; do echo [$(date)] [info] this is the test message; sleep 1; done;' >> /root/tmp.log &

    生成的测试数据样例如下:

    [Thu Feb 13 14:01:16 CST 2020] [info] this is the test message

  5. 执行如下命令,启动Logstash。

    nohup ./bin/logstash -f /opt/pht/logstash-6.8.6/logstash-sample.conf &

  6. 执行如下命令,启动Filebeat。

    ./filebeat

  7. 通过Kibana进行查询并制作报表。

    1. 进入Elasticsearch集群的Kibana操作界面。
    2. 单击左侧导航栏的“Discover”进行查询与分析,类似的效果如图2所示。
      图2 Discover界面示例