使用Elasticsearch Pipeline实现数据增量迁移
通过Elasticsearch Pipeline的自动字段更新能力,解决了增量迁移中增量字段识别困难的问题,实现了可靠的增量数据迁移方案。
应用场景
- 索引未配置增量时间字段,无法识别增量数据。
- 业务逻辑复杂,无法可靠识别增量字段。
- 不同索引的增量字段不一致,需统一增量方式。
- 数据更新未同步更新增量字段。
本方案通过Elasticsearch Pipeline功能,在数据写入时自动为文档添加增量时间字段,实现灵活的数据增量迁移。
方案架构
通过Elasticsearch Pipeline功能,在数据写入时自动添加增量时间字段,结合迁移工具实现增量迁移流程:
- 在源Elasticsearch集群中配置Pipeline,自动添加增量时间字段。
- 配置索引关联Pipeline,实现数据写入时自动添加增量时间字段。
- 通过迁移工具实现增量数据迁移。
方案优势
- 简化操作:无需分析索引的增量字段,通过Pipeline统一生成增量字段,降低配置复杂度。
- 灵活性:适用于不同索引和业务场景,无需修改索引结构即可实现增量迁移。
- 兼容性:可与Logstash、ESM、Reindex等多种迁移工具结合使用。
约束限制
集群迁移过程中,源集群的索引数据不能删除,否则可能导致增量数据丢失,造成迁移后的源集群数据和目标集群数据内容不一致。
前提条件
- 源Elasticsearch集群和目标Elasticsearch集群处于可用状态,且集群版本大于6.x(否则不支持Elasticsearch Pipeline功能)。
- 集群间需要保证网络连通。
- 如果源集群和目标集群在不同VPC,则需要先打通VPC网络建立对等连接。具体操作请参见对等连接简介。
- 如果是自建Elasticsearch集群迁移至华为云,则可以通过给自建Elasticsearch集群配置公网访问打通网络。
- 如果是第三方Elasticsearch集群迁移至华为云,则需要建立企业内部数据中心到华为云的VPN通道或专线。
- 确认集群的索引已开启“_source”。
集群索引的“_source”默认是开启的。执行命令GET {index}/_search,当返回的索引信息里有“_source”信息时表示已开启。
操作步骤
- 登录源Elasticsearch集群的Kibana。
源集群的来源不同,登录方式不同,如果源端没有安装Kibana,也可以直接使用Curl命令进行集群配置。本文以源端是CSS服务的Elasticsearch集群为例介绍操作步骤。
- 登录云搜索服务管理控制台。
- 在左侧导航栏,选择“集群管理 > Elasticsearch”。
- 在集群列表,选择目标集群,单击操作列的“Kibana”,登录Kibana。
- 在Kibana左侧导航栏选择“Dev Tools”,进入操作页面。
- 配置增量时间字段,在源端集群中为索引添加增量时间字段。
PUT /{index_name}/_mapping { "properties": { "@migrate_update_time": { "type": "date" } } }
为索引增加增量时间字段(如@migrate_update_time),设置字段类型为date。其中 {index_name} 是索引名称。
- 创建Pipeline,自动添加增量时间字段。
PUT _ingest/pipeline/migrate_update_time { "description": "Adds update_time timestamp to documents", "processors": [ { "set": { "field": "_source.@migrate_update_time", "value": "{{_ingest.timestamp}}" } } ] }
processors配置会读取当前机器的时间写入到索引的增量时间字段(如@migrate_update_time)中。
- 配置索引关联Pipeline,在数据写入时自动添加增量时间字段。
PUT {index_name}/_settings { "index.default_pipeline": "migrate_update_time" }
配置索引的默认Pipeline,在索引数据增加和更新时,都会经过Pipeline更新增量时间字段@migrate_update_time。
- 查询增量数据,通过查询增量时间字段@migrate_update_time获取增量数据。
GET /{index_name}/_search { "query": { "range": { "@migrate_update_time": { "gte": "2025-01-01T00:00:00" } } } }
查询增量时间字段@migrate_update_time大于等于指定时间的文档,时间格式必须与字段映射类型匹配(date类型)。
- 使用迁移工具进行增量迁移。
以ESM为例,执行增量迁移命令:
./migrator-linux-amd6 -s http://source:9200 -d http://dest:9200 -x {index_name} -m admin:password -n admin:password -w 5 -b 10 -q "@migrate_update_time:[\"2025-04-08T00:00:00\" TO \"2030-01-01T 00:00:00\"]"
迁移命令说明及更多其他迁移工具的使用请参见Elasticsearch集群数据迁移方案介绍。
- 检查数据一致性。
数据迁移完成后,分别在源集群和目标集群的Kibana执行命令GET {index_name}/_count,对比两者的索引信息是否一致。
常见问题:迁移过程中出现Pipeline不存在错误
- 现象:迁移过程中出现如图1所示的报错。
- 原因:源Elasticsearch集群创建了Pipeline,迁移索引结构的时候会把索引的Pipeline也一并迁移到目标Elasticsearch。但是目标端集群并没有创建Pipeline,导致出现该报错。
- 解决方案:需要在目标Elasticsearch集群取消索引的Pipeline。
登录目标Elasticsearch集群的Kibana,进入“Dev Tools”执行如下命令:
PUT {index_name}/_settings { "index.default_pipeline": null }