使用Logstash导入数据到Elasticsearch
云搜索服务支持使用Logstash将其收集的数据迁移到Elasticsearch中,方便用户通过Elasticsearch搜索引擎高效管理和获取数据。数据文件支持JSON、CSV等格式。
Logstash 是开源的服务器端数据处理管道,能够同时从多个来源采集数据、转换数据,然后将数据发送到Elasticsearch中。Logstash的官方文档请参见:https://www.elastic.co/guide/en/logstash/current/getting-started-with-logstash.html。
数据导入分为如下2种场景:
前提条件
- 为方便操作,建议采用Linux操作系统的机器部署Logstash。
- Logstash的下载路径为:https://www.elastic.co/cn/downloads/logstash-oss
Logstash要求使用OSS版本,选择和CSS一致版本。
- 安装完Logstash后,再根据如下步骤导入数据。安装Logstash的操作指导,请参见:https://www.elastic.co/guide/en/logstash/current/installing-logstash.html
- 安装Logstash之前,需要先安装JDK。在Linux操作系统中,您可以执行yum -y install java-1.8.0命令直接安装1.8.0版本JDK。在Windows操作系统中,您可以访问JDK官网,下载符合操作系统版本的JDK,并根据指导安装。
- 在“Logstash部署在弹性云服务器上时导入数据”场景中,请确保此弹性云服务器与接入的Elasticsearch集群在同一个VPC下。
Logstash部署在外网时导入数据
当Logstash部署在外网时,导入数据的流程说明如图1所示。
- 创建一个跳转主机,并按如下要求进行配置。
- 跳转主机为一台Linux操作系统的弹性云服务器,且已绑定弹性IP。
- 跳转主机与CSS集群在同一虚拟私有云下。
- 已开放跳转主机的本地端口,用于SSH转发,能够从本地端口转发至CSS集群某一节点的9200端口。
- 关于跳转主机的本地端口转发配置,请参见SSH官方文档。
- 使用PuTTY,通过弹性IP登录已创建的跳转主机。
- 执行如下命令进行端口映射,将发往跳转主机对外开放端口的请求转发到待导入数据的集群中。
ssh -g -L <跳转主机的本地端口:节点的内网访问地址和端口号> -N -f root@<跳转主机的私网IP地址>
- <跳转主机的本地端口>:为步骤1中的端口。
- <节点的内网访问地址和端口号>:为集群中某一节点的内网访问地址和端口号。当该节点出现故障时,将导致命令执行失败。如果集群包含多个节点,可以将<节点的内网访问地址和端口号>替换为集群中另一节点的内网访问地址和端口号;如果集群只包含一个节点,则需要将该节点修复之后再次执行命令进行端口映射。
- <跳转主机的私网IP地址>:打开弹性云服务器管理控制台,从“IP地址”列中获取标有“私网”对应的IP地址。
例如:跳转主机对外开放的端口号为9200,节点的内网访问地址和端口号为192.168.0.81:9200,跳转主机的私网IP地址为192.168.0.227,需要执行如下命令进行端口映射。
ssh -g -L 9200:192.168.0.81:9200 -N -f root@192.168.0.227
- 登录部署了Logstash的服务器,将需要进行操作的数据文件存储至此服务器中。
例如,需要导入的数据文件“access_20181029_log”,文件存储路径为“/tmp/access_log/”,此数据文件中包含的数据如下所示:
文件存储路径中的access_log文件夹如果不存在,用户可以自建。
| All | Heap used for segments | | 18.6403 | MB | | All | Heap used for doc values | | 0.119289 | MB | | All | Heap used for terms | | 17.4095 | MB | | All | Heap used for norms | | 0.0767822 | MB | | All | Heap used for points | | 0.225246 | MB | | All | Heap used for stored fields | | 0.809448 | MB | | All | Segment count | | 101 | | | All | Min Throughput | index-append | 66232.6 | docs/s | | All | Median Throughput | index-append | 66735.3 | docs/s | | All | Max Throughput | index-append | 67745.6 | docs/s | | All | 50th percentile latency | index-append | 510.261 | ms |
- 在部署Logstash的服务器中,执行如下命令在Logstash的安装目录下新建配置文件logstash-simple.conf。
cd /<Logstash的安装目录>/ vi logstash-simple.conf
- 在配置文件logstash-simple.conf中输入如下内容。
input { 数据所在的位置 } filter { 数据的相关处理 } output { elasticsearch { hosts => "<跳转主机的公网IP地址>:<跳转主机对外开放的端口号>" } }
- input:指明了数据的来源。实际请根据用户的具体情况来设置。input参数的详细解释和使用介绍请参见https://www.elastic.co/guide/en/logstash/current/input-plugins.html。
- filter:指定对数据进行处理的方式。例如,对日志进行了提取和处理,将非结构化信息转换为结构化信息。filter参数的详细解释和使用介绍请参见https://www.elastic.co/guide/en/logstash/current/filter-plugins.html。
- output:指明了数据的目的地址。output参数的详细解释和使用介绍请参见https://www.elastic.co/guide/en/logstash/current/output-plugins.html。<跳转主机的公网IP地址>请从弹性云服务器管理控制台的“IP地址”列中获取标有“弹性公网”对应的IP地址。<跳转主机对外开放的端口号>即为步骤1中的端口,例如:9200。
以步骤4中“/tmp/access_log/”的数据文件为例,输入数据文件从首行开始,且过滤条件保持为空,即不做任何数据处理操作。跳转主机的公网IP和端口号为“192.168.0.227:9200”。导入数据的索引名称为“myindex”。配置文件的示例如下所示,配置文件按实际数据情况修改完成后,输入“:wq”保存。
input { file{ path => "/tmp/access_log/*" start_position => "beginning" } } filter { } output { elasticsearch { hosts => "192.168.0.227:9200" index => "myindex" } }
如果在使用中出现license相关的报错,可以尝试设置ilm_enabled => false。
如果集群开启了安全模式,则需要先下载证书。
- 在集群基本信息页面下载证书。
图2 下载证书
- 将下载的证书存放到部署logstash服务器中。
- 修改配置文件logstash-simple.conf。
以步骤4中“/tmp/access_log/”的数据文件为例,输入数据文件从首行开始,且过滤条件保持为空,即不做任何数据处理操作。跳转主机的公网IP和端口号为“192.168.0.227:9200”。导入数据的索引名称为“myindex”,证书存放路径为“/logstash/logstash6.8/config/CloudSearchService.cer”。配置文件的示例如下所示,配置文件按实际数据情况修改完成后,输入“:wq”保存。
input{ file { path => "/tmp/access_log/*" start_position => "beginning" } } filter { } output{ elasticsearch{ hosts => ["https://192.168.0.227:9200"] index => "myindex" user => "admin" password => "******" cacert => "/logstash/logstash6.8/config/CloudSearchService.cer" } }
password:登录安全集群的密码。
- 执行如下命令将Logstash收集的数据导入到集群中。
./bin/logstash -f logstash-simple.conf
此命令需要在存放logstash-simple.conf文件的目录下执行。例如,logstash-simple.conf文件存放在/root/logstash-7.1.1/,则需要先进入该路径,再执行此命令。
- 登录云搜索服务管理控制台。
- 在左侧导航栏中,选择“集群管理 > Elasticsearch”,进入集群管理列表页面。
- 在集群列表页面中,单击待导入数据的集群“操作”列的“Kibana”。
- 在Kibana的左侧导航中选择“Dev Tools”,进入Console界面。
- 在已打开的Kibana的Console界面,通过搜索获取已导入的数据。
在Kibana控制台,输入如下命令,搜索数据。查看搜索结果,如果数据与导入数据一致,表示数据文件的数据已导入成功。
GET myindex/_search
Logstash部署在弹性云服务器上时导入数据
当Logstash部署在同一VPC的弹性云服务时,导入数据的流程说明如图3所示。
- 确保已部署Logstash的弹性云服务器与待导入数据的集群在同一虚拟私有云下,已开放安全组的9200端口的外网访问权限,且弹性云服务器已绑定弹性IP。
- 使用PuTTY登录弹性云服务器。
例如此服务器中存储了需要导入的数据文件“access_20181029_log”,文件存储路径为“/tmp/access_log/”,此数据文件中包含的数据如下所示:
| All | Heap used for segments | | 18.6403 | MB | | All | Heap used for doc values | | 0.119289 | MB | | All | Heap used for terms | | 17.4095 | MB | | All | Heap used for norms | | 0.0767822 | MB | | All | Heap used for points | | 0.225246 | MB | | All | Heap used for stored fields | | 0.809448 | MB | | All | Segment count | | 101 | | | All | Min Throughput | index-append | 66232.6 | docs/s | | All | Median Throughput | index-append | 66735.3 | docs/s | | All | Max Throughput | index-append | 67745.6 | docs/s | | All | 50th percentile latency | index-append | 510.261 | ms |
- 执行如下命令在Logstash的安装目录下新建配置文件logstash-simple.conf。
cd /<Logstash的安装目录>/ vi logstash-simple.conf
在配置文件logstash-simple.conf中输入如下内容。input { 数据所在的位置 } filter { 数据的相关处理 } output { elasticsearch{ hosts => "<节点的内网访问地址和端口号>"} }
- input:指明了数据的来源。实际请根据用户的具体情况来设置。input参数的详细解释和使用介绍请参见https://www.elastic.co/guide/en/logstash/current/input-plugins.html。
- filter:对日志进行了提取和处理,将非结构化信息转换为结构化信息。filter参数的详细解释和使用介绍请参见https://www.elastic.co/guide/en/logstash/current/filter-plugins.html。
- output:指明了数据的目的地址。output参数的详细解释和使用介绍请参见https://www.elastic.co/guide/en/logstash/current/output-plugins.html。<节点的内网访问地址和端口号>为集群中节点的内网访问地址和端口号。
当集群包含多个节点时,为了避免节点故障,建议将上述命令中<节点的内网访问地址和端口号>替换为该集群中多个节点的内网访问地址和端口号,多个节点的内网访问地址和端口号之间用英文逗号隔开,填写格式请参见如下示例。
hosts => ["192.168.0.81:9200","192.168.0.24:9200"]
当集群只包含一个节点时,填写格式请参见如下示例。
hosts => "192.168.0.81:9200"
以步骤2中“/tmp/access_log/”的数据文件为例,输入数据文件从首行开始,且过滤条件保持为空,即不做任何数据处理操作。需导入数据的集群,其节点内网访问地址和端口号为“192.168.0.81:9200”。导入数据的索引名称为“myindex”。配置文件的示例如下所示,配置文件按实际数据情况修改完成后,输入“:wq”保存。
input { file{ path => "/tmp/access_log/*" start_position => "beginning" } } filter { } output { elasticsearch { hosts => "192.168.0.81:9200" index => "myindex" } }
如果集群开启了安全模式,则需要先下载证书。- 在集群基本信息页面下载证书。
图4 下载证书
- 将下载的证书存放到部署logstash服务器中。
- 修改配置文件logstash-simple.conf。
以步骤2中“/tmp/access_log/”的数据文件为例,输入数据文件从首行开始,且过滤条件保持为空,即不做任何数据处理操作。跳转主机的公网IP和端口号为“192.168.0.227:9200”。导入数据的索引名称为“myindex”,证书存放路径为“/logstash/logstash6.8/config/CloudSearchService.cer”。配置文件的示例如下所示,配置文件按实际数据情况修改完成后,输入“:wq”保存。
input{ file { path => "/tmp/access_log/*" start_position => "beginning" } } filter { } output{ elasticsearch{ hosts => ["https://192.168.0.227:9200"] index => "myindex" user => "admin" password => "******" cacert => "/logstash/logstash6.8/config/CloudSearchService.cer" } }
password:登录安全集群的密码。
- 执行如下命令将Logstash收集的弹性云服务器的数据导入到集群中。
./bin/logstash -f logstash-simple.conf
- 登录云搜索服务管理控制台。
- 在左侧导航栏中,选择“集群管理 > Elasticsearch”,进入集群管理列表页面。
- 在集群列表页面中,单击待导入数据的集群“操作”列的“Kibana”。
- 在Kibana的左侧导航中选择“Dev Tools”,进入Console界面。
- 在已打开的Kibana的Console界面,通过搜索获取已导入的数据。
在Kibana控制台,输入如下命令,搜索数据。查看搜索结果,如果数据与导入数据一致,表示数据文件的数据已导入成功。
GET myindex/_search