网络
虚拟私有云 VPC
弹性公网IP EIP
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
虚拟专用网络 VPN
云连接 CC
VPC终端节点 VPCEP
企业路由器 ER
企业交换机 ESW
全球加速 GA
企业连接 EC
云原生应用网络 ANC
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
威胁检测服务 MTD
态势感知 SA
认证测试中心 CTC
边缘安全 EdgeSec
应用中间件
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
API网关 APIG
分布式缓存服务 DCS
多活高可用服务 MAS
事件网格 EG
管理与监管
统一身份认证服务 IAM
消息通知服务 SMN
云监控服务 CES
应用运维管理 AOM
应用性能管理 APM
云日志服务 LTS
云审计服务 CTS
标签管理服务 TMS
配置审计 Config
应用身份管理服务 OneAccess
资源访问管理 RAM
组织 Organizations
资源编排服务 RFS
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
解决方案
高性能计算 HPC
SAP
混合云灾备
开天工业工作台 MIW
Haydn解决方案工厂
数字化诊断治理专家服务
云生态
云商店
合作伙伴中心
华为云开发者学堂
华为云慧通差旅
开发与运维
软件开发生产线 CodeArts
需求管理 CodeArts Req
流水线 CodeArts Pipeline
代码检查 CodeArts Check
编译构建 CodeArts Build
部署 CodeArts Deploy
测试计划 CodeArts TestPlan
制品仓库 CodeArts Artifact
移动应用测试 MobileAPPTest
CodeArts IDE Online
开源镜像站 Mirrors
性能测试 CodeArts PerfTest
应用管理与运维平台 ServiceStage
云应用引擎 CAE
开源治理服务 CodeArts Governance
华为云Astro轻应用
CodeArts IDE
Astro工作流 AstroFlow
代码托管 CodeArts Repo
漏洞管理服务 CodeArts Inspector
联接 CodeArtsLink
软件建模 CodeArts Modeling
Astro企业应用 AstroPro
CodeArts盘古助手
华为云Astro大屏应用
计算
弹性云服务器 ECS
Flexus云服务
裸金属服务器 BMS
云手机服务器 CPH
专属主机 DeH
弹性伸缩 AS
镜像服务 IMS
函数工作流 FunctionGraph
云耀云服务器(旧版)
VR云渲游平台 CVR
Huawei Cloud EulerOS
云化数据中心 CloudDC
网络
虚拟私有云 VPC
弹性公网IP EIP
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
虚拟专用网络 VPN
云连接 CC
VPC终端节点 VPCEP
企业路由器 ER
企业交换机 ESW
全球加速 GA
企业连接 EC
云原生应用网络 ANC
CDN与智能边缘
内容分发网络 CDN
智能边缘云 IEC
智能边缘平台 IEF
CloudPond云服务
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
威胁检测服务 MTD
态势感知 SA
认证测试中心 CTC
边缘安全 EdgeSec
大数据
MapReduce服务 MRS
数据湖探索 DLI
表格存储服务 CloudTable
可信智能计算服务 TICS
推荐系统 RES
云搜索服务 CSS
数据可视化 DLV
数据接入服务 DIS
数据仓库服务 GaussDB(DWS)
数据治理中心 DataArts Studio
湖仓构建 LakeFormation
智能数据洞察 DataArts Insight
应用中间件
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
API网关 APIG
分布式缓存服务 DCS
多活高可用服务 MAS
事件网格 EG
开天aPaaS
应用平台 AppStage
开天企业工作台 MSSE
开天集成工作台 MSSI
API中心 API Hub
云消息服务 KooMessage
交换数据空间 EDS
云地图服务 KooMap
云手机服务 KooPhone
组织成员账号 OrgID
云空间服务 KooDrive
管理与监管
统一身份认证服务 IAM
消息通知服务 SMN
云监控服务 CES
应用运维管理 AOM
应用性能管理 APM
云日志服务 LTS
云审计服务 CTS
标签管理服务 TMS
配置审计 Config
应用身份管理服务 OneAccess
资源访问管理 RAM
组织 Organizations
资源编排服务 RFS
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
区块链
区块链服务 BCS
数字资产链 DAC
华为云区块链引擎服务 HBS
解决方案
高性能计算 HPC
SAP
混合云灾备
开天工业工作台 MIW
Haydn解决方案工厂
数字化诊断治理专家服务
价格
成本优化最佳实践
专属云商业逻辑
云生态
云商店
合作伙伴中心
华为云开发者学堂
华为云慧通差旅
其他
管理控制台
消息中心
产品价格详情
系统权限
客户关联华为云合作伙伴须知
公共问题
宽限期保留期
奖励推广计划
活动
云服务信任体系能力说明
开发与运维
软件开发生产线 CodeArts
需求管理 CodeArts Req
流水线 CodeArts Pipeline
代码检查 CodeArts Check
编译构建 CodeArts Build
部署 CodeArts Deploy
测试计划 CodeArts TestPlan
制品仓库 CodeArts Artifact
移动应用测试 MobileAPPTest
CodeArts IDE Online
开源镜像站 Mirrors
性能测试 CodeArts PerfTest
应用管理与运维平台 ServiceStage
云应用引擎 CAE
开源治理服务 CodeArts Governance
华为云Astro轻应用
CodeArts IDE
Astro工作流 AstroFlow
代码托管 CodeArts Repo
漏洞管理服务 CodeArts Inspector
联接 CodeArtsLink
软件建模 CodeArts Modeling
Astro企业应用 AstroPro
CodeArts盘古助手
华为云Astro大屏应用
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
高性能弹性文件服务 SFS Turbo
弹性文件服务 SFS
存储容灾服务 SDRS
云硬盘备份 VBS
云服务器备份 CSBS
数据快递服务 DES
云存储网关 CSG
专属分布式存储服务 DSS
数据工坊 DWR
地图数据 MapDS
键值存储服务 KVS
容器
云容器引擎 CCE
云容器实例 CCI
容器镜像服务 SWR
云原生服务中心 OSC
应用服务网格 ASM
华为云UCS
数据库
云数据库 RDS
数据复制服务 DRS
文档数据库服务 DDS
分布式数据库中间件 DDM
云数据库 GaussDB
云数据库 GeminiDB
数据管理服务 DAS
数据库和应用迁移 UGO
云数据库 TaurusDB
人工智能
AI开发平台ModelArts
华为HiLens
图引擎服务 GES
图像识别 Image
文字识别 OCR
自然语言处理 NLP
内容审核 Moderation
图像搜索 ImageSearch
医疗智能体 EIHealth
企业级AI应用开发专业套件 ModelArts Pro
人脸识别服务 FRS
对话机器人服务 CBS
语音交互服务 SIS
人证核身服务 IVS
视频智能分析服务 VIAS
城市智能体
自动驾驶云服务 Octopus
盘古大模型 PanguLargeModels
IoT物联网
设备接入 IoTDA
全球SIM联接 GSL
IoT数据分析 IoTA
路网数字化服务 DRIS
IoT边缘 IoTEdge
设备发放 IoTDP
企业应用
域名注册服务 Domains
云解析服务 DNS
企业门户 EWP
ICP备案
商标注册
华为云WeLink
华为云会议 Meeting
隐私保护通话 PrivateNumber
语音通话 VoiceCall
消息&短信 MSGSMS
云管理网络
SD-WAN 云服务
边缘数据中心管理 EDCM
云桌面 Workspace
应用与数据集成平台 ROMA Connect
ROMA资产中心 ROMA Exchange
API全生命周期管理 ROMA API
政企自服务管理 ESM
视频
实时音视频 SparkRTC
视频直播 Live
视频点播 VOD
媒体处理 MPC
视频接入服务 VIS
数字内容生产线 MetaStudio
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
迁移中心 MGC
专属云
专属计算集群 DCC
开发者工具
SDK开发指南
API签名指南
DevStar
华为云命令行工具服务 KooCLI
Huawei Cloud Toolkit
CodeArts API
云化转型
云架构中心
云采用框架
用户服务
账号中心
费用中心
成本中心
资源中心
企业管理
工单管理
客户运营能力
国际站常见问题
支持计划
专业服务
合作伙伴支持计划
我的凭证
华为云公共事业服务云平台
工业软件
工业数字模型驱动引擎
硬件开发工具链平台云服务
工业数据转换引擎云服务
文档首页/ 云搜索服务 CSS/ 最佳实践/ Elasticsearch数据迁移/ 通过华为云Logstash实现Elasticsearch集群间数据迁移

通过华为云Logstash实现Elasticsearch集群间数据迁移

更新时间:2025-01-06 GMT+08:00
分享

使用华为云CSS服务的Logstash集群可以实现Elasticsearch集群间的数据迁移。

应用场景

华为云Logstash是一款全托管的数据接入处理服务,兼容开源Logstash的能力,支持用于Elasticsearch集群间数据迁移。

通过华为云Logstash可以实现华为云Elasticsearch、自建Elasticsearch或第三方Elasticsearch迁移至华为云Elasticsearch,该方案常用于以下场景:

  • 跨版本迁移:利用Logstash的兼容性和灵活性,实现不同版本间的数据迁移,确保数据在新版本中的可用性和一致性。适用于Elasticsearch集群版本跨度较大的迁移场景,例如从6.X版本迁移至7.X版本。
  • 集群合并:使用Logstash进行数据迁移,将多个Elasticsearch集群的数据整合到一个Elasticsearch集群中,实现多个Elasticsearch数据的统一管理和分析。
  • 服务迁移上云:将自建的Elasticsearch服务迁移到云平台,以利用云服务的可扩展性、维护简便性和成本效益。
  • 变更服务提供商:如果企业当前使用的是第三方Elasticsearch服务,但出于成本、性能或其他战略考虑,希望更换服务提供商至华为云。

方案架构

图1 迁移流程

通过华为云Logstash实现Elasticsearch集群间数据迁移的迁移流程如图1所示。

  1. 输入(Input):华为云Logstash接收来自华为云Elasticsearch、自建Elasticsearch或第三方Elasticsearch的数据。
    说明:

    华为云Elasticsearch、自建Elasticsearch或第三方Elasticsearch数据迁移到华为云Elasticsearch的操作步骤相同,只是获取源集群的访问地址有差异,具体请参见获取Elasticsearch集群信息

  2. 过滤(Filter):华为云Logstash对数据进行清洗和转换。
  3. 输出(Output):华为云Logstash将数据输出到目标设备,如华为云Elasticsearch。
根据业务需求,可以选择全量数据迁移或增量数据迁移。
  • 全量数据迁移:使用Logstash进行全量数据迁移,适用于迁移初期或需要确保数据完整性的场景。
  • 增量数据迁移:通过Logstash配置增量查询,可以只迁移有增量字段的索引数据。此方法适用于需要持续同步数据或对数据实时性有较高要求的场景。

方案优势

  • 高版本兼容性:适用于不同版本的Elasticsearch集群迁移。
  • 高效的数据处理能力:Logstash支持批量读写操作,可以大幅度提高数据迁移的效率。
  • 并发同步技术:利用slice并发同步技术,可以提高数据迁移的速度和性能,尤其是在处理大规模数据时。
  • 配置简单:华为云Logstash的配置相对简单直观,通过配置文件即可实现数据的输入、处理和输出。
  • 强大的数据处理功能:Logstash内置了丰富的过滤器,可以在迁移过程中对数据进行清洗、转换和丰富。
  • 灵活的迁移策略:根据业务需求,可以灵活选择全量迁移或增量迁移,优化存储使用和迁移时间。

性能影响

使用Logstash迁移集群依托于Scroll API,此API能够高效读取源集群的索引数据,并批量同步至目标集群。这一过程可能会对源集群性能产生影响,具体影响程度取决于目标集群对源集群的读取速度,而读取速度取决于Scroll API的size和slice参数配置。参数配置的详细指导可参考Reindex API文档。
  • 对于资源消耗较高的集群,建议通过调整size参数来减缓迁移速率,或者选择在业务流量低谷时段进行迁移操作,以减轻对集群资源的影响。
  • 对于资源消耗较低的集群,在迁移时可以采用默认参数配置,建议同时监控源集群的性能负载,并根据实际情况适时调整size和slice参数,以优化迁移效率和资源使用。

约束限制

集群迁移过程中,源集群的索引数据不能增删改,否则会导致迁移后的源集群数据和目标集群数据内容不一致。

前提条件

  • 源Elasticsearch集群和目标Elasticsearch集群处于可用状态。
  • 集群间需要保证网络连通。
    • 如果源集群、Logstash和目标集群在不同VPC,则需要先打通VPC网络建立对等连接。具体操作请参见对等连接简介
    • 如果是自建Elasticsearch集群迁移至华为云,则可以通过给自建Elasticsearch集群配置公网访问打通网络。
    • 如果是第三方Elasticsearch集群迁移至华为云,则需要建立企业内部数据中心到华为云的VPN通道或专线。
  • 确认集群的索引已开启“_source”

    集群索引的“_source”默认是开启的。执行命令GET {index}/_search,当返回的索引信息里有“_source”信息时表示已开启。

操作步骤

  1. 获取Elasticsearch集群信息
  2. (可选)迁移索引结构:通过脚本迁移Elasticsearch集群的索引模板和索引结构。
  3. 创建Logstash集群:创建一个Logstash集群用于迁移数据。
  4. 验证集群间的网络连通性:验证Logstash和源Elasticsearch集群的连通性。
  5. 使用Logstash迁移集群
  6. 释放Logstash集群:当集群迁移完成后,请及时释放Logstash集群。

获取Elasticsearch集群信息

在迁移集群前,需要先获取必备的集群信息,用于配置迁移任务。

表1 需要获取的Elasticsearch集群信息

集群来源

要获取的信息

获取方式

源集群

华为云Elasticsearch集群

  • 源集群的名称
  • 源集群的访问地址
  • 访问源集群的用户名和密码(仅安全集群涉及)
  • 获取集群名称和访问地址请参见3
  • 用户名和密码请联系服务管理员获取。

自建Elasticsearch集群

  • 源集群的名称
  • 源集群的公网访问地址
  • 访问源集群的用户名和密码(仅安全集群涉及)

联系服务管理员获取。

第三方Elasticsearch集群

  • 源集群的名称
  • 源集群的访问地址
  • 访问源集群的用户名和密码(仅安全集群涉及)

联系服务管理员获取。

目标集群

华为云Elasticsearch集群

  • 目标集群的访问地址
  • 访问目标集群的用户名和密码(仅安全集群涉及)
  • 获取访问地址请参见3
  • 用户名和密码请联系服务管理员获取。

源集群的来源不同,获取信息的方式不同,此处仅介绍如何获取华为云Elasticsearch集群的信息。

  1. 登录云搜索服务管理控制台。
  2. 在左侧菜单栏选择集群管理 > Elasticsearch
  3. 在Elasticsearch集群列表,获取集群名称和访问地址。
    图2 获取集群信息

(可选)迁移索引结构

如果您直接在目标Elasticsearch集群手动创建索引结构,则跳过该步骤。此处提供了一种通过脚本迁移Elasticsearch集群的索引模板和索引结构的方法。

  1. 创建弹性云服务器ECS,用于迁移源集群的元数据。
    1. 创建弹性云服务器ECS,ECS的操作系统选择CentOS,规格选择2U4G,且和CSS服务的集群在同一个虚拟私有云和安全组中。
    2. 测试ECS和源集群、目标集群的连通性。

      在ECS执行命令curl http:// {ip}:{port}测试连通性,当返回200时,则表示已经连通。

      IP是源集群和目标集群访问地址;port是端口号,默认是9200,请以集群实际端口号为准。

      curl http://10.234.73.128:9200 # 输入实际的IP地址,此处仅以非安全集群举例。
      {
        "name" : "es_cluster_migrate-ess-esn-1-1",
        "cluster_name" : "es_cluster_migrate",
        "cluster_uuid" : "1VbP7-39QNOx_R-llXKKtA",
        "version" : {
          "number" : "6.5.4",
          "build_flavor" : "default",
          "build_type" : "tar",
          "build_hash" : "d2ef93d",
          "build_date" : "2018-12-17T21:17:40.758843Z",
          "build_snapshot" : false,
          "lucene_version" : "7.5.0",
          "minimum_wire_compatibility_version" : "5.6.0",
          "minimum_index_compatibility_version" : "5.0.0"
        },
        "Tagline" : "You Know, for Search"
      }
  2. 准备工具和软件,判断ECS是否可以联网,选择不同的安装方式。
    • 是,选择在线安装工具和软件,直接使用yum和pip安装,具体请参见3
    • 否,选择离线安装工具和软件,下载安装包到虚拟机上执行安装命令,具体请参见4
    表2 准备工具和软件

    类型

    目的

    获取位置

    Python2

    主要用户执行迁移脚本。

    下载地址,版本选择python 2.7.18。

    winscp

    Linux上传脚本,跨平台文件传输工具。

    下载Winscp

  3. 在线安装工具和软件。
    1. 执行yum install python2安装python2。
      [root@ecs opt]# yum install python2
    2. 执行yum install python-pip安装pip。
      [root@ecs opt]# yum install python-pip
    3. 执行pip install pyyaml安装yaml依赖。
    4. 执行pip install requests安装requests依赖。
  4. 离线安装工具和软件。
    1. 下载python2安装包,下载地址https://www.python.org/downloads/release/python-2718/。选择源码下载安装。
      图3 下载python2安装包
    2. 使用WinSCP工具上传Python安装包到opt目录下,安装python。
      # 解压python压缩包
      [root@ecs-52bc opt]# tar -xvf Python-2.7.18.tgz
      Python-2.7.18/Modules/zlib/crc32.c
      Python-2.7.18/Modules/zlib/gzlib.c
      Python-2.7.18/Modules/zlib/inffast.c
      Python-2.7.18/Modules/zlib/example.c
      Python-2.7.18/Modules/python.c
      Python-2.7.18/Modules/nismodule.c
      Python-2.7.18/Modules/Setup.config.in
      …
      # 解压完成进入目录
      [root@ecs-52bc opt]# cd Python-2.7.18
      # 检查文件配置安装路径
      [root@ecs-52bc Python-2.7.18]# ./configure --prefix=/usr/local/python2
      …
      checking for build directories... checking for --with-computed-gotos... no value specified
      checking whether gcc -pthread supports computed gotos... yes
      done
      checking for ensurepip... no
      configure: creating ./config.status
      config.status: creating Makefile.pre
      config.status: creating Modules/Setup.config
      config.status: creating Misc/python.pc
      config.status: creating Modules/ld_so_aix
      config.status: creating pyconfig.h
      creating Modules/Setup
      creating Modules/Setup.local
      creating Makefile
      # 编译python
      [root@ecs-52bc Python-2.7.18]# make
      # 安装python
      [root@ecs-52bc Python-2.7.18]# make install
    3. 安装完成检查,检查python安装结果。
      # 检查python版本
      [root@ecs-52bc Python-2.7.18]# python --version
      Python 2.7.5
      # 检查pip版本
      [root@ecs-52bc Python-2.7.18]# pip --version
      pip 7.1.2 from /usr/lib/python2.7/site-packages/pip-7.1.2-py2.7.egg (python 2.7)
      [root@ecs-52bc Python-2.7.18]#
  5. 准备Elasticsearch集群的索引迁移脚本。
    1. 执行“vi migrateConfig.yaml”配置文件,输入并基于实际信息修改以下内容,执行wq保存为Logstash迁移脚本。集群信息的获取方式请参见获取Elasticsearch集群信息
      es_cluster_new:
        # 源集群的名称
        clustername: es_cluster_new
        # 源Elasticsearch集群的访问地址,加上“http://”。
        src_ip: http://x.x.x.x:9200
        # 访问源Elasticsearch集群的用户名和密码,如果为非安全集群则设置为""。
        src_username: ""
        src_password: ""
        # 目标Elasticsearch集群的访问地址,加上“http://”。
        dest_ip: http://x.x.x.x:9200
        # 访问目标Elasticsearch集群的用户名和密码,如果为非安全集群则设置为""。
        dest_username: ""
        dest_password: ""
        # “only_mapping”可以不定义,默认值为false,需要搭配“migrateMapping.py”使用,表示是否只处理这个文件中mapping地址的索引。当设置成true时,则只迁移源集群中和下面mapping的key一致的索引数据;当设置成false时,则迁移源集群中除“.kibana”“.*”之外的所有索引数据。
        # 迁移过程中会将索引名称与下面的mapping匹配,如果匹配一致,则使用mapping的value作为目标集群的索引名称;如果匹配不到,则使用源集群原始的索引名称。
        only_mapping: false
        # 设置要迁移的索引,key为源集群的索引名字,value为目标集群的索引名字。
        mapping:
            test_index_1: test_index_1
        # “only_compare_index”可以不定义,默认值为false,需要搭配“checkIndices.py”使用,当设置为false会比较所有的索引和文档数量,当设置为true只比较索引数量。
        only_compare_index: false
    2. 执行vi migrateTemplate.py命令,直接复制输入以下内容无需修改,执行wq保存为索引模板迁移脚本
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # config = yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def put_template_to_target(url, template, cluster, template_name, dest_auth=None):
          headers = {'Content-Type': 'application/json'}
          create_resp = requests.put(url, headers=headers, data=json.dumps(template), auth=dest_auth, verify=False)
          if not os.path.exists("templateLogs"):
              os.makedirs("templateLogs")
          if create_resp.status_code != 200:
              print(
                  "create template " + url + " failed with response: " + str(
                      create_resp) + ", source template is " + template_name)
              print(create_resp.text)
              filename = "templateLogs/" + str(cluster) + "#" + template_name
              with open(filename + ".json", "w") as f:
                  json.dump(template, f)
              return False
          else:
              return True
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migration template!")
          config = loadConfig(argv)
          src_clusters = config.keys()
          print("process cluster name:")
          for name in src_clusters:
              print(name)
          print("cluster total number:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
      
              print("start to process cluster name:" + name)
              source_url = value["src_ip"] + "/_template"
      
              response = requests.get(source_url, auth=source_auth, verify=False)
              if response.status_code != 200:
                  print("*** get all template failed. resp statusCode:" + str(
                      response.status_code) + " response is " + response.text)
                  continue
              all_template = response.json()
              migrate_itemplate = []
      
              for template in all_template.keys():
                  if template.startswith(".") or template == "logstash":
                      continue
                  if "index_patterns" in all_template[template]:
                      for t in all_template[template]["index_patterns"]:
                          # if "kibana" in template:
                          if t.startswith("."):
                              continue
                          migrate_itemplate.append(template)
      
              for template in migrate_itemplate:
                  dest_index_url = value["dest_ip"] + "/_template/" + template
                  result = put_template_to_target(dest_index_url, all_template[template], name, template, dest_auth)
                  if result is True:
                      print('[success] delete success, cluster: %-10s, template %-10s ' % (str(name), str(template)))
                  else:
                      print('[failure] delete failure, cluster: %-10s, template %-10s ' % (str(name), str(template)))
      
      
      if __name__ == '__main__':
          main(sys.argv)
    3. 执行vi migrateMapping.py命令,直接复制输入以下内容无需修改,执行wq保存为索引结构迁移脚本
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import re
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # config = yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def get_cluster_version(url, auth=None):
          response = requests.get(url, auth=auth)
          if response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              return False
          cluster = response.json()
          version = cluster["version"]["number"]
      
          return True
      
      
      def process_mapping(index_mapping, dest_index):
          # remove unnecessary keys
          del index_mapping["settings"]["index"]["provided_name"]
          del index_mapping["settings"]["index"]["uuid"]
          del index_mapping["settings"]["index"]["creation_date"]
          del index_mapping["settings"]["index"]["version"]
      
          if "lifecycle" in index_mapping["settings"]["index"]:
              del index_mapping["settings"]["index"]["lifecycle"]
      
          # check alias
          aliases = index_mapping["aliases"]
          for alias in list(aliases.keys()):
              if alias == dest_index:
                  print(
                      "source index " + dest_index + " alias " + alias + " is the same as dest_index name, will remove this alias.")
                  del index_mapping["aliases"][alias]
          # if index_mapping["settings"]["index"].has_key("lifecycle"):
          if "lifecycle" in index_mapping["settings"]["index"]:
              lifecycle = index_mapping["settings"]["index"]["lifecycle"]
              opendistro = {"opendistro": {"index_state_management":
                                               {"policy_id": lifecycle["name"],
                                                "rollover_alias": lifecycle["rollover_alias"]}}}
              index_mapping["settings"].update(opendistro)
              # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
              del index_mapping["settings"]["index"]["lifecycle"]
      
          # replace synonyms_path
          if "analysis" in index_mapping["settings"]["index"]:
              analysis = index_mapping["settings"]["index"]["analysis"]
              if "filter" in analysis:
                  filter = analysis["filter"]
                  if "my_synonym_filter" in filter:
                      my_synonym_filter = filter["my_synonym_filter"]
                      if "synonyms_path" in my_synonym_filter:
                          index_mapping["settings"]["index"]["analysis"]["filter"]["my_synonym_filter"][
                              "synonyms_path"] = "/rds/datastore/elasticsearch/v7.10.2/package/elasticsearch-7.10.2/plugins/analysis-dynamic-synonym/config/synonyms.txt"
          return index_mapping
      
      
      def getAlias(source, source_auth):
          # get all indices
          response = requests.get(source + "/_alias", auth=source_auth)
          if response.status_code != 200:
              print("*** get all index failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              exit()
      
          all_index = response.json()
          system_index = []
          create_index = []
          for index in list(all_index.keys()):
              if (index.startswith(".")):
                  system_index.append(index)
              else:
                  create_index.append(index)
      
          return system_index, create_index
      
      
      def put_mapping_to_target(url, mapping, cluster, source_index, dest_auth=None):
          headers = {'Content-Type': 'application/json'}
          create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth, verify=False)
          if not os.path.exists("mappingLogs"):
              os.makedirs("mappingLogs")
          if create_resp.status_code != 200:
              print(
                  "create index " + url + " failed with response: " + str(create_resp) +
                  ", source index is " + str(source_index))
              print(create_resp.text)
              filename = "mappingLogs/" + str(cluster) + "#" + str(source_index)
              with open(filename + ".json", "w") as f:
                  json.dump(mapping, f)
              return False
          else:
              return True
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migrate index mapping!")
          config = loadConfig(argv)
          src_clusters = config.keys()
      
          print("begin to process cluster name :")
          for name in src_clusters:
              print(name)
          print("cluster count:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source = value["src_ip"]
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest = value["dest_ip"]
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
      
              print("start to process cluster:   " + name)
              # only deal with mapping list
              if 'only_mapping' in value and value["only_mapping"]:
                  for source_index, dest_index in value["mapping"].iteritems():
                      print("start to process source index" + source_index + ", target index: " + dest_index)
                      source_url = source + "/" + source_index
                      response = requests.get(source_url, auth=source_auth)
                      if response.status_code != 200:
                          print("*** get ElasticSearch message failed. resp statusCode:" + str(
                              response.status_code) + " response is " + response.text)
                          continue
                      mapping = response.json()
                      index_mapping = process_mapping(mapping[source_index], dest_index)
                      dest_url = dest + "/" + dest_index
                      result = put_mapping_to_target(dest_url, index_mapping, name, source_index, dest_auth)
                      if result is False:
                          print("cluster name:" + name + ", " + source_index + ":failure")
                          continue
                      print("cluster name:" + name + ", " + source_index + ":success")
              else:
                  # get all indices
                  system_index, create_index = getAlias(source, source_auth)
                  success_index = 0
                  for index in create_index:
                      source_url = source + "/" + index
                      index_response = requests.get(source_url, auth=source_auth)
                      if index_response.status_code != 200:
                          print("*** get ElasticSearch message failed. resp statusCode:" + str(
                              index_response.status_code) + " response is " + index_response.text)
                          continue
                      mapping = index_response.json()
      
                      dest_index = index
                      if 'mapping' in value:
                          if index in value["mapping"].keys():
                              dest_index = value["mapping"][index]
                      index_mapping = process_mapping(mapping[index], dest_index)
      
                      dest_url = dest + "/" + dest_index
                      result = put_mapping_to_target(dest_url, index_mapping, name, index, dest_auth)
                      if result is False:
                          print("[failure]: migrate mapping cluster name: " + name + ", " + index)
                          continue
                      print("[success]: migrate mapping cluster name: " + name + ", " + index)
                      success_index = success_index + 1
                  print("create index mapping success total: " + str(success_index))
      
      
      if __name__ == '__main__':
          main(sys.argv)
    4. 执行vi checkIndices.py命令,直接复制输入以下内容无需修改,执行wq保存为索引数据对比脚本
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import re
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def get_cluster_version(url, auth=None):
          response = requests.get(url, auth=auth)
          if response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              return False
          cluster = response.json()
          version = cluster["version"]["number"]
          return True
      
      
      # get all indices
      def get_indices(url, source_auth):
          response = requests.get(url + "/_alias", auth=source_auth)
          if response.status_code != 200:
              print("*** get all index failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              exit()
          all_index = response.json()
          system_index = []
          create_index = []
          for index in list(all_index.keys()):
              if (index.startswith(".")):
                  system_index.append(index)
              else:
                  create_index.append(index)
          return create_index
      
      
      def get_mapping(url, _auth, index):
          source_url = url + "/" + index
          index_response = requests.get(source_url, auth=_auth)
          if index_response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  index_response.status_code) + " response is " + index_response.text)
              return "[failure] --- index is not exist in destination es. ---"
          mapping = index_response.json()
          return mapping
      
      
      def get_index_total(url, index, es_auth):
          stats_url = url + "/" + index + "/_stats"
          index_response = requests.get(stats_url, auth=es_auth, verify=False)
          if index_response.status_code != 200:
              print("*** get ElasticSearch stats message failed. resp statusCode:" + str(
                  index_response.status_code) + " response is " + index_response.text)
              return 0
          return index_response.json()
      
      
      def get_indices_stats(url, es_auth):
          endpoint = url + "/_cat/indices"
          indicesResult = requests.get(endpoint, es_auth)
          indicesList = indicesResult.split("\n")
          indexList = []
          for indices in indicesList:
              indexList.append(indices.split()[2])
          return indexList
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # python3
          # return yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migrate index mapping!")
          config = loadConfig(argv)
          src_clusters = config.keys()
      
          print("begin to process cluster name :")
          for name in src_clusters:
              print(name)
          print("cluster count:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source = value["src_ip"]
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest = value["dest_ip"]
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
              cluster_name = name
              if "clustername" in value:
                  cluster_name = value["clustername"]
      
              print("start to process cluster :" + cluster_name)
              # get all indices
              all_source_index = get_indices(source, source_auth)
              all_dest_index = get_indices(dest, dest_auth)
      
              if "only_compare_index" in value and value["only_compare_index"]:
                  print("[success] only compare mapping, not compare index count.")
                  continue
      
              for index in all_source_index:
                  index_total = get_index_total(value["src_ip"], index, source_auth)
                  src_total = index_total["_all"]["primaries"]["docs"]["count"]
                  src_size = int(index_total["_all"]["primaries"]["store"]["size_in_bytes"]) / 1024 / 1024
                  dest_index = get_index_total(value["dest_ip"], index, dest_auth)
                  if dest_index is 0:
                      print('[failure] not found, index: %-20s, source total: %-10s size %6sM'
                            % (str(index), str(src_total), src_size))
                      continue
                  dest_total = dest_index["_all"]["primaries"]["docs"]["count"]
                  if src_total != dest_total:
                      print('[failure] not consistent, '
                            'index: %-20s, source total: %-10s size %6sM destination total: %-10s '
                            % (str(index), str(src_total), src_size, str(dest_total)))
                      continue
                  print('[success] compare index total equal : index : %-20s,  total: %-20s '
                        % (str(index), str(dest_total)))
      
      
      if __name__ == '__main__':
          main(sys.argv)
  6. 执行如下命令,迁移Elasticsearch集群的索引模板和索引结构。
    python migrateTemplate.py
    python migrateMapping.py

创建Logstash集群

当ECS中的迁移环境准备完成后,在CSS服务创建一个Logstash集群用于迁移数据。

  1. 登录云搜索服务管理控制台
  2. 在左侧菜单栏选择集群管理 > Logstash
  3. 单击右上角的创建集群,进入创建集群页面。
  4. 在创建集群页面,根据指导完成集群配置。

    关键配置参数请参见表3,其他参数可以保持默认值。创建集群的详细说明请参见创建Logstash集群

    表3 Logstash集群的关键配置

    参数

    说明

    计费模式

    选择“按需计费”,按实际使用时长计费,计费周期为一小时,不足一小时按一小时计费。

    集群类型

    选择“Logstash”

    集群版本

    选择“7.10.0”

    集群名称

    自定义集群名称,可输入的字符范围为4~32个字符,只能包含数字、字母、中划线和下划线,且必须以字母开头。

    此处以“Logstash-ES”为例。

    虚拟私有云

    VPC即虚拟私有云,是通过逻辑方式进行网络隔离,提供安全、隔离的网络环境。

    此处选择和目标Elasticsearch集群同一个虚拟私有云(VPC)。

    子网

    通过子网提供与其他网络隔离的、可以独享的网络资源,以提高网络安全。

    选择创建集群需要的子网,可进入VPC服务查看VPC下已创建的子网名称和ID。

    安全组

    安全组是一个逻辑上的分组,为同一个VPC内具有相同安全保护需求并相互信任的弹性云服务器提供访问策略。

    此处选择和目标Elasticsearch集群同一个安全组。

  5. 单击“下一步:高级配置”,此配置保持默认配置即可。
  6. 单击“下一步:确认配置”,确认完成后单击“立即创建”开始创建集群。
  7. 单击“返回集群列表”,系统将跳转到“集群管理”页面。您创建的集群将展现在集群列表中,且集群状态为“创建中”,创建成功后集群状态会变为可用

验证集群间的网络连通性

在启动迁移任务前,需要先验证Logstash和源Elasticsearch集群的网络连通性。

  1. 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“配置中心”,进入配置中心页面。
  2. 在配置中心页面,单击“连通性测试”
  3. 在弹窗中输入源集群的IP地址和端口号,单击“测试”
    图4 连通性测试

    当显示“可用”时,表示集群间网络连通。如果网络不连通,可以配置Logstash集群路由,连通集群间的网络,具体操作请参见配置Logstash集群路由

使用Logstash全量迁移集群数据

在集群迁移初期或需要确保数据完整性的场景,推荐使用Logstash全量迁移集群数据,该方法会一次迁移整个Elasticsearch集群的数据。

  1. 登录云搜索服务管理控制台。
  2. 在左侧菜单栏选择集群管理 > Logstash
  3. 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“配置中心”,进入配置中心页面。
  4. 在配置中心页面,单击右上角“创建”,进入创建配置文件页面,编辑Elasticsearch集群的全量迁移配置文件。
    1. 选择集群模板:展开系统模板,选择“elasticsearch”,单击操作列的“应用”
    2. 设置配置文件名称:在“名称”处自定义配置文件名称,例如“es-es-all”
    3. 修改配置文件内容:在“配置文件内容”处填写Elasticsearch集群的迁移配置方案,配置文件示例如下。集群信息的获取方式请参见获取Elasticsearch集群信息
      input{
           elasticsearch{
              # 源Elasticsearch的访问地址,不需要添加协议,添加HTTPS协议会导致报错。
              hosts =>  ["xx.xx.xx.xx:9200", "xx.xx.xx.xx:9200"]
              # 访问源集群的用户名和密码,非安全集群无需配置。
              # user => "css_logstash"
              # password => "*****"
              # 配置待迁移的索引信息,多个索引以逗号隔开,可以使用通配符设置,例如“index*”。
              index => "*_202102"
              docinfo => true
              slices => 3
              size => 3000
              # 当目标集群是HTTPS访问方式时,则需额外配置以下信息。
              # 配置集群HTTPS访问证书,CSS集群保持以下不变。     
              # ca_file => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs"  # for 7.10.0
              # 是否开启HTTPS通信,HTTPS访问集群则设置为true。
              #ssl => true
           }
       }
      
       
      # 移除一些logstash增加的字段。
       filter {
         mutate {
           remove_field => ["@version"]
         }
       }
      
       output{
           elasticsearch{
             # 目标Elasticsearch集群的访问地址
             hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"]
             # 访问目标集群的用户名和密码,非安全集群无需配置。
             # user => "css_logstash"
             # password => "*****"
             # 配置目标集群的索引,以下配置为索引名称和源端保持一致,保持默认。
             index => "%{[@metadata][_index]}"
             document_type => "%{[@metadata][_type]}"
             document_id => "%{[@metadata][_id]}"
             # 当目标集群是HTTPS访问方式时,则需额外配置以下信息。
             # 配置集群HTTPS访问证书,CSS集群保持以下不变。     
             #cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs"  # for 7.10.0
             # 是否开启HTTPS通信,HTTPS访问集群则设置为true。
             #ssl => true
             # 是否验证服务端Elasticsearch证书,设置为false表示不验证证书。
             #ssl_certificate_verification => false
           }
       }
      表4 全量迁移配置项说明

      配置项名称

      说明

      input

      hosts

      源集群的访问地址,如果集群有多个访问节点请分别填写,使用逗号隔开。

      user

      访问集群的用户名,如果是非安全集群此项使用“#”注释掉。

      password

      访问集群的密码,如果是非安全集群此项使用“#”注释掉。

      index

      需要全量迁移的源端索引信息,使用逗号隔开,可以使用通配符设置,例如“index*”

      docinfo

      是否重新索引文档,必须为true。

      slices

      配置该参数可以使用切片滚动同时对查询的不同切片,提高整体吞吐量。建议在2-8内。

      size

      每次查询返回的最大命中数。

      output

      hosts

      目标集群访问地址,如果集群有多个节点,请分别填写,使用逗号隔开。

      user

      访问集群的用户名,如果是非安全集群此项使用“#”注释掉。

      password

      访问集群的密码,如果是非安全集群此项使用“#”注释掉。

      index

      迁移到目标集群的索引名称,支持扩展修改,如“Logstash-%{+yyyy.MM.dd}”

      document_type

      使目标端文档类型与源端保持一致。

      document_id

      索引中的文档ID,建议与源端保持一致,如需要自动生成,使用“#”注释掉即可。

    4. 编辑完成后,单击“下一页”配置Logstash配置文件运行参数。

      此示例保持默认值即可,如需设置请参见创建Logstash配置文件

    5. 配置完成后,单击“创建”。

      在配置中心页面可以看到创建的配置文件,状态为“可用”,表示创建成功。

  5. 启动全量迁移任务。
    1. 在配置文件列表,选择配置文件“es-es-all”,单击左上角的“启动”。
    2. “启动Logstash服务”对话框中,根据业务需要选择“是否保持常驻”。此示例不开启保持常驻。

      开启“保持常驻”以后,将会在每个节点上面配置一个守护进程,当logstash服务出现故障的时候,会主动拉起并修复。“保持常驻”适用于需要长期运行的业,不适用于短期运行的业务,短期业务如果源端无数据,开启保持常驻会导致任务失败。

    3. 单击“确定”,开始启动配置文件启动Logstash全量迁移任务。

      可以在管道列表看到启动的配置文件。

  6. 数据迁移完毕检查数据一致性。
    • 方式一:使用Putty登录迁移虚拟机,执行命令python checkIndices.py对比数据结果。
    • 方式二:分别在源集群和目标集群的Kibana执行命令GET _cat/indices,对比两者的索引信息是否一致。

使用Logstash增量迁移集群数据

在需要持续同步数据或对数据实时性有较高要求的场景,推荐使用Logstash增量迁移集群数据,该方法通过Logstash配置增量查询,仅支持迁移有增量字段的索引数据。

  1. 登录云搜索服务管理控制台
  2. 在左侧菜单栏选择集群管理 > Logstash
  3. 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“配置中心”,进入配置中心页面。
  4. 在配置中心页面,单击右上角“创建”,进入创建配置文件页面,编辑Elasticsearch集群的增量迁移配置文件。
    1. 选择集群模板:展开系统模板,选择“elasticsearch”,单击操作列的“应用”
    2. 设置配置文件名称:在“名称”处自定义配置文件名称,例如“es-es-inc”
    3. 修改配置文件内容:在“配置文件内容”处填写Elasticsearch集群的迁移配置方案,配置文件示例如下。

      不同的索引的增量迁移配置不同,必须基于索引分析给出增量配置文件迁移命令。集群信息的获取方式请参见获取Elasticsearch集群信息

      input{
           elasticsearch{
               # 源Elasticsearch的访问地址,不需要添加协议,添加HTTPS协议会导致报错。
               hosts =>  ["xx.xx.xx.xx:9200"]
               # 访问源集群的用户名和密码,非安全集群无需配置。
               user => "css_logstash"
               password => "******"
               # 配置增量迁移索引。
               index => "*_202102"
               # 配置增量迁移查询语句。
               query => '{"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}'
               docinfo => true
               size => 1000
               # 当目标集群是HTTPS访问方式时,则需额外配置以下信息。
               # 配置集群HTTPS访问证书,CSS集群保持以下不变。     
               # ca_file => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs"  # for 7.10.0
               # 是否开启HTTPS通信,HTTPS访问集群则设置为true。
               #ssl => true
           }
       }
      
       filter {
         mutate {
           remove_field => ["@timestamp", "@version"]
         }
       }
      
       output{
           elasticsearch{
               # 目标集群的访问地址。
               hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"]
               # 访问目标集群的用户名和密码,非安全集群无需配置。
               #user => "admin"
               #password => "******"
               # 配置目标集群的索引,以下配置为索引名称和源端保持一致,保持默认。
               index => "%{[@metadata][_index]}"
               document_type => "%{[@metadata][_type]}"
               document_id => "%{[@metadata][_id]}"
               # 当目标集群是HTTPS访问方式时,则需额外配置以下信息。
               # 配置集群HTTPS访问证书,CSS集群保持默认。      
               #cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs"  # for 7.10.0 
               # 是否开启HTTPS通信,HTTPS访问集群则设置为true。
               #ssl => true
               # 是否验证服务端Elasticsearch证书,设置为false表示不验证证书。
               #ssl_certificate_verification => false
           }
      
           #stdout { codec => rubydebug { metadata => true }}
       }
      表5 增量迁移配置项说明

      配置

      说明

      hosts

      分别填写源集群和目标集群的访问地址,如果集群有多个访问节点请分别填写。

      user

      访问集群的用户名,如果是非安全集群此项使用“#”注释掉。

      password

      访问集群的密码,如果是非安全集群此项使用“#”注释掉。

      index

      需要增加迁移的索引信息,一个配置文件只支持一个索引的增量迁移。

      query

      增量数据的识别标识,一般是Elasticsearch的DLS语句,需要提前分析。其中postsDate为业务中的时间字段。

      {"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}

      此处命令是迁移2021-05-25之后新增加的数据,在多次增量迁移过程中需要修改此处的日志值,如果日期是时间戳方式请转换为时间戳。此处命令需要提前验证有效性。

      scroll

      当源端数据量过大,为了防止Logstash内存溢出,可以使用scroll分批次获取数据。默认为"1m"。间隔时间不要太长,否则可能会丢失数据。

  5. 启动增量迁移任务。
    1. 在配置文件列表,选择配置文件“es-es-inc”,单击左上角的“启动”。
    2. “启动Logstash服务”对话框中,根据业务需要选择“是否保持常驻”。此示例不开启保持常驻。

      开启“保持常驻”以后,将会在每个节点上面配置一个守护进程,当logstash服务出现故障的时候,会主动拉起并修复。“保持常驻”适用于需要长期运行的业,不适用于短期运行的业务,短期业务如果源端无数据,开启保持常驻会导致任务失败。

    3. 单击“确定”,开始启动配置文件启动Logstash增量迁移任务。

      可以在管道列表看到启动的配置文件。

  6. 数据迁移完毕检查数据一致性。
    • 方式一:使用Putty登录迁移虚拟机,执行命令python checkIndices.py对比数据结果。
    • 方式二:分别在源集群和目标集群的Kibana执行命令GET _cat/indices,对比两者的索引信息是否一致。

释放Logstash集群

当集群迁移完成后,请及时释放Logstash集群,可以节约资源,避免产生不必要的费用。

  1. 登录云搜索服务管理控制台
  2. 在左侧菜单栏选择集群管理 > Logstash
  3. 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“更多 > 删除”,在弹出的确认提示框中,输入“DELETE”,单击“确定”完成集群删除。
提示

您即将访问非华为云网站,请注意账号财产安全

文档反馈

文档反馈

意见反馈

0/500

标记内容

同时提交标记内容