将自建ELK日志导入云日志服务
背景信息
ELK是Elasticsearch、Logstash和Kibana的简称,它们组合起来提供了业界最常用的日志分析和可视化工具。
- Elasticsearch:
- Elasticsearch是一个基于Lucene的开源、分布式、RESTful搜索和分析引擎。它使用JSON作为文档索引的序列化格式,提供全文搜索、结构化搜索、分析以及将这三个功能混合使用的搜索能力。
- Elasticsearch通常用于构建实时搜索和分析应用程序,如网站搜索、日志和监控数据分析等。
- Logstash:
- Logstash是一个开源的、服务器端的数据处理管道,能够同时从多个来源实时接收、转换并将数据发送到用户选择的“存储库”。
- Logstash能够动态地转换数据,并将其发送到多个目标,如Elasticsearch、Kafka等。
- Logstash通常用于日志的收集、过滤和转发。
- Kibana:
- Kibana是一个开源的分析和可视化平台,通常与Elasticsearch一起使用。
- 用户可以使用Kibana来搜索、查看和与存储在Elasticsearch索引中的数据进行交互。即可轻松地进行高级数据分析和可视化数据。
- Kibana通常用于数据的可视化、仪表盘创建和搜索查询。
华为云日志服务LTS在功能丰富度、成本、性能方面优于开源ELK方案,具体对比可以参考云日志服务LTS对比自建ELK,应该如何选择。本文提供最佳实践,使用自定义Python脚本和LTS采集器ICAgent,协助用户将日志从Elasticsearch(简称ES)迁移到LTS中。
方案介绍
当前华为云支持ECS机器通过安装ICAgent来采集日志文件,因此可以基于该功能实现ES日志导入云日志服务。
ES数据先通过python脚本将数据落盘到ECS,然后通过LTS服务的日志接入功能,将落盘的日志文件采集到LTS服务。
整体流程参考如下:
操作步骤
- 登录云日志服务控制台,请参考安装ICAgent在ECS机器安装ICAgent。
- 参考ECS接入配置ECS日志接入云日志服务。
- 脚本执行前期准备。以下示例仅供参考,请以实际信息填写为准。
- 首次使用python,需要安装python环境。
- 首次使用ES时需要安装对应ES版本的python数据包,本次方案测试使用的elasticsearch为7.10.1版本。
pip install elasticsearch==7.10.1
- 方案测试使用的ES为华为云css服务创建的ES。
- 执行用来构造索引数据的python脚本,如果索引已经有数据,忽略这一步,直接执行步骤5。
python脚本需执行在ecs机器,脚本命名为xxx.py格式,构造数据请参考如下示例:
以下斜体字段需按照实际情况进行修改,参考示例是插入1000条数据,内容为:This is a test log,Hello world!!!\n;
- index:要创建的索引名称,参考示例为: test;
- 链接ES:ES的访问url,参考示例为:http://127.0.0.1:9200;
from elasticsearch import Elasticsearch def creadIndex(index): mappings = { "properties": { "content": { "type": "text" } } } es.indices.create(index=index, mappings=mappings) def reportLog(index): i = 0 while i < 1000: i = i + 1 body = {"content": "This is a test log,Hello world!!!\n"} es.index(index=index,body=body) if __name__ == '__main__': #索引名称 index = 'test' #链接ES es = Elasticsearch("http://127.0.0.1:9200") creadIndex(index) reportLog(index)
- 构建python读写脚本,用来将ES数据写入磁盘,输出文件路径需与配置日志接入规则一致。
脚本需执行在ecs机器,命名xxx.py格式,写入磁盘数据脚本请参考如下示例:
以下斜体字段需按照实际情况进行修改。
- index:字段为索引名,参考示例为: test;
- pathFile :为数据写入磁盘绝对路径,参考示例为: /tmp/test.log;
- scroll_size :为索引滚动查询大小,参考示例为:100;
- 链接ES:ES的访问url,参考示例为:http://127.0.0.1:9200;
from elasticsearch import Elasticsearch def writeLog(res, pathFile): data = res.get('hits').get('hits') i = 0 while i < len(data): log = data[i].get('_source').get('content') file = open(pathFile, 'a', encoding='UTF-8') file.writelines(log) i = i + 1 file.flush() file.close() if __name__ == '__main__': #索引名称 index = 'test' #输出文件路径 pathFile = '/tmp/' + index + '.log' #滚动查询一次滚动大小,默认为100 scroll_size = 100 #链接ES es = Elasticsearch("http://127.0.0.1:9200") init = True while 1: if (init == True): res = es.search(index=index, scroll="1m", body={"size": scroll_size}) init =False else: scroll_id = res.get("_scroll_id") res = es.scroll(scroll="1m", scroll_id=scroll_id) if not res.get('hits').get('hits'): break writeLog(res, pathFile)
- 执行该命令前确保python已安装成功,在ecs执行如下命令,将ES索引数据写入磁盘。
python xxx.py
- 查看数据是否成功查询及写入磁盘。
参考示例demo写入磁盘路径为:/tmp/test.log,操作时需要填写实际使用的路径,执行如下命令可以查看数据写入磁盘情况。
tail -f /tmp/test.log
- 登录云日志服务控制台,在“日志管理”页面,单击日志流名称进入详情页面,在“原始日志”页签查看到日志数据,即代表采集成功。