文档首页/ 云搜索服务 CSS/ 最佳实践/ Elasticsearch数据迁移/ 使用Elasticsearch Pipeline实现数据增量迁移
更新时间:2025-08-29 GMT+08:00
分享

使用Elasticsearch Pipeline实现数据增量迁移

通过Elasticsearch Pipeline的自动字段更新能力,解决了增量迁移中增量字段识别困难的问题,实现了可靠的增量数据迁移方案。

应用场景

当使用Logstash、ESM等工具进行Elasticsearch集群数据的增量迁移时,如果遇到以下问题,可采用本方案:
  • 索引未配置增量时间字段,无法识别增量数据。
  • 业务逻辑复杂,无法可靠识别增量字段。
  • 不同索引的增量字段不一致,需统一增量方式。
  • 数据更新未同步更新增量字段。

本方案通过Elasticsearch Pipeline功能,在数据写入时自动为文档添加增量时间字段,实现灵活的数据增量迁移。

方案架构

通过Elasticsearch Pipeline功能,在数据写入时自动添加增量时间字段,结合迁移工具实现增量迁移流程:

  1. 在源Elasticsearch集群中配置Pipeline,自动添加增量时间字段。
  2. 配置索引关联Pipeline,实现数据写入时自动添加增量时间字段。
  3. 通过迁移工具实现增量数据迁移。

方案优势

  • 简化操作:无需分析索引的增量字段,通过Pipeline统一生成增量字段,降低配置复杂度。
  • 灵活性:适用于不同索引和业务场景,无需修改索引结构即可实现增量迁移。
  • 兼容性:可与Logstash、ESM、Reindex等多种迁移工具结合使用。

约束限制

集群迁移过程中,源集群的索引数据不能删除,否则可能导致增量数据丢失,造成迁移后的源集群数据和目标集群数据内容不一致。

前提条件

  • 源Elasticsearch集群和目标Elasticsearch集群处于可用状态,且集群版本大于6.x(否则不支持Elasticsearch Pipeline功能)。
  • 集群间需要保证网络连通。
    • 如果源集群和目标集群在不同VPC,则需要先打通VPC网络建立对等连接。具体操作请参见对等连接简介
    • 如果是自建Elasticsearch集群迁移至华为云,则可以通过给自建Elasticsearch集群配置公网访问打通网络。
    • 如果是第三方Elasticsearch集群迁移至华为云,则需要建立企业内部数据中心到华为云的VPN通道或专线。
  • 确认集群的索引已开启“_source”

    集群索引的“_source”默认是开启的。执行命令GET {index}/_search,当返回的索引信息里有“_source”信息时表示已开启。

操作步骤

  1. 登录源Elasticsearch集群的Kibana。

    源集群的来源不同,登录方式不同,如果源端没有安装Kibana,也可以直接使用Curl命令进行集群配置。本文以源端是CSS服务的Elasticsearch集群为例介绍操作步骤。
    1. 登录云搜索服务管理控制台
    2. 在左侧导航栏,选择“集群管理 > Elasticsearch”
    3. 在集群列表,选择目标集群,单击操作列的“Kibana”,登录Kibana。
    4. 在Kibana左侧导航栏选择“Dev Tools”,进入操作页面。

  2. 配置增量时间字段,在源端集群中为索引添加增量时间字段。

    PUT /{index_name}/_mapping
    {
      "properties": {
        "@migrate_update_time": {
          "type": "date"
        }
      }
    }

    为索引增加增量时间字段(如@migrate_update_time),设置字段类型为date。其中 {index_name} 是索引名称。

  3. 创建Pipeline,自动添加增量时间字段。

    PUT _ingest/pipeline/migrate_update_time
    {
      "description": "Adds update_time timestamp to documents",
      "processors": [
        {
          "set": {
            "field": "_source.@migrate_update_time",
            "value": "{{_ingest.timestamp}}"
          }
        }
      ]
    }

    processors配置会读取当前机器的时间写入到索引的增量时间字段(如@migrate_update_time)中。

  4. 配置索引关联Pipeline,在数据写入时自动添加增量时间字段。

    PUT {index_name}/_settings
    {
      "index.default_pipeline": "migrate_update_time"
    }

    配置索引的默认Pipeline,在索引数据增加和更新时,都会经过Pipeline更新增量时间字段@migrate_update_time。

  5. 查询增量数据,通过查询增量时间字段@migrate_update_time获取增量数据。

    GET /{index_name}/_search
    {
      "query": {
        "range": {
          "@migrate_update_time": {
            "gte": "2025-01-01T00:00:00"
          }
        }
      }
    }

    查询增量时间字段@migrate_update_time大于等于指定时间的文档,时间格式必须与字段映射类型匹配(date类型)。

  6. 使用迁移工具进行增量迁移。

    以ESM为例,执行增量迁移命令:

    ./migrator-linux-amd6 -s http://source:9200 -d http://dest:9200 -x {index_name} -m admin:password -n admin:password -w 5 -b 10 -q "@migrate_update_time:[\"2025-04-08T00:00:00\" TO \"2030-01-01T 00:00:00\"]"

    迁移命令说明及更多其他迁移工具的使用请参见Elasticsearch集群数据迁移方案介绍

  7. 检查数据一致性。

    数据迁移完成后,分别在源集群和目标集群的Kibana执行命令GET {index_name}/_count,对比两者的索引信息是否一致。

常见问题:迁移过程中出现Pipeline不存在错误

  • 现象:迁移过程中出现如图1所示的报错。
    图1 Pipeline不存在错误
  • 原因:源Elasticsearch集群创建了Pipeline,迁移索引结构的时候会把索引的Pipeline也一并迁移到目标Elasticsearch。但是目标端集群并没有创建Pipeline,导致出现该报错。
  • 解决方案:需要在目标Elasticsearch集群取消索引的Pipeline。
    登录目标Elasticsearch集群的Kibana,进入“Dev Tools”执行如下命令:
    PUT {index_name}/_settings
    {
        "index.default_pipeline":  null
    }

相关文档