文档首页/ 云搜索服务 CSS/ 最佳实践/ 使用Logstash同步数据至Elasticsearch/ 使用Logstash将Kafka数据接入Elasticsearch
更新时间:2025-09-05 GMT+08:00

使用Logstash将Kafka数据接入Elasticsearch

本文介绍如何使用云搜索服务CSS的Logstash将Kafka中的数据接入到CSS的Elasticsearch中。Logstash作为ELK(Elasticsearch、Logstash、Kibana)日志方案中的关键组件,支持多种数据源与ETL(Extract、Transform、Load)处理方式,能够有效提升日志管理和服务效率。

应用场景

使用CSS Logstash将Kafka数据接入CSS Elasticsearch适用于多种业务场景。

  • 实时日志管理:需要实时采集、处理和存储日志数据。
  • 数据预处理:对原始日志数据进行清洗、转换和丰富。
  • 统一日志管理:搭建统一的日志管理平台,实现日志的统一存储、分析和可视化。

方案架构

图1 架构图
  1. Kafka作为消息中间件,存储待处理的日志数据。
  2. Logstash消费Kafka中的数据,进行ETL处理后写入Elasticsearch。
  3. Elasticsearch存储处理后的日志数据,支持高效查询和分析。
  4. Kibana提供数据可视化界面,便于用户进行数据分析和报表制作。

方案优势

  • 高效处理:Logstash支持高吞吐量和低延迟的数据处理。
  • 灵活配置:支持多种数据源和目标,配置灵活。
  • 可扩展性:支持水平扩展,适用于大规模数据处理场景。
  • 实时性:能够实时消费Kafka数据并写入Elasticsearch。

约束限制

Logstash与Kafka、Elasticsearch之间需要保证网络连通。

前提条件

  • 在CSS控制台已创建Elasticsearch集群,用于存储、分析数据。集群状态为“运行中”。获取并记录了集群的访问地址,如果是安全集群,还要记录访问集群的账号和密码。

    如需新建集群请参见创建Elasticsearch集群

  • 在CSS控制台已创建Logstash集群,用于数据处理。

    如需新建集群请参见创建Logstash集群

  • 数据已成功接入Kafka的指定Topic中。获取并记录了Kafka的IP地址和端口号。

    如果使用的是分布式消息服务Kafka版则可以参考创建Kafka Topic

步骤一:在Elasticsearch上创建索引模板

索引模板用于定义Elasticsearch中索引的结构和设置,包括分片数、副本数、字段类型等,确保数据在Elasticsearch中的存储结构一致。

  1. 登录目标集群的Kibana。
    1. 登录云搜索服务管理控制台
    2. 在左侧导航栏,选择“集群管理 > Elasticsearch”
    3. 在集群列表,选择目标集群,单击操作列的“Kibana”,登录Kibana。
    4. 在Kibana左侧导航栏选择“Dev Tools”,进入操作页面。
  2. 执行命令创建索引模板。

    例如,创建一个索引模板,配置索引默认采用3分片、0副本,索引中定义了@timestamp字段。

    PUT _template/filebeat
    {
      "index_patterns": ["*topic*"],
      "settings": {
        # 定义分片数。
        "number_of_shards": 3,
        # 定义副本数。
        "number_of_replicas": 0,
        "refresh_interval": "5s"
      },
      # 定义字段。
      "mappings": {
            "properties": {
              "@timestamp": {
                "type": "date"
              }
            }
        }
    }

步骤二:测试Logstash与Kafka的网络连通性

测试Logstash与Kafka之间的网络连通性,确保数据能够从Kafka正确传输到Logstash,进而写入Elasticsearch。

  1. 进入配置中心页面。
    1. 登录云搜索服务管理控制台
    2. 在左侧导航栏,选择“集群管理 > Logstash”
    3. 在集群列表,单击目标集群名称,进入集群详情页。
    4. 选择“配置中心”页签。
  2. 在配置中心页面,单击“连通性测试”
  3. 在连通性测试弹窗中,输入kafka的IP地址和端口号,单击“测试”
    图2 连通性测试

    当显示“可用”时,表示集群间网络连通。

步骤三:创建Logstash配置文件

通过创建Logstash配置文件,定义数据的输入、处理和输出规则,确保数据能够从Kafka正确消费、处理后写入Elasticsearch。

  1. 在Logstash配置中心页面,单击右上角“创建”,编辑配置文件。
    1. 选择集群模板:展开系统模板,选择“kafka”,单击操作列的“应用”
    2. 设置配置文件名称:在“名称”处自定义配置文件名称,例如“kafka-es”
    3. 修改配置文件内容:在“配置文件内容”处根据注释修改配置方案,以下仅为配置示例,需要基于业务情况修改。
      • 对于多个topic配置到一个pipeline中建议设置多个input解决,不建议配置到一个input中。
      • 对于不同的kafka topic需要配置通过的group_id、client_id、type确保消费组隔离。
      • consumer_threads消费线程参数配置建议基于topic中partition配置,要求Logstash节点数乘以consumer_threads大于等于partition。
      • 由于时区不同,Logstash机器是UTC时间,如果需要在0点实现索引轮转,需要增加ruby脚本+8小时。
      input {
         kafka  {
             # input以json格式读取数据
             codec => "json"
             # topic名称
             topics => ["topic-nginx"]
             # kafka的访问地址
             bootstrap_servers => "192.168.0.1:9092,192.168.0.2:9092"
             # kafka心跳检查参数,保持默认
             max_poll_interval_ms => "3000000"
             session_timeout_ms => "90000"
             heartbeat_interval_ms => "30000"
             # logstash节点consumer消费线程,基于topic中patition配置
             consumer_threads => 5
             max_poll_records => "3000"
             auto_offset_reset => "latest"
             # topic消费组和类型配置
             group_id => "topic-nginx-elk-hw"
             client_id => "topic-nginx-elk-hw"
             type => "topic-nginx"
           }
      }
      input {
         kafka  {
             # input以文本格式读取数据
             codec => "plain"
             # topic名称
             topics => ["topic-gateway"]
             # kafka连接地址
             bootstrap_servers => "192.168.0.1:9092,192.168.0.2:9092"
             # kafka心跳检查参数,保持默认
             max_poll_interval_ms => "3000000"
             session_timeout_ms => "90000"
             heartbeat_interval_ms => "30000"
             # logstash节点consumer消费线程,基于topic中partition配置
             consumer_threads => 5
             max_poll_records => "3000"
             auto_offset_reset => "latest"
             # topic消费组和类型配置
             group_id => "topic-gateway-elk-hw"
             client_id => "topic-gateway-elk-hw"
             type => "topic-gateway"
           }
      }
      
      # 对数据的切割与截取信息
      filter {
          mutate {
            remove_field => ["@version","tags","source","input","prospector","beat"]
          }
        # 修正索引的的创建时间,使用索引按照北京时间滚动。
        ruby {
          code => " event.set('[@metadata][localdate]', (event.get('@timestamp').time.localtime + 8 * 60 * 60).strftime('%Y.%m.%d'))"
        }
      }
      
      # CSS集群的信息
      output {
        elasticsearch {
          hosts => ["http://192.168.0.4:9200"]
          index  => "%{type}-%{[@metadata][localdate]}"
          #user => "xxx"
          #password => "xxx"
        }
      }
    4. “隐藏内容列表”:输入需要隐藏的敏感字串列表,按Enter创建。配置隐藏字符串列表后,在返回的配置内容中,会将所有在列表中的字串隐藏为***(列表最大支持20条,单个字串最大长度512字节)。
  2. 配置文件编辑完成后,单击“下一页”配置Logstash配置文件的运行参数。
    • “pipeline.worker”建议和Logtash节点的CPU核数保持一致。
    • 当同一个pipeline中topic个数较少时,建议“pipeline.batch.size”配置大于1000;当同一个pipeline中有多个topic,并且topic数据量不同,建议“pipeline.batch.size”配置小于1000,避免一个topic阻塞整个pipeline。
    • “pipeline.batch.delay”可以保持默认值。
    • “queue type”选择“memory”
  3. 配置完成后,单击“创建”。

    在配置中心页面可以看到创建的配置文件,状态为“可用”,表示创建成功。

步骤四:启动Logstash的配置文件

启动Logstash的配置文件,Logstash连接到Kafka,从Kafka的主题中拉取数据,处理后写入Elasticsearch。

  1. 在Logstash配置中心页面,勾选新建的配置文件,单击“启动”
  2. “启动Logstash服务”对话框中,勾选“是否保持常驻”,开启保持常驻,确保Logstash配置文件持续运行,不会因为服务重启、Logstash消费异常或其他原因导致数据处理中断。

    保持常驻会在每个节点上面配置一个守护进程,当logstash服务出现故障的时候,会主动拉起并修复,从而保证日志管理系统的高效和稳定运行。

  3. 单击“确定”,启动配置文件。

    在管道列表中可以查看启动的配置文件状态,监控数据迁移情况,确保数据正常消费和写入。

步骤五:在Elasticsearch中创建索引模式

索引模式用于定义如何在Kibana中查询和分析接入Elasticsearch的数据。本文在创建索引模式时指定了时间字段为@timestamp,确保了在Kibana中能够按时间范围查询和分析数据。

  1. 登录云搜索服务管理控制台
  2. 在左侧导航栏,选择“集群管理 > Elasticsearch”
  3. 在集群列表,选择目标集群,单击操作列的“Kibana”,登录Kibana。
  4. 在Kibana左侧导航栏选择“Stack Management”
  5. 选择“Index patterns”,单击“Create index pattern”
  6. 在创建索引模式页面,配置“Index pattern name”,例如“topic-gateway*”。单击“Next step”配置时间字段,选择步骤一:在Elasticsearch上创建索引模板创建的索引模板“@timestamp”
    图3 index pattern配置界面示例
  7. 单击“Create index pattern”完成创建,返回索引模式列表可以看到新建的索引模式。

步骤六:查询和分析数据

使用Kibana的可视化功能将接入Elasticsearch的数据制作成报表,便于数据分析和展示。

在Kibana左侧导航栏选择“Discover”进行数据查询与分析。数据展示效果如图4所示。
图4 Discover界面示例