将自建ELK日志导入云日志服务LTS
方案概述
ELK是Elasticsearch、Logstash和Kibana的简称,它们组合起来提供了业界最常用的日志分析和可视化工具。
- Elasticsearch是一个基于Lucene的开源、分布式、RESTful搜索和分析引擎。
- Logstash是一个开源的、服务器端的数据处理管道,能够同时从多个来源实时接收、转换并将数据发送到用户选择的“存储库”。通常用于日志的收集、过滤和转发。
- Kibana是一个开源的分析和可视化平台,用于数据的可视化、仪表盘创建和搜索查询。通常与Elasticsearch一起使用。
华为云日志服务LTS在功能丰富度、成本、性能方面优于开源ELK方案,具体对比可以参考云日志服务LTS对比自建ELK Stack有什么优势?。本文提供最佳实践,使用自定义Python脚本和LTS采集器ICAgent,协助用户将日志从Elasticsearch(简称ES)迁移到LTS中。
当前华为云支持ECS机器通过安装ICAgent来采集日志文件,因此可以基于该功能实现Elasticsearch日志导入云日志服务。
Elasticsearch数据先通过python脚本将数据落盘到ECS,然后通过LTS服务的日志接入功能,将落盘的日志文件采集到LTS服务。
将自建ELK日志导入云日志服务LTS
- 登录云日志服务控制台。
- 请参考安装ICAgent在ECS主机安装ICAgent。
- 配置ECS日志接入云日志服务,请参考ECS接入。
- 脚本执行前期准备。以下示例仅供参考,请以实际信息填写为准。
- 首次使用python,需要安装python环境。
- 首次使用ES时需要安装对应ES版本的python数据包,本次方案测试使用的elasticsearch为7.10.1版本。
pip install elasticsearch==7.10.1
- 方案测试使用的ES为华为云CSS服务创建的ES。
- 执行用来构造索引数据的python脚本,如果索引已经有数据,忽略这一步,直接执行6。
python脚本需执行在ECS机器,脚本命名为xxx.py格式,构造数据请参考如下示例:
以下斜体字段需按照实际情况进行修改,参考示例是插入1000条数据,内容为:This is a test log,Hello world!!!\n;
- index:要创建的索引名称,参考示例为: test;
- 链接ES:ES的访问url,参考示例为:http://127.0.0.1:9200;
from elasticsearch import Elasticsearch def creadIndex(index): mappings = { "properties": { "content": { "type": "text" } } } es.indices.create(index=index, mappings=mappings) def reportLog(index): i = 0 while i < 1000: i = i + 1 body = {"content": "This is a test log,Hello world!!!\n"} es.index(index=index,body=body) if __name__ == '__main__': #索引名称 index = 'test' #链接ES es = Elasticsearch("http://127.0.0.1:9200") creadIndex(index) reportLog(index)
- 构建python读写脚本,用来将ES数据写入磁盘,输出文件路径需与配置日志接入规则一致。
脚本需执行在ecs机器,命名xxx.py格式,写入磁盘数据脚本请参考如下示例:
以下斜体字段需按照实际情况进行修改。
- index:字段为索引名,参考示例为: test;
- pathFile :为数据写入磁盘绝对路径,参考示例为: /tmp/test.log;
- scroll_size :为索引滚动查询大小,参考示例为:100;
- 链接ES:ES的访问url,参考示例为:http://127.0.0.1:9200;
from elasticsearch import Elasticsearch def writeLog(res, pathFile): data = res.get('hits').get('hits') i = 0 while i < len(data): log = data[i].get('_source').get('content') file = open(pathFile, 'a', encoding='UTF-8') file.writelines(log) i = i + 1 file.flush() file.close() if __name__ == '__main__': #索引名称 index = 'test' #输出文件路径 pathFile = '/tmp/' + index + '.log' #滚动查询一次滚动大小,默认为100 scroll_size = 100 #链接ES es = Elasticsearch("http://127.0.0.1:9200") init = True while 1: if (init == True): res = es.search(index=index, scroll="1m", body={"size": scroll_size}) init =False else: scroll_id = res.get("_scroll_id") res = es.scroll(scroll="1m", scroll_id=scroll_id) if not res.get('hits').get('hits'): break writeLog(res, pathFile)
- 执行命令前,请确保python已安装成功,在ECS执行如下命令,将ES索引数据写入磁盘。
python xxx.py
- 查看数据是否成功查询及写入磁盘。
参考示例demo写入磁盘路径为:/tmp/test.log,操作时需要填写实际使用的路径,执行如下命令可以查看数据写入磁盘情况。
tail -f /tmp/test.log
- 登录云日志服务控制台,在“日志管理”页面,单击日志流名称进入详情页面,在“日志搜索”页签查看到日志数据,即代表采集成功。