使用Logstash将OBS数据导入Elasticsearch
通过CSS服务的Logstash将OBS数据导入Elasticsearch,可实现高效的数据检索与分析能力,满足数据迁移场景。
应用场景
对象存储服务OBS用于存储海量数据。当需要对这些数据进行快速搜索和分析时,可以使用CSS服务的Logstash将其导入到Elasticsearch集群中。常见场景包括:
- 数据检索与分析: 将存储在OBS中的数据(如日志、业务数据等)定期或实时同步到Elasticsearch,以便快速执行全文检索、聚合分析和可视化。
- OBS桶操作日志分析: 将OBS桶的操作日志(访问日志)同步到Elasticsearch,便于审计追踪、行为分析和异常检测。
方案架构

Logstash支持从多种数据源采集数据。对于存储在OBS中的数据,可以使用Logstash的logstash-input-s3插件来读取OBS上的对象,并通过logstash-output-elasticsearch插件将处理后的数据同步到目标Elasticsearch集群。
- 输入 (Input):通过logstash-input-s3插件配置连接到OBS桶,监控指定的文件并读取内容。
- 处理 (Processing):Logstash配置文件中可选的filter部分用于对读取的数据进行清洗、转换和结构化处理(例如,使用grok过滤器解析日志格式)。
- 输出 (Output):logstash-output-elasticsearch插件配置连接到目标Elasticsearch集群,将数据索引化。
方案优势
- 兼容性强:支持多种数据格式,如JSON、CSV、OBS日志。
- 高扩展性:可结合Filter插件对数据进行清洗、字段提取或格式转换。
- 灵活性高: 支持多种目标存储,如CSS服务的Elasticsearch、自建Elasticsearch或第三方Elasticsearch。
前提条件
- 目标数据已上传至OBS桶中,且OBS桶必须和Elasticsearch集群在同一区域。并获取OBS桶的如下信息:桶名称、访问端点(Endpoint)和区域(Region),获取方式请参见OBS访问规则。
- 已创建CSS Elasticsearch集群,并获取Elasticsearch集群的如下信息:访问地址(Host)、安全集群的用户名和密码(非安全集群不涉及)。
- 已创建CSS Logstash集群,并确认Logstash与Elasticsearch部署在同一VPC下,确保网络连通。
操作步骤
- 访问CSS服务的Logstash集群。
- 登录云搜索服务管理控制台。
- 在左侧导航栏,选择“集群管理 > Logstash”。
- 网络连通性测试。当Logstash和Elasticsearch部署在相同VPC下时,可跳过该步骤。
- 在Logstash集群列表,选择目标集群,单击操作列的“配置中心”。
- 在配置中心页面,单击“连通性测试”。
- 在连通性测试弹窗中,输入Elasticsearch的访问地址和端口号,单击“测试”。
当显示“可用”时,表示集群间网络连通。
- 如果Logstash与Elasticsearch都在内网却网络不连通,则可以配置Logstash集群路由连通集群间的网络。
- 如果Elasticsearch在外网且网络不连通,则可以配置Logstash公网访问连通集群间的网络。
- 准备Logstash配置文件。
- 在Logstash集群列表,选择目标集群,单击操作列的“配置中心”。
- 在配置中心页面,单击右上角“创建”,编辑配置文件并保存。
配置文件内容(以下仅为配置示例,需基于业务需求修改):
input { s3 { access_key_id => "YOUR_AK" # 账号的Access Key ID secret_access_key => "YOUR_SK" # 账号的Secret Access Key bucket => "log_obs_access" # OBS桶名称 prefix => "access_log" # 文件前缀(可选) endpoint => "https://OBS_Endpoint" # OBS访问地址 region => "REGION" # OBS所属区域 watch_for_new_files => true # 监控新文件 temporary_directory => "/opt/data/tmp/" # 临时存储目录 backup_to_bucket => "backup_log_obs_access" # 备份桶(可选) backup_add_prefix => "backup-" # 备份文件前缀(可选) } } filter { # (可选)示例:解析OBS桶日志 grok { match => { "message" => '%{OBS_ACCESS_LOG_PATTERN}' } } mutate { remove_field => ["@version", "message"] # 移除冗余字段 } } output { elasticsearch { user => "USERNAME" # Elasticsearch用户名(仅安全集群涉及) password => "YOUR_PASSWORD" # Elasticsearch密码(仅安全集群涉及) hosts => ["192.168.0.xxx:9200"] # Elasticsearch集群访问地址 index => "delivery_events_log_alias" # 索引名称 manage_template => false # 不管理索引模板 ilm_enabled => false # 关闭ILM策略 } }
- 启动Logstash管道任务。
- 在配置中心页面,选择新建的配置文件,单击左上角的“启动”。
- 在“启动Logstash服务”对话框中,勾选“是否保持常驻”,开启保持常驻,确保Logstash配置文件持续运行,不会因为服务重启或其他原因导致数据处理中断。
保持常驻会在每个节点上面配置一个守护进程,当Logstash服务出现故障的时候,会主动拉起并修复,从而保证Logstash管道稳定运行。
- 单击“确定”,启动配置文件。
在管道列表中可以查看启动的配置文件状态,监控数据迁移情况。
- 验证数据同步。
- 登录云搜索服务管理控制台。
- 在左侧导航栏,选择“集群管理 > Elasticsearch”。
- 在集群列表,选择目标集群,单击操作列的“Kibana”,登录Kibana。
- 在Kibana左侧导航栏选择“Dev Tools”,进入操作页面。
- 执行索引查询命令,检查目标索引是否返回了预期的数据记录数。
GET delivery_events_log_alias/_count { "query": {"match_all": {}} }
如果返回结果中“count”值不为0,表示数据已成功同步。
常见问题:如何查看OBS桶访问日志
当OBS桶开启了日志记录功能,则可以通过Logstash读取OBS桶的访问日志,将日志数据写入Elasticsearch,查看并分析OBS桶的操作行为。
OBS桶日志格式如下所示:
15e02840b2784ffb9dcac293afc01a75 genean-hot-test [02/Feb/2024:07:51:17 +0000] 58.250.177.72 15e02840b2784ffb9dcac293afc01a75 0000018D68CCEF1AD3296F98BB7B4255 REST.GET.OBJECT 00002c9d-172a-495d-b0a3-4aca8c1f86cc "GET /genean-hot-test/00002c9d-172a-495d-b0a3-4aca8c1f86cc?AWSAccessKeyId=H0N1CQY4D1ZBSHABB3NF&Expires=1706860576&response-content-disposition=attachment&response-content-type=application/octet-stream&x-amz-security-token=*****&Signature=DeC2lkZVFB4CjDjKe147ZOl8sKY%3D HTTP/1.1" 200 - 82509 82509 84 84 "https://console.example.com/" "Mozilla/5.0 (xx.xx.xx; xx; xx) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0" - - STANDARD - "-" efe6d963779e4475bb81c5cb1f5f5368
OBS日志字段的详细定义请参见使用桶日志记录OBS日志信息。
在Logstash配置文件中增加filter配置可以自定义切割OBS桶的日志数据,配置方式参考如下。
filter { grok { match => { "message" => '(?<BucketOwner>[^ ]+) (?<Bucket>[^ ]+) \[%{HTTPDATE:Time}\] (?<RemoteIP>[^ ]+) (?<Requester>[^ ]+) (?<RequestID>[^ ]+) (?<Operation>[^ ]+) (?<Key>[^ ]+) \"(?<RequestURI>[^"]+)\" (?<HTTPStatus>[^ ]+) (?<ErrorCode>[^ ]+) (?<BytesSent>[^ ]+) (?<ObjectSize>[^ ]+) (?<TotalTime>[^ ]+) (?<Turn-AroundTime>[^ ]+) \"(?<Referer>[^"]+)\" \"(?<User-Agent>[^"]+)\" (?<VersionID>[^ ]+) (?<STSLogUrn>[^ ]+) (?<StorageClass>[^ ]+) (?<TargetStorageClass>[^ ]+) \"(?<DentryName>[^ ]+)\" (?<IAMUserID>[^ ]+)' } timeout_millis => 3000 timeout_scope => "event" } mutate { remove_field => ["@version","message"] } }
OBS桶的日志数据输出到Elasticsearch后,即可进行搜索分析。