计算
弹性云服务器 ECS
Flexus云服务
裸金属服务器 BMS
弹性伸缩 AS
镜像服务 IMS
专属主机 DeH
函数工作流 FunctionGraph
云手机服务器 CPH
Huawei Cloud EulerOS
网络
虚拟私有云 VPC
弹性公网IP EIP
虚拟专用网络 VPN
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
VPC终端节点 VPCEP
云连接 CC
企业路由器 ER
企业交换机 ESW
全球加速 GA
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
边缘安全 EdgeSec
CDN与智能边缘
内容分发网络 CDN
CloudPond云服务
智能边缘云 IEC
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
迁移中心 MGC
大数据
MapReduce服务 MRS
数据湖探索 DLI
表格存储服务 CloudTable
云搜索服务 CSS
数据接入服务 DIS
数据仓库服务 GaussDB(DWS)
数据治理中心 DataArts Studio
数据可视化 DLV
数据湖工厂 DLF
湖仓构建 LakeFormation
企业应用
云桌面 Workspace
应用与数据集成平台 ROMA Connect
云解析服务 DNS
专属云
专属计算集群 DCC
IoT物联网
IoT物联网
设备接入 IoTDA
智能边缘平台 IEF
用户服务
账号中心
费用中心
成本中心
资源中心
企业管理
工单管理
国际站常见问题
ICP备案
我的凭证
支持计划
客户运营能力
合作伙伴支持计划
专业服务
区块链
区块链服务 BCS
Web3节点引擎服务 NES
解决方案
SAP
高性能计算 HPC
视频
视频直播 Live
视频点播 VOD
媒体处理 MPC
实时音视频 SparkRTC
数字内容生产线 MetaStudio
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
存储容灾服务 SDRS
高性能弹性文件服务 SFS Turbo
弹性文件服务 SFS
云硬盘备份 VBS
云服务器备份 CSBS
数据快递服务 DES
专属分布式存储服务 DSS
容器
云容器引擎 CCE
容器镜像服务 SWR
应用服务网格 ASM
华为云UCS
云容器实例 CCI
管理与监管
云监控服务 CES
统一身份认证服务 IAM
资源编排服务 RFS
云审计服务 CTS
标签管理服务 TMS
云日志服务 LTS
配置审计 Config
资源访问管理 RAM
消息通知服务 SMN
应用运维管理 AOM
应用性能管理 APM
组织 Organizations
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
应用身份管理服务 OneAccess
数据库
云数据库 RDS
文档数据库服务 DDS
数据管理服务 DAS
数据复制服务 DRS
云数据库 GeminiDB
云数据库 GaussDB
分布式数据库中间件 DDM
数据库和应用迁移 UGO
云数据库 TaurusDB
人工智能
人脸识别服务 FRS
图引擎服务 GES
图像识别 Image
内容审核 Moderation
文字识别 OCR
AI开发平台ModelArts
图像搜索 ImageSearch
对话机器人服务 CBS
华为HiLens
视频智能分析服务 VIAS
语音交互服务 SIS
应用中间件
分布式缓存服务 DCS
API网关 APIG
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
多活高可用服务 MAS
事件网格 EG
企业协同
华为云会议 Meeting
云通信
消息&短信 MSGSMS
云生态
合作伙伴中心
云商店
开发者工具
SDK开发指南
API签名指南
Terraform
华为云命令行工具服务 KooCLI
其他
产品价格详情
系统权限
管理控制台
客户关联华为云合作伙伴须知
消息中心
公共问题
开发与运维
应用管理与运维平台 ServiceStage
软件开发生产线 CodeArts
需求管理 CodeArts Req
部署 CodeArts Deploy
性能测试 CodeArts PerfTest
编译构建 CodeArts Build
流水线 CodeArts Pipeline
制品仓库 CodeArts Artifact
测试计划 CodeArts TestPlan
代码检查 CodeArts Check
代码托管 CodeArts Repo
云应用引擎 CAE
开天aPaaS
云消息服务 KooMessage
云手机服务 KooPhone
云空间服务 KooDrive
文档首页/ 云搜索服务 CSS/ 最佳实践/ Elasticsearch数据迁移/ 通过华为云Logstash实现Elasticsearch集群间数据迁移

通过华为云Logstash实现Elasticsearch集群间数据迁移

更新时间:2025-01-06 GMT+08:00

使用华为云CSS服务的Logstash集群可以实现Elasticsearch集群间的数据迁移。

应用场景

华为云Logstash是一款全托管的数据接入处理服务,兼容开源Logstash的能力,支持用于Elasticsearch集群间数据迁移。

通过华为云Logstash可以实现华为云Elasticsearch、自建Elasticsearch或第三方Elasticsearch迁移至华为云Elasticsearch,该方案常用于以下场景:

  • 跨版本迁移:利用Logstash的兼容性和灵活性,实现不同版本间的数据迁移,确保数据在新版本中的可用性和一致性。适用于Elasticsearch集群版本跨度较大的迁移场景,例如从6.X版本迁移至7.X版本。
  • 集群合并:使用Logstash进行数据迁移,将多个Elasticsearch集群的数据整合到一个Elasticsearch集群中,实现多个Elasticsearch数据的统一管理和分析。
  • 服务迁移上云:将自建的Elasticsearch服务迁移到云平台,以利用云服务的可扩展性、维护简便性和成本效益。
  • 变更服务提供商:如果企业当前使用的是第三方Elasticsearch服务,但出于成本、性能或其他战略考虑,希望更换服务提供商至华为云。

方案架构

图1 迁移流程

通过华为云Logstash实现Elasticsearch集群间数据迁移的迁移流程如图1所示。

  1. 输入(Input):华为云Logstash接收来自华为云Elasticsearch、自建Elasticsearch或第三方Elasticsearch的数据。
    说明:

    华为云Elasticsearch、自建Elasticsearch或第三方Elasticsearch数据迁移到华为云Elasticsearch的操作步骤相同,只是获取源集群的访问地址有差异,具体请参见获取Elasticsearch集群信息

  2. 过滤(Filter):华为云Logstash对数据进行清洗和转换。
  3. 输出(Output):华为云Logstash将数据输出到目标设备,如华为云Elasticsearch。
根据业务需求,可以选择全量数据迁移或增量数据迁移。
  • 全量数据迁移:使用Logstash进行全量数据迁移,适用于迁移初期或需要确保数据完整性的场景。
  • 增量数据迁移:通过Logstash配置增量查询,可以只迁移有增量字段的索引数据。此方法适用于需要持续同步数据或对数据实时性有较高要求的场景。

方案优势

  • 高版本兼容性:适用于不同版本的Elasticsearch集群迁移。
  • 高效的数据处理能力:Logstash支持批量读写操作,可以大幅度提高数据迁移的效率。
  • 并发同步技术:利用slice并发同步技术,可以提高数据迁移的速度和性能,尤其是在处理大规模数据时。
  • 配置简单:华为云Logstash的配置相对简单直观,通过配置文件即可实现数据的输入、处理和输出。
  • 强大的数据处理功能:Logstash内置了丰富的过滤器,可以在迁移过程中对数据进行清洗、转换和丰富。
  • 灵活的迁移策略:根据业务需求,可以灵活选择全量迁移或增量迁移,优化存储使用和迁移时间。

性能影响

使用Logstash迁移集群依托于Scroll API,此API能够高效读取源集群的索引数据,并批量同步至目标集群。这一过程可能会对源集群性能产生影响,具体影响程度取决于目标集群对源集群的读取速度,而读取速度取决于Scroll API的size和slice参数配置。参数配置的详细指导可参考Reindex API文档。
  • 对于资源消耗较高的集群,建议通过调整size参数来减缓迁移速率,或者选择在业务流量低谷时段进行迁移操作,以减轻对集群资源的影响。
  • 对于资源消耗较低的集群,在迁移时可以采用默认参数配置,建议同时监控源集群的性能负载,并根据实际情况适时调整size和slice参数,以优化迁移效率和资源使用。

约束限制

集群迁移过程中,源集群的索引数据不能增删改,否则会导致迁移后的源集群数据和目标集群数据内容不一致。

前提条件

  • 源Elasticsearch集群和目标Elasticsearch集群处于可用状态。
  • 集群间需要保证网络连通。
    • 如果源集群、Logstash和目标集群在不同VPC,则需要先打通VPC网络建立对等连接。具体操作请参见对等连接简介
    • 如果是自建Elasticsearch集群迁移至华为云,则可以通过给自建Elasticsearch集群配置公网访问打通网络。
    • 如果是第三方Elasticsearch集群迁移至华为云,则需要建立企业内部数据中心到华为云的VPN通道或专线。
  • 确认集群的索引已开启“_source”

    集群索引的“_source”默认是开启的。执行命令GET {index}/_search,当返回的索引信息里有“_source”信息时表示已开启。

操作步骤

  1. 获取Elasticsearch集群信息
  2. (可选)迁移索引结构:通过脚本迁移Elasticsearch集群的索引模板和索引结构。
  3. 创建Logstash集群:创建一个Logstash集群用于迁移数据。
  4. 验证集群间的网络连通性:验证Logstash和源Elasticsearch集群的连通性。
  5. 使用Logstash迁移集群
  6. 释放Logstash集群:当集群迁移完成后,请及时释放Logstash集群。

获取Elasticsearch集群信息

在迁移集群前,需要先获取必备的集群信息,用于配置迁移任务。

表1 需要获取的Elasticsearch集群信息

集群来源

要获取的信息

获取方式

源集群

华为云Elasticsearch集群

  • 源集群的名称
  • 源集群的访问地址
  • 访问源集群的用户名和密码(仅安全集群涉及)
  • 获取集群名称和访问地址请参见3
  • 用户名和密码请联系服务管理员获取。

自建Elasticsearch集群

  • 源集群的名称
  • 源集群的公网访问地址
  • 访问源集群的用户名和密码(仅安全集群涉及)

联系服务管理员获取。

第三方Elasticsearch集群

  • 源集群的名称
  • 源集群的访问地址
  • 访问源集群的用户名和密码(仅安全集群涉及)

联系服务管理员获取。

目标集群

华为云Elasticsearch集群

  • 目标集群的访问地址
  • 访问目标集群的用户名和密码(仅安全集群涉及)
  • 获取访问地址请参见3
  • 用户名和密码请联系服务管理员获取。

源集群的来源不同,获取信息的方式不同,此处仅介绍如何获取华为云Elasticsearch集群的信息。

  1. 登录云搜索服务管理控制台。
  2. 在左侧菜单栏选择集群管理 > Elasticsearch
  3. 在Elasticsearch集群列表,获取集群名称和访问地址。
    图2 获取集群信息

(可选)迁移索引结构

如果您直接在目标Elasticsearch集群手动创建索引结构,则跳过该步骤。此处提供了一种通过脚本迁移Elasticsearch集群的索引模板和索引结构的方法。

  1. 创建弹性云服务器ECS,用于迁移源集群的元数据。
    1. 创建弹性云服务器ECS,ECS的操作系统选择CentOS,规格选择2U4G,且和CSS服务的集群在同一个虚拟私有云和安全组中。
    2. 测试ECS和源集群、目标集群的连通性。

      在ECS执行命令curl http:// {ip}:{port}测试连通性,当返回200时,则表示已经连通。

      IP是源集群和目标集群访问地址;port是端口号,默认是9200,请以集群实际端口号为准。

      curl http://10.234.73.128:9200 # 输入实际的IP地址,此处仅以非安全集群举例。
      {
        "name" : "es_cluster_migrate-ess-esn-1-1",
        "cluster_name" : "es_cluster_migrate",
        "cluster_uuid" : "1VbP7-39QNOx_R-llXKKtA",
        "version" : {
          "number" : "6.5.4",
          "build_flavor" : "default",
          "build_type" : "tar",
          "build_hash" : "d2ef93d",
          "build_date" : "2018-12-17T21:17:40.758843Z",
          "build_snapshot" : false,
          "lucene_version" : "7.5.0",
          "minimum_wire_compatibility_version" : "5.6.0",
          "minimum_index_compatibility_version" : "5.0.0"
        },
        "Tagline" : "You Know, for Search"
      }
  2. 准备工具和软件,判断ECS是否可以联网,选择不同的安装方式。
    • 是,选择在线安装工具和软件,直接使用yum和pip安装,具体请参见3
    • 否,选择离线安装工具和软件,下载安装包到虚拟机上执行安装命令,具体请参见4
    表2 准备工具和软件

    类型

    目的

    获取位置

    Python2

    主要用户执行迁移脚本。

    下载地址,版本选择python 2.7.18。

    winscp

    Linux上传脚本,跨平台文件传输工具。

    下载Winscp

  3. 在线安装工具和软件。
    1. 执行yum install python2安装python2。
      [root@ecs opt]# yum install python2
    2. 执行yum install python-pip安装pip。
      [root@ecs opt]# yum install python-pip
    3. 执行pip install pyyaml安装yaml依赖。
    4. 执行pip install requests安装requests依赖。
  4. 离线安装工具和软件。
    1. 下载python2安装包,下载地址https://www.python.org/downloads/release/python-2718/。选择源码下载安装。
      图3 下载python2安装包
    2. 使用WinSCP工具上传Python安装包到opt目录下,安装python。
      # 解压python压缩包
      [root@ecs-52bc opt]# tar -xvf Python-2.7.18.tgz
      Python-2.7.18/Modules/zlib/crc32.c
      Python-2.7.18/Modules/zlib/gzlib.c
      Python-2.7.18/Modules/zlib/inffast.c
      Python-2.7.18/Modules/zlib/example.c
      Python-2.7.18/Modules/python.c
      Python-2.7.18/Modules/nismodule.c
      Python-2.7.18/Modules/Setup.config.in
      …
      # 解压完成进入目录
      [root@ecs-52bc opt]# cd Python-2.7.18
      # 检查文件配置安装路径
      [root@ecs-52bc Python-2.7.18]# ./configure --prefix=/usr/local/python2
      …
      checking for build directories... checking for --with-computed-gotos... no value specified
      checking whether gcc -pthread supports computed gotos... yes
      done
      checking for ensurepip... no
      configure: creating ./config.status
      config.status: creating Makefile.pre
      config.status: creating Modules/Setup.config
      config.status: creating Misc/python.pc
      config.status: creating Modules/ld_so_aix
      config.status: creating pyconfig.h
      creating Modules/Setup
      creating Modules/Setup.local
      creating Makefile
      # 编译python
      [root@ecs-52bc Python-2.7.18]# make
      # 安装python
      [root@ecs-52bc Python-2.7.18]# make install
    3. 安装完成检查,检查python安装结果。
      # 检查python版本
      [root@ecs-52bc Python-2.7.18]# python --version
      Python 2.7.5
      # 检查pip版本
      [root@ecs-52bc Python-2.7.18]# pip --version
      pip 7.1.2 from /usr/lib/python2.7/site-packages/pip-7.1.2-py2.7.egg (python 2.7)
      [root@ecs-52bc Python-2.7.18]#
  5. 准备Elasticsearch集群的索引迁移脚本。
    1. 执行“vi migrateConfig.yaml”配置文件,输入并基于实际信息修改以下内容,执行wq保存为Logstash迁移脚本。集群信息的获取方式请参见获取Elasticsearch集群信息
      es_cluster_new:
        # 源集群的名称
        clustername: es_cluster_new
        # 源Elasticsearch集群的访问地址,加上“http://”。
        src_ip: http://x.x.x.x:9200
        # 访问源Elasticsearch集群的用户名和密码,如果为非安全集群则设置为""。
        src_username: ""
        src_password: ""
        # 目标Elasticsearch集群的访问地址,加上“http://”。
        dest_ip: http://x.x.x.x:9200
        # 访问目标Elasticsearch集群的用户名和密码,如果为非安全集群则设置为""。
        dest_username: ""
        dest_password: ""
        # “only_mapping”可以不定义,默认值为false,需要搭配“migrateMapping.py”使用,表示是否只处理这个文件中mapping地址的索引。当设置成true时,则只迁移源集群中和下面mapping的key一致的索引数据;当设置成false时,则迁移源集群中除“.kibana”“.*”之外的所有索引数据。
        # 迁移过程中会将索引名称与下面的mapping匹配,如果匹配一致,则使用mapping的value作为目标集群的索引名称;如果匹配不到,则使用源集群原始的索引名称。
        only_mapping: false
        # 设置要迁移的索引,key为源集群的索引名字,value为目标集群的索引名字。
        mapping:
            test_index_1: test_index_1
        # “only_compare_index”可以不定义,默认值为false,需要搭配“checkIndices.py”使用,当设置为false会比较所有的索引和文档数量,当设置为true只比较索引数量。
        only_compare_index: false
    2. 执行vi migrateTemplate.py命令,直接复制输入以下内容无需修改,执行wq保存为索引模板迁移脚本
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # config = yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def put_template_to_target(url, template, cluster, template_name, dest_auth=None):
          headers = {'Content-Type': 'application/json'}
          create_resp = requests.put(url, headers=headers, data=json.dumps(template), auth=dest_auth, verify=False)
          if not os.path.exists("templateLogs"):
              os.makedirs("templateLogs")
          if create_resp.status_code != 200:
              print(
                  "create template " + url + " failed with response: " + str(
                      create_resp) + ", source template is " + template_name)
              print(create_resp.text)
              filename = "templateLogs/" + str(cluster) + "#" + template_name
              with open(filename + ".json", "w") as f:
                  json.dump(template, f)
              return False
          else:
              return True
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migration template!")
          config = loadConfig(argv)
          src_clusters = config.keys()
          print("process cluster name:")
          for name in src_clusters:
              print(name)
          print("cluster total number:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
      
              print("start to process cluster name:" + name)
              source_url = value["src_ip"] + "/_template"
      
              response = requests.get(source_url, auth=source_auth, verify=False)
              if response.status_code != 200:
                  print("*** get all template failed. resp statusCode:" + str(
                      response.status_code) + " response is " + response.text)
                  continue
              all_template = response.json()
              migrate_itemplate = []
      
              for template in all_template.keys():
                  if template.startswith(".") or template == "logstash":
                      continue
                  if "index_patterns" in all_template[template]:
                      for t in all_template[template]["index_patterns"]:
                          # if "kibana" in template:
                          if t.startswith("."):
                              continue
                          migrate_itemplate.append(template)
      
              for template in migrate_itemplate:
                  dest_index_url = value["dest_ip"] + "/_template/" + template
                  result = put_template_to_target(dest_index_url, all_template[template], name, template, dest_auth)
                  if result is True:
                      print('[success] delete success, cluster: %-10s, template %-10s ' % (str(name), str(template)))
                  else:
                      print('[failure] delete failure, cluster: %-10s, template %-10s ' % (str(name), str(template)))
      
      
      if __name__ == '__main__':
          main(sys.argv)
    3. 执行vi migrateMapping.py命令,直接复制输入以下内容无需修改,执行wq保存为索引结构迁移脚本
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import re
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # config = yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def get_cluster_version(url, auth=None):
          response = requests.get(url, auth=auth)
          if response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              return False
          cluster = response.json()
          version = cluster["version"]["number"]
      
          return True
      
      
      def process_mapping(index_mapping, dest_index):
          # remove unnecessary keys
          del index_mapping["settings"]["index"]["provided_name"]
          del index_mapping["settings"]["index"]["uuid"]
          del index_mapping["settings"]["index"]["creation_date"]
          del index_mapping["settings"]["index"]["version"]
      
          if "lifecycle" in index_mapping["settings"]["index"]:
              del index_mapping["settings"]["index"]["lifecycle"]
      
          # check alias
          aliases = index_mapping["aliases"]
          for alias in list(aliases.keys()):
              if alias == dest_index:
                  print(
                      "source index " + dest_index + " alias " + alias + " is the same as dest_index name, will remove this alias.")
                  del index_mapping["aliases"][alias]
          # if index_mapping["settings"]["index"].has_key("lifecycle"):
          if "lifecycle" in index_mapping["settings"]["index"]:
              lifecycle = index_mapping["settings"]["index"]["lifecycle"]
              opendistro = {"opendistro": {"index_state_management":
                                               {"policy_id": lifecycle["name"],
                                                "rollover_alias": lifecycle["rollover_alias"]}}}
              index_mapping["settings"].update(opendistro)
              # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
              del index_mapping["settings"]["index"]["lifecycle"]
      
          # replace synonyms_path
          if "analysis" in index_mapping["settings"]["index"]:
              analysis = index_mapping["settings"]["index"]["analysis"]
              if "filter" in analysis:
                  filter = analysis["filter"]
                  if "my_synonym_filter" in filter:
                      my_synonym_filter = filter["my_synonym_filter"]
                      if "synonyms_path" in my_synonym_filter:
                          index_mapping["settings"]["index"]["analysis"]["filter"]["my_synonym_filter"][
                              "synonyms_path"] = "/rds/datastore/elasticsearch/v7.10.2/package/elasticsearch-7.10.2/plugins/analysis-dynamic-synonym/config/synonyms.txt"
          return index_mapping
      
      
      def getAlias(source, source_auth):
          # get all indices
          response = requests.get(source + "/_alias", auth=source_auth)
          if response.status_code != 200:
              print("*** get all index failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              exit()
      
          all_index = response.json()
          system_index = []
          create_index = []
          for index in list(all_index.keys()):
              if (index.startswith(".")):
                  system_index.append(index)
              else:
                  create_index.append(index)
      
          return system_index, create_index
      
      
      def put_mapping_to_target(url, mapping, cluster, source_index, dest_auth=None):
          headers = {'Content-Type': 'application/json'}
          create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth, verify=False)
          if not os.path.exists("mappingLogs"):
              os.makedirs("mappingLogs")
          if create_resp.status_code != 200:
              print(
                  "create index " + url + " failed with response: " + str(create_resp) +
                  ", source index is " + str(source_index))
              print(create_resp.text)
              filename = "mappingLogs/" + str(cluster) + "#" + str(source_index)
              with open(filename + ".json", "w") as f:
                  json.dump(mapping, f)
              return False
          else:
              return True
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migrate index mapping!")
          config = loadConfig(argv)
          src_clusters = config.keys()
      
          print("begin to process cluster name :")
          for name in src_clusters:
              print(name)
          print("cluster count:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source = value["src_ip"]
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest = value["dest_ip"]
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
      
              print("start to process cluster:   " + name)
              # only deal with mapping list
              if 'only_mapping' in value and value["only_mapping"]:
                  for source_index, dest_index in value["mapping"].iteritems():
                      print("start to process source index" + source_index + ", target index: " + dest_index)
                      source_url = source + "/" + source_index
                      response = requests.get(source_url, auth=source_auth)
                      if response.status_code != 200:
                          print("*** get ElasticSearch message failed. resp statusCode:" + str(
                              response.status_code) + " response is " + response.text)
                          continue
                      mapping = response.json()
                      index_mapping = process_mapping(mapping[source_index], dest_index)
                      dest_url = dest + "/" + dest_index
                      result = put_mapping_to_target(dest_url, index_mapping, name, source_index, dest_auth)
                      if result is False:
                          print("cluster name:" + name + ", " + source_index + ":failure")
                          continue
                      print("cluster name:" + name + ", " + source_index + ":success")
              else:
                  # get all indices
                  system_index, create_index = getAlias(source, source_auth)
                  success_index = 0
                  for index in create_index:
                      source_url = source + "/" + index
                      index_response = requests.get(source_url, auth=source_auth)
                      if index_response.status_code != 200:
                          print("*** get ElasticSearch message failed. resp statusCode:" + str(
                              index_response.status_code) + " response is " + index_response.text)
                          continue
                      mapping = index_response.json()
      
                      dest_index = index
                      if 'mapping' in value:
                          if index in value["mapping"].keys():
                              dest_index = value["mapping"][index]
                      index_mapping = process_mapping(mapping[index], dest_index)
      
                      dest_url = dest + "/" + dest_index
                      result = put_mapping_to_target(dest_url, index_mapping, name, index, dest_auth)
                      if result is False:
                          print("[failure]: migrate mapping cluster name: " + name + ", " + index)
                          continue
                      print("[success]: migrate mapping cluster name: " + name + ", " + index)
                      success_index = success_index + 1
                  print("create index mapping success total: " + str(success_index))
      
      
      if __name__ == '__main__':
          main(sys.argv)
    4. 执行vi checkIndices.py命令,直接复制输入以下内容无需修改,执行wq保存为索引数据对比脚本
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import re
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def get_cluster_version(url, auth=None):
          response = requests.get(url, auth=auth)
          if response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              return False
          cluster = response.json()
          version = cluster["version"]["number"]
          return True
      
      
      # get all indices
      def get_indices(url, source_auth):
          response = requests.get(url + "/_alias", auth=source_auth)
          if response.status_code != 200:
              print("*** get all index failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              exit()
          all_index = response.json()
          system_index = []
          create_index = []
          for index in list(all_index.keys()):
              if (index.startswith(".")):
                  system_index.append(index)
              else:
                  create_index.append(index)
          return create_index
      
      
      def get_mapping(url, _auth, index):
          source_url = url + "/" + index
          index_response = requests.get(source_url, auth=_auth)
          if index_response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  index_response.status_code) + " response is " + index_response.text)
              return "[failure] --- index is not exist in destination es. ---"
          mapping = index_response.json()
          return mapping
      
      
      def get_index_total(url, index, es_auth):
          stats_url = url + "/" + index + "/_stats"
          index_response = requests.get(stats_url, auth=es_auth, verify=False)
          if index_response.status_code != 200:
              print("*** get ElasticSearch stats message failed. resp statusCode:" + str(
                  index_response.status_code) + " response is " + index_response.text)
              return 0
          return index_response.json()
      
      
      def get_indices_stats(url, es_auth):
          endpoint = url + "/_cat/indices"
          indicesResult = requests.get(endpoint, es_auth)
          indicesList = indicesResult.split("\n")
          indexList = []
          for indices in indicesList:
              indexList.append(indices.split()[2])
          return indexList
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # python3
          # return yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migrate index mapping!")
          config = loadConfig(argv)
          src_clusters = config.keys()
      
          print("begin to process cluster name :")
          for name in src_clusters:
              print(name)
          print("cluster count:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source = value["src_ip"]
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest = value["dest_ip"]
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
              cluster_name = name
              if "clustername" in value:
                  cluster_name = value["clustername"]
      
              print("start to process cluster :" + cluster_name)
              # get all indices
              all_source_index = get_indices(source, source_auth)
              all_dest_index = get_indices(dest, dest_auth)
      
              if "only_compare_index" in value and value["only_compare_index"]:
                  print("[success] only compare mapping, not compare index count.")
                  continue
      
              for index in all_source_index:
                  index_total = get_index_total(value["src_ip"], index, source_auth)
                  src_total = index_total["_all"]["primaries"]["docs"]["count"]
                  src_size = int(index_total["_all"]["primaries"]["store"]["size_in_bytes"]) / 1024 / 1024
                  dest_index = get_index_total(value["dest_ip"], index, dest_auth)
                  if dest_index is 0:
                      print('[failure] not found, index: %-20s, source total: %-10s size %6sM'
                            % (str(index), str(src_total), src_size))
                      continue
                  dest_total = dest_index["_all"]["primaries"]["docs"]["count"]
                  if src_total != dest_total:
                      print('[failure] not consistent, '
                            'index: %-20s, source total: %-10s size %6sM destination total: %-10s '
                            % (str(index), str(src_total), src_size, str(dest_total)))
                      continue
                  print('[success] compare index total equal : index : %-20s,  total: %-20s '
                        % (str(index), str(dest_total)))
      
      
      if __name__ == '__main__':
          main(sys.argv)
  6. 执行如下命令,迁移Elasticsearch集群的索引模板和索引结构。
    python migrateTemplate.py
    python migrateMapping.py

创建Logstash集群

当ECS中的迁移环境准备完成后,在CSS服务创建一个Logstash集群用于迁移数据。

  1. 登录云搜索服务管理控制台
  2. 在左侧菜单栏选择集群管理 > Logstash
  3. 单击右上角的创建集群,进入创建集群页面。
  4. 在创建集群页面,根据指导完成集群配置。

    关键配置参数请参见表3,其他参数可以保持默认值。创建集群的详细说明请参见创建Logstash集群

    表3 Logstash集群的关键配置

    参数

    说明

    计费模式

    选择“按需计费”,按实际使用时长计费,计费周期为一小时,不足一小时按一小时计费。

    集群类型

    选择“Logstash”

    集群版本

    选择“7.10.0”

    集群名称

    自定义集群名称,可输入的字符范围为4~32个字符,只能包含数字、字母、中划线和下划线,且必须以字母开头。

    此处以“Logstash-ES”为例。

    虚拟私有云

    VPC即虚拟私有云,是通过逻辑方式进行网络隔离,提供安全、隔离的网络环境。

    此处选择和目标Elasticsearch集群同一个虚拟私有云(VPC)。

    子网

    通过子网提供与其他网络隔离的、可以独享的网络资源,以提高网络安全。

    选择创建集群需要的子网,可进入VPC服务查看VPC下已创建的子网名称和ID。

    安全组

    安全组是一个逻辑上的分组,为同一个VPC内具有相同安全保护需求并相互信任的弹性云服务器提供访问策略。

    此处选择和目标Elasticsearch集群同一个安全组。

  5. 单击“下一步:高级配置”,此配置保持默认配置即可。
  6. 单击“下一步:确认配置”,确认完成后单击“立即创建”开始创建集群。
  7. 单击“返回集群列表”,系统将跳转到“集群管理”页面。您创建的集群将展现在集群列表中,且集群状态为“创建中”,创建成功后集群状态会变为可用

验证集群间的网络连通性

在启动迁移任务前,需要先验证Logstash和源Elasticsearch集群的网络连通性。

  1. 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“配置中心”,进入配置中心页面。
  2. 在配置中心页面,单击“连通性测试”
  3. 在弹窗中输入源集群的IP地址和端口号,单击“测试”
    图4 连通性测试

    当显示“可用”时,表示集群间网络连通。如果网络不连通,可以配置Logstash集群路由,连通集群间的网络,具体操作请参见配置Logstash集群路由

使用Logstash全量迁移集群数据

在集群迁移初期或需要确保数据完整性的场景,推荐使用Logstash全量迁移集群数据,该方法会一次迁移整个Elasticsearch集群的数据。

  1. 登录云搜索服务管理控制台。
  2. 在左侧菜单栏选择集群管理 > Logstash
  3. 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“配置中心”,进入配置中心页面。
  4. 在配置中心页面,单击右上角“创建”,进入创建配置文件页面,编辑Elasticsearch集群的全量迁移配置文件。
    1. 选择集群模板:展开系统模板,选择“elasticsearch”,单击操作列的“应用”
    2. 设置配置文件名称:在“名称”处自定义配置文件名称,例如“es-es-all”
    3. 修改配置文件内容:在“配置文件内容”处填写Elasticsearch集群的迁移配置方案,配置文件示例如下。集群信息的获取方式请参见获取Elasticsearch集群信息
      input{
           elasticsearch{
              # 源Elasticsearch的访问地址,不需要添加协议,添加HTTPS协议会导致报错。
              hosts =>  ["xx.xx.xx.xx:9200", "xx.xx.xx.xx:9200"]
              # 访问源集群的用户名和密码,非安全集群无需配置。
              # user => "css_logstash"
              # password => "*****"
              # 配置待迁移的索引信息,多个索引以逗号隔开,可以使用通配符设置,例如“index*”。
              index => "*_202102"
              docinfo => true
              slices => 3
              size => 3000
              # 当目标集群是HTTPS访问方式时,则需额外配置以下信息。
              # 配置集群HTTPS访问证书,CSS集群保持以下不变。     
              # ca_file => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs"  # for 7.10.0
              # 是否开启HTTPS通信,HTTPS访问集群则设置为true。
              #ssl => true
           }
       }
      
       
      # 移除一些logstash增加的字段。
       filter {
         mutate {
           remove_field => ["@version"]
         }
       }
      
       output{
           elasticsearch{
             # 目标Elasticsearch集群的访问地址
             hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"]
             # 访问目标集群的用户名和密码,非安全集群无需配置。
             # user => "css_logstash"
             # password => "*****"
             # 配置目标集群的索引,以下配置为索引名称和源端保持一致,保持默认。
             index => "%{[@metadata][_index]}"
             document_type => "%{[@metadata][_type]}"
             document_id => "%{[@metadata][_id]}"
             # 当目标集群是HTTPS访问方式时,则需额外配置以下信息。
             # 配置集群HTTPS访问证书,CSS集群保持以下不变。     
             #cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs"  # for 7.10.0
             # 是否开启HTTPS通信,HTTPS访问集群则设置为true。
             #ssl => true
             # 是否验证服务端Elasticsearch证书,设置为false表示不验证证书。
             #ssl_certificate_verification => false
           }
       }
      表4 全量迁移配置项说明

      配置项名称

      说明

      input

      hosts

      源集群的访问地址,如果集群有多个访问节点请分别填写,使用逗号隔开。

      user

      访问集群的用户名,如果是非安全集群此项使用“#”注释掉。

      password

      访问集群的密码,如果是非安全集群此项使用“#”注释掉。

      index

      需要全量迁移的源端索引信息,使用逗号隔开,可以使用通配符设置,例如“index*”

      docinfo

      是否重新索引文档,必须为true。

      slices

      配置该参数可以使用切片滚动同时对查询的不同切片,提高整体吞吐量。建议在2-8内。

      size

      每次查询返回的最大命中数。

      output

      hosts

      目标集群访问地址,如果集群有多个节点,请分别填写,使用逗号隔开。

      user

      访问集群的用户名,如果是非安全集群此项使用“#”注释掉。

      password

      访问集群的密码,如果是非安全集群此项使用“#”注释掉。

      index

      迁移到目标集群的索引名称,支持扩展修改,如“Logstash-%{+yyyy.MM.dd}”

      document_type

      使目标端文档类型与源端保持一致。

      document_id

      索引中的文档ID,建议与源端保持一致,如需要自动生成,使用“#”注释掉即可。

    4. 编辑完成后,单击“下一页”配置Logstash配置文件运行参数。

      此示例保持默认值即可,如需设置请参见。

    5. 配置完成后,单击“创建”。

      在配置中心页面可以看到创建的配置文件,状态为“可用”,表示创建成功。

  5. 启动全量迁移任务。
    1. 在配置文件列表,选择配置文件“es-es-all”,单击左上角的“启动”。
    2. “启动Logstash服务”对话框中,根据业务需要选择“是否保持常驻”。此示例不开启保持常驻。

      开启“保持常驻”以后,将会在每个节点上面配置一个守护进程,当logstash服务出现故障的时候,会主动拉起并修复。“保持常驻”适用于需要长期运行的业,不适用于短期运行的业务,短期业务如果源端无数据,开启保持常驻会导致任务失败。

    3. 单击“确定”,开始启动配置文件启动Logstash全量迁移任务。

      可以在管道列表看到启动的配置文件。

  6. 数据迁移完毕检查数据一致性。
    • 方式一:使用Putty登录迁移虚拟机,执行命令python checkIndices.py对比数据结果。
    • 方式二:分别在源集群和目标集群的Kibana执行命令GET _cat/indices,对比两者的索引信息是否一致。

使用Logstash增量迁移集群数据

在需要持续同步数据或对数据实时性有较高要求的场景,推荐使用Logstash增量迁移集群数据,该方法通过Logstash配置增量查询,仅支持迁移有增量字段的索引数据。

  1. 登录云搜索服务管理控制台
  2. 在左侧菜单栏选择集群管理 > Logstash
  3. 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“配置中心”,进入配置中心页面。
  4. 在配置中心页面,单击右上角“创建”,进入创建配置文件页面,编辑Elasticsearch集群的增量迁移配置文件。
    1. 选择集群模板:展开系统模板,选择“elasticsearch”,单击操作列的“应用”
    2. 设置配置文件名称:在“名称”处自定义配置文件名称,例如“es-es-inc”
    3. 修改配置文件内容:在“配置文件内容”处填写Elasticsearch集群的迁移配置方案,配置文件示例如下。

      不同的索引的增量迁移配置不同,必须基于索引分析给出增量配置文件迁移命令。集群信息的获取方式请参见获取Elasticsearch集群信息

      input{
           elasticsearch{
               # 源Elasticsearch的访问地址,不需要添加协议,添加HTTPS协议会导致报错。
               hosts =>  ["xx.xx.xx.xx:9200"]
               # 访问源集群的用户名和密码,非安全集群无需配置。
               user => "css_logstash"
               password => "******"
               # 配置增量迁移索引。
               index => "*_202102"
               # 配置增量迁移查询语句。
               query => '{"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}'
               docinfo => true
               size => 1000
               # 当目标集群是HTTPS访问方式时,则需额外配置以下信息。
               # 配置集群HTTPS访问证书,CSS集群保持以下不变。     
               # ca_file => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs"  # for 7.10.0
               # 是否开启HTTPS通信,HTTPS访问集群则设置为true。
               #ssl => true
           }
       }
      
       filter {
         mutate {
           remove_field => ["@timestamp", "@version"]
         }
       }
      
       output{
           elasticsearch{
               # 目标集群的访问地址。
               hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"]
               # 访问目标集群的用户名和密码,非安全集群无需配置。
               #user => "admin"
               #password => "******"
               # 配置目标集群的索引,以下配置为索引名称和源端保持一致,保持默认。
               index => "%{[@metadata][_index]}"
               document_type => "%{[@metadata][_type]}"
               document_id => "%{[@metadata][_id]}"
               # 当目标集群是HTTPS访问方式时,则需额外配置以下信息。
               # 配置集群HTTPS访问证书,CSS集群保持默认。      
               #cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs"  # for 7.10.0 
               # 是否开启HTTPS通信,HTTPS访问集群则设置为true。
               #ssl => true
               # 是否验证服务端Elasticsearch证书,设置为false表示不验证证书。
               #ssl_certificate_verification => false
           }
      
           #stdout { codec => rubydebug { metadata => true }}
       }
      表5 增量迁移配置项说明

      配置

      说明

      hosts

      分别填写源集群和目标集群的访问地址,如果集群有多个访问节点请分别填写。

      user

      访问集群的用户名,如果是非安全集群此项使用“#”注释掉。

      password

      访问集群的密码,如果是非安全集群此项使用“#”注释掉。

      index

      需要增加迁移的索引信息,一个配置文件只支持一个索引的增量迁移。

      query

      增量数据的识别标识,一般是Elasticsearch的DLS语句,需要提前分析。其中postsDate为业务中的时间字段。

      {"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}

      此处命令是迁移2021-05-25之后新增加的数据,在多次增量迁移过程中需要修改此处的日志值,如果日期是时间戳方式请转换为时间戳。此处命令需要提前验证有效性。

      scroll

      当源端数据量过大,为了防止Logstash内存溢出,可以使用scroll分批次获取数据。默认为"1m"。间隔时间不要太长,否则可能会丢失数据。

  5. 启动增量迁移任务。
    1. 在配置文件列表,选择配置文件“es-es-inc”,单击左上角的“启动”。
    2. “启动Logstash服务”对话框中,根据业务需要选择“是否保持常驻”。此示例不开启保持常驻。

      开启“保持常驻”以后,将会在每个节点上面配置一个守护进程,当logstash服务出现故障的时候,会主动拉起并修复。“保持常驻”适用于需要长期运行的业,不适用于短期运行的业务,短期业务如果源端无数据,开启保持常驻会导致任务失败。

    3. 单击“确定”,开始启动配置文件启动Logstash增量迁移任务。

      可以在管道列表看到启动的配置文件。

  6. 数据迁移完毕检查数据一致性。
    • 方式一:使用Putty登录迁移虚拟机,执行命令python checkIndices.py对比数据结果。
    • 方式二:分别在源集群和目标集群的Kibana执行命令GET _cat/indices,对比两者的索引信息是否一致。

释放Logstash集群

当集群迁移完成后,请及时释放Logstash集群,可以节约资源,避免产生不必要的费用。

  1. 登录云搜索服务管理控制台
  2. 在左侧菜单栏选择集群管理 > Logstash
  3. 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“更多 > 删除”,在弹出的确认提示框中,输入“DELETE”,单击“确定”完成集群删除。

我们使用cookie来确保您的高速浏览体验。继续浏览本站,即表示您同意我们使用cookie。 详情

文档反馈

文档反馈

意见反馈

0/500

标记内容

同时提交标记内容