文档首页/ 云搜索服务 CSS/ 最佳实践/ ELasticsearch数据迁移/ 通过华为云Logstash实现Elasticsearch集群间数据迁移
更新时间:2024-11-18 GMT+08:00
分享

通过华为云Logstash实现Elasticsearch集群间数据迁移

使用华为云CSS服务的Logstash集群可以实现Elasticsearch集群间的数据迁移。

应用场景

华为云Logstash是一款全托管的数据接入处理服务,兼容开源Logstash的能力,支持用于Elasticsearch集群间数据迁移。

通过华为云Logstash可以实现华为云Elasticsearch、自建Elasticsearch或第三方Elasticsearch迁移至华为云Elasticsearch,该方案常用于以下场景:

  • 跨版本迁移:利用Logstash的兼容性和灵活性,实现不同版本间的数据迁移,确保数据在新版本中的可用性和一致性。适用于Elasticsearch集群版本跨度较大的迁移场景,例如从6.X版本迁移至7.X版本。
  • 集群合并:使用Logstash进行数据迁移,将多个Elasticsearch集群的数据整合到一个Elasticsearch集群中,实现多个Elasticsearch数据的统一管理和分析。
  • 服务迁移上云:将自建的Elasticsearch服务迁移到云平台,以利用云服务的可扩展性、维护简便性和成本效益。
  • 变更服务提供商:如果企业当前使用的是第三方Elasticsearch服务,但出于成本、性能或其他战略考虑,希望更换服务提供商至华为云。

方案架构

图1 迁移流程

通过华为云Logstash实现Elasticsearch集群间数据迁移的迁移流程如图1所示。

  1. 输入(Input):华为云Logstash接收来自华为云Elasticsearch、自建Elasticsearch或第三方Elasticsearch的数据。

    华为云Elasticsearch、自建Elasticsearch或第三方Elasticsearch数据迁移到华为云Elasticsearch的操作步骤相同,只是获取源集群的访问地址有差异,具体请参见获取Elasticsearch集群信息

  2. 过滤(Filter):华为云Logstash对数据进行清洗和转换。
  3. 输出(Output):华为云Logstash将数据输出到目标设备,如华为云Elasticsearch。
根据业务需求,可以选择全量数据迁移或增量数据迁移。
  • 全量数据迁移:使用Logstash进行全量数据迁移,适用于迁移初期或需要确保数据完整性的场景。
  • 增量数据迁移:通过Logstash配置增量查询,可以只迁移有增量字段的索引数据。此方法适用于需要持续同步数据或对数据实时性有较高要求的场景。

方案优势

  • 高版本兼容性:适用于不同版本的Elasticsearch集群迁移。
  • 高效的数据处理能力:Logstash支持批量读写操作,可以大幅度提高数据迁移的效率。
  • 并发同步技术:利用slice并发同步技术,可以提高数据迁移的速度和性能,尤其是在处理大规模数据时。
  • 配置简单:华为云Logstash的配置相对简单直观,通过配置文件即可实现数据的输入、处理和输出。
  • 强大的数据处理功能:Logstash内置了丰富的过滤器,可以在迁移过程中对数据进行清洗、转换和丰富。
  • 灵活的迁移策略:根据业务需求,可以灵活选择全量迁移或增量迁移,优化存储使用和迁移时间。

性能影响

使用Logstash迁移集群依托于Scroll API,此API能够高效读取源集群的索引数据,并批量同步至目标集群。这一过程可能会对源集群性能产生影响,具体影响程度取决于目标集群对源集群的读取速度,而读取速度取决于Scroll API的size和slice参数配置。参数配置的详细指导可参考Reindex API文档。
  • 对于资源消耗较高的集群,建议通过调整size参数来减缓迁移速率,或者选择在业务流量低谷时段进行迁移操作,以减轻对集群资源的影响。
  • 对于资源消耗较低的集群,在迁移时可以采用默认参数配置,建议同时监控源集群的性能负载,并根据实际情况适时调整size和slice参数,以优化迁移效率和资源使用。

约束限制

集群迁移过程中,源集群的索引数据不能增删改,否则会导致迁移后的源集群数据和目标集群数据内容不一致。

前提条件

  • 源Elasticsearch集群和目标Elasticsearch集群处于可用状态。
  • 集群间需要保证网络连通。
    • 如果源集群、Logstash和目标集群在不同VPC,则需要先打通VPC网络建立对等连接。具体操作请参见对等连接简介
    • 如果是自建Elasticsearch集群迁移至华为云,则可以通过给自建Elasticsearch集群配置公网访问打通网络。
    • 如果是第三方Elasticsearch集群迁移至华为云,则需要建立企业内部数据中心到华为云的VPN通道或专线。
  • 确认集群的索引已开启“_source”

    集群索引的“_source”默认是开启的。执行命令GET {index}/_search,当返回的索引信息里有“_source”信息时表示已开启。

操作步骤

  1. 获取Elasticsearch集群信息
  2. 准备迁移环境:创建ECS并准备必要的迁移工具和脚本。
  3. 创建Logstash集群:创建一个Logstash集群用于迁移数据。
  4. 验证集群间的网络连通性:验证Logstash和源Elasticsearch集群的连通性。
  5. 使用Logstash迁移集群
  6. 释放Logstash集群:当集群迁移完成后,请及时释放Logstash集群。

获取Elasticsearch集群信息

在迁移集群前,需要先获取必备的集群信息,用于配置迁移任务。

表1 需要获取的Elasticsearch集群信息

集群来源

要获取的信息

获取方式

源集群

华为云Elasticsearch集群

  • 源集群的名称
  • 源集群的访问地址
  • 访问源集群的用户名和密码(仅安全集群涉及)
  • 获取集群名称和访问地址请参见3
  • 用户名和密码请联系服务管理员获取。

自建Elasticsearch集群

  • 源集群的名称
  • 源集群的公网访问地址
  • 访问源集群的用户名和密码(仅安全集群涉及)

联系服务管理员获取。

第三方Elasticsearch集群

  • 源集群的名称
  • 源集群的访问地址
  • 访问源集群的用户名和密码(仅安全集群涉及)

联系服务管理员获取。

目标集群

华为云Elasticsearch集群

  • 目标集群的访问地址
  • 访问目标集群的用户名和密码(仅安全集群涉及)
  • 获取访问地址请参见3
  • 用户名和密码请联系服务管理员获取。

源集群的来源不同,获取信息的方式不同,此处仅介绍如何获取华为云Elasticsearch集群的信息。

  1. 登录云搜索服务管理控制台。
  2. 在左侧菜单栏选择集群管理 > Elasticsearch
  3. 在Elasticsearch集群列表,获取集群名称和访问地址。
    图2 获取集群信息

准备迁移环境

创建ECS并准备必要的迁移工具和脚本。

  1. 创建弹性云服务器ECS,用于迁移源集群的元数据。
    1. 创建弹性云服务器ECS,ECS的操作系统选择CentOS,规格选择2U4G,且和CSS服务的集群在同一个虚拟私有云和安全组中。
    2. 测试ECS和源集群、目标集群的连通性。

      在ECS执行命令curl http:// {ip}:{port}测试连通性,当返回200时,则表示已经连通。

      IP是源集群和目标集群访问地址;port是端口号,默认是9200,请以集群实际端口号为准。

      curl http://10.234.73.128:9200 # 输入实际的IP地址,此处仅以非安全集群举例。
      {
        "name" : "es_cluster_migrate-ess-esn-1-1",
        "cluster_name" : "es_cluster_migrate",
        "cluster_uuid" : "1VbP7-39QNOx_R-llXKKtA",
        "version" : {
          "number" : "6.5.4",
          "build_flavor" : "default",
          "build_type" : "tar",
          "build_hash" : "d2ef93d",
          "build_date" : "2018-12-17T21:17:40.758843Z",
          "build_snapshot" : false,
          "lucene_version" : "7.5.0",
          "minimum_wire_compatibility_version" : "5.6.0",
          "minimum_index_compatibility_version" : "5.0.0"
        },
        "Tagline" : "You Know, for Search"
      }
  2. 准备工具和软件,判断ECS是否可以联网,选择不同的安装方式。
    • 是,选择在线安装工具和软件,直接使用yum和pip安装,具体请参见3
    • 否,选择离线安装工具和软件,下载安装包到虚拟机上执行安装命令,具体请参见4
    表2 准备工具和软件

    类型

    目的

    获取位置

    Python2

    主要用户执行迁移脚本。

    下载地址,版本选择python 2.7.18。

    winscp

    Linux上传脚本,跨平台文件传输工具。

    下载Winscp

  3. 在线安装工具和软件。
    1. 执行yum install python2安装python2。
      [root@ecs opt]# yum install python2
    2. 执行yum install python-pip安装pip。
      [root@ecs opt]# yum install python-pip
    3. 执行pip install pyyaml安装yaml依赖。
    4. 执行pip install requests安装requests依赖。
  4. 离线安装工具和软件。
    1. 下载python2安装包,下载地址https://www.python.org/downloads/release/python-2718/。选择源码下载安装。
      图3 下载python2安装包
    2. 使用WinSCP工具上传Python安装包到opt目录下,安装python。
      # 解压python压缩包
      [root@ecs-52bc opt]# tar -xvf Python-2.7.18.tgz
      Python-2.7.18/Modules/zlib/crc32.c
      Python-2.7.18/Modules/zlib/gzlib.c
      Python-2.7.18/Modules/zlib/inffast.c
      Python-2.7.18/Modules/zlib/example.c
      Python-2.7.18/Modules/python.c
      Python-2.7.18/Modules/nismodule.c
      Python-2.7.18/Modules/Setup.config.in
      …
      # 解压完成进入目录
      [root@ecs-52bc opt]# cd Python-2.7.18
      # 检查文件配置安装路径
      [root@ecs-52bc Python-2.7.18]# ./configure --prefix=/usr/local/python2
      …
      checking for build directories... checking for --with-computed-gotos... no value specified
      checking whether gcc -pthread supports computed gotos... yes
      done
      checking for ensurepip... no
      configure: creating ./config.status
      config.status: creating Makefile.pre
      config.status: creating Modules/Setup.config
      config.status: creating Misc/python.pc
      config.status: creating Modules/ld_so_aix
      config.status: creating pyconfig.h
      creating Modules/Setup
      creating Modules/Setup.local
      creating Makefile
      # 编译python
      [root@ecs-52bc Python-2.7.18]# make
      # 安装python
      [root@ecs-52bc Python-2.7.18]# make install
    3. 安装完成检查,检查python安装结果。
      # 检查python版本
      [root@ecs-52bc Python-2.7.18]# python --version
      Python 2.7.5
      # 检查pip版本
      [root@ecs-52bc Python-2.7.18]# pip --version
      pip 7.1.2 from /usr/lib/python2.7/site-packages/pip-7.1.2-py2.7.egg (python 2.7)
      [root@ecs-52bc Python-2.7.18]#
  5. 准备Elasticsearch集群的索引迁移脚本。
    1. 执行“vi migrateConfig.yaml”配置文件,输入并基于实际信息修改以下内容,执行wq保存为Logstash迁移脚本。集群信息的获取方式请参见获取Elasticsearch集群信息
      es_cluster_new:
        # 源集群的名称
        clustername: es_cluster_new
        # 源Elasticsearch集群的访问地址,加上“http://”。
        src_ip: http://x.x.x.x:9200
        # 访问源Elasticsearch集群的用户名和密码,如果为非安全集群则设置为""。
        src_username: ""
        src_password: ""
        # 目标Elasticsearch集群的访问地址,加上“http://”。
        dest_ip: http://x.x.x.x:9200
        # 访问目标Elasticsearch集群的用户名和密码,如果为非安全集群则设置为""。
        dest_username: ""
        dest_password: ""
        # “only_mapping”可以不定义,默认值为false,需要搭配“migrateMapping.py”使用,表示是否只处理这个文件中mapping地址的索引。当设置成true时,则只迁移源集群中和下面mapping的key一致的索引数据;当设置成false时,则迁移源集群中除“.kibana”“.*”之外的所有索引数据。
        # 迁移过程中会将索引名称与下面的mapping匹配,如果匹配一致,则使用mapping的value作为目标集群的索引名称;如果匹配不到,则使用源集群原始的索引名称。
        only_mapping: false
        # 设置要迁移的索引,key为源集群的索引名字,value为目标集群的索引名字。
        mapping:
            test_index_1: test_index_1
        # “only_compare_index”可以不定义,默认值为false,需要搭配“checkIndices.py”使用,当设置为false会比较所有的索引和文档数量,当设置为true只比较索引数量。
        only_compare_index: false
    2. 执行vi migrateTemplate.py命令,直接复制输入以下内容无需修改,执行wq保存为索引模板迁移脚本
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # config = yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def put_template_to_target(url, template, cluster, template_name, dest_auth=None):
          headers = {'Content-Type': 'application/json'}
          create_resp = requests.put(url, headers=headers, data=json.dumps(template), auth=dest_auth, verify=False)
          if not os.path.exists("templateLogs"):
              os.makedirs("templateLogs")
          if create_resp.status_code != 200:
              print(
                  "create template " + url + " failed with response: " + str(
                      create_resp) + ", source template is " + template_name)
              print(create_resp.text)
              filename = "templateLogs/" + str(cluster) + "#" + template_name
              with open(filename + ".json", "w") as f:
                  json.dump(template, f)
              return False
          else:
              return True
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migration template!")
          config = loadConfig(argv)
          src_clusters = config.keys()
          print("process cluster name:")
          for name in src_clusters:
              print(name)
          print("cluster total number:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
      
              print("start to process cluster name:" + name)
              source_url = value["src_ip"] + "/_template"
      
              response = requests.get(source_url, auth=source_auth, verify=False)
              if response.status_code != 200:
                  print("*** get all template failed. resp statusCode:" + str(
                      response.status_code) + " response is " + response.text)
                  continue
              all_template = response.json()
              migrate_itemplate = []
      
              for template in all_template.keys():
                  if template.startswith(".") or template == "logstash":
                      continue
                  if "index_patterns" in all_template[template]:
                      for t in all_template[template]["index_patterns"]:
                          # if "kibana" in template:
                          if t.startswith("."):
                              continue
                          migrate_itemplate.append(template)
      
              for template in migrate_itemplate:
                  dest_index_url = value["dest_ip"] + "/_template/" + template
                  result = put_template_to_target(dest_index_url, all_template[template], name, template, dest_auth)
                  if result is True:
                      print('[success] delete success, cluster: %-10s, template %-10s ' % (str(name), str(template)))
                  else:
                      print('[failure] delete failure, cluster: %-10s, template %-10s ' % (str(name), str(template)))
      
      
      if __name__ == '__main__':
          main(sys.argv)
    3. 执行vi migrateMapping.py命令,直接复制输入以下内容无需修改,执行wq保存为索引结构迁移脚本
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import re
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # config = yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def get_cluster_version(url, auth=None):
          response = requests.get(url, auth=auth)
          if response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              return False
          cluster = response.json()
          version = cluster["version"]["number"]
      
          return True
      
      
      def process_mapping(index_mapping, dest_index):
          # remove unnecessary keys
          del index_mapping["settings"]["index"]["provided_name"]
          del index_mapping["settings"]["index"]["uuid"]
          del index_mapping["settings"]["index"]["creation_date"]
          del index_mapping["settings"]["index"]["version"]
      
          if "lifecycle" in index_mapping["settings"]["index"]:
              del index_mapping["settings"]["index"]["lifecycle"]
      
          # check alias
          aliases = index_mapping["aliases"]
          for alias in list(aliases.keys()):
              if alias == dest_index:
                  print(
                      "source index " + dest_index + " alias " + alias + " is the same as dest_index name, will remove this alias.")
                  del index_mapping["aliases"][alias]
          # if index_mapping["settings"]["index"].has_key("lifecycle"):
          if "lifecycle" in index_mapping["settings"]["index"]:
              lifecycle = index_mapping["settings"]["index"]["lifecycle"]
              opendistro = {"opendistro": {"index_state_management":
                                               {"policy_id": lifecycle["name"],
                                                "rollover_alias": lifecycle["rollover_alias"]}}}
              index_mapping["settings"].update(opendistro)
              # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
              del index_mapping["settings"]["index"]["lifecycle"]
      
          # replace synonyms_path
          if "analysis" in index_mapping["settings"]["index"]:
              analysis = index_mapping["settings"]["index"]["analysis"]
              if "filter" in analysis:
                  filter = analysis["filter"]
                  if "my_synonym_filter" in filter:
                      my_synonym_filter = filter["my_synonym_filter"]
                      if "synonyms_path" in my_synonym_filter:
                          index_mapping["settings"]["index"]["analysis"]["filter"]["my_synonym_filter"][
                              "synonyms_path"] = "/rds/datastore/elasticsearch/v7.10.2/package/elasticsearch-7.10.2/plugins/analysis-dynamic-synonym/config/synonyms.txt"
          return index_mapping
      
      
      def getAlias(source, source_auth):
          # get all indices
          response = requests.get(source + "/_alias", auth=source_auth)
          if response.status_code != 200:
              print("*** get all index failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              exit()
      
          all_index = response.json()
          system_index = []
          create_index = []
          for index in list(all_index.keys()):
              if (index.startswith(".")):
                  system_index.append(index)
              else:
                  create_index.append(index)
      
          return system_index, create_index
      
      
      def put_mapping_to_target(url, mapping, cluster, source_index, dest_auth=None):
          headers = {'Content-Type': 'application/json'}
          create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth, verify=False)
          if not os.path.exists("mappingLogs"):
              os.makedirs("mappingLogs")
          if create_resp.status_code != 200:
              print(
                  "create index " + url + " failed with response: " + str(create_resp) +
                  ", source index is " + str(source_index))
              print(create_resp.text)
              filename = "mappingLogs/" + str(cluster) + "#" + str(source_index)
              with open(filename + ".json", "w") as f:
                  json.dump(mapping, f)
              return False
          else:
              return True
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migrate index mapping!")
          config = loadConfig(argv)
          src_clusters = config.keys()
      
          print("begin to process cluster name :")
          for name in src_clusters:
              print(name)
          print("cluster count:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source = value["src_ip"]
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest = value["dest_ip"]
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
      
              print("start to process cluster:   " + name)
              # only deal with mapping list
              if 'only_mapping' in value and value["only_mapping"]:
                  for source_index, dest_index in value["mapping"].iteritems():
                      print("start to process source index" + source_index + ", target index: " + dest_index)
                      source_url = source + "/" + source_index
                      response = requests.get(source_url, auth=source_auth)
                      if response.status_code != 200:
                          print("*** get ElasticSearch message failed. resp statusCode:" + str(
                              response.status_code) + " response is " + response.text)
                          continue
                      mapping = response.json()
                      index_mapping = process_mapping(mapping[source_index], dest_index)
                      dest_url = dest + "/" + dest_index
                      result = put_mapping_to_target(dest_url, index_mapping, name, source_index, dest_auth)
                      if result is False:
                          print("cluster name:" + name + ", " + source_index + ":failure")
                          continue
                      print("cluster name:" + name + ", " + source_index + ":success")
              else:
                  # get all indices
                  system_index, create_index = getAlias(source, source_auth)
                  success_index = 0
                  for index in create_index:
                      source_url = source + "/" + index
                      index_response = requests.get(source_url, auth=source_auth)
                      if index_response.status_code != 200:
                          print("*** get ElasticSearch message failed. resp statusCode:" + str(
                              index_response.status_code) + " response is " + index_response.text)
                          continue
                      mapping = index_response.json()
      
                      dest_index = index
                      if 'mapping' in value:
                          if index in value["mapping"].keys():
                              dest_index = value["mapping"][index]
                      index_mapping = process_mapping(mapping[index], dest_index)
      
                      dest_url = dest + "/" + dest_index
                      result = put_mapping_to_target(dest_url, index_mapping, name, index, dest_auth)
                      if result is False:
                          print("[failure]: migrate mapping cluster name: " + name + ", " + index)
                          continue
                      print("[success]: migrate mapping cluster name: " + name + ", " + index)
                      success_index = success_index + 1
                  print("create index mapping success total: " + str(success_index))
      
      
      if __name__ == '__main__':
          main(sys.argv)
    4. 执行vi checkIndices.py命令,直接复制输入以下内容无需修改,执行wq保存为索引数据对比脚本
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import re
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def get_cluster_version(url, auth=None):
          response = requests.get(url, auth=auth)
          if response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              return False
          cluster = response.json()
          version = cluster["version"]["number"]
          return True
      
      
      # get all indices
      def get_indices(url, source_auth):
          response = requests.get(url + "/_alias", auth=source_auth)
          if response.status_code != 200:
              print("*** get all index failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              exit()
          all_index = response.json()
          system_index = []
          create_index = []
          for index in list(all_index.keys()):
              if (index.startswith(".")):
                  system_index.append(index)
              else:
                  create_index.append(index)
          return create_index
      
      
      def get_mapping(url, _auth, index):
          source_url = url + "/" + index
          index_response = requests.get(source_url, auth=_auth)
          if index_response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  index_response.status_code) + " response is " + index_response.text)
              return "[failure] --- index is not exist in destination es. ---"
          mapping = index_response.json()
          return mapping
      
      
      def get_index_total(url, index, es_auth):
          stats_url = url + "/" + index + "/_stats"
          index_response = requests.get(stats_url, auth=es_auth, verify=False)
          if index_response.status_code != 200:
              print("*** get ElasticSearch stats message failed. resp statusCode:" + str(
                  index_response.status_code) + " response is " + index_response.text)
              return 0
          return index_response.json()
      
      
      def get_indices_stats(url, es_auth):
          endpoint = url + "/_cat/indices"
          indicesResult = requests.get(endpoint, es_auth)
          indicesList = indicesResult.split("\n")
          indexList = []
          for indices in indicesList:
              indexList.append(indices.split()[2])
          return indexList
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # python3
          # return yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migrate index mapping!")
          config = loadConfig(argv)
          src_clusters = config.keys()
      
          print("begin to process cluster name :")
          for name in src_clusters:
              print(name)
          print("cluster count:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source = value["src_ip"]
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest = value["dest_ip"]
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
              cluster_name = name
              if "clustername" in value:
                  cluster_name = value["clustername"]
      
              print("start to process cluster :" + cluster_name)
              # get all indices
              all_source_index = get_indices(source, source_auth)
              all_dest_index = get_indices(dest, dest_auth)
      
              if "only_compare_index" in value and value["only_compare_index"]:
                  print("[success] only compare mapping, not compare index count.")
                  continue
      
              for index in all_source_index:
                  index_total = get_index_total(value["src_ip"], index, source_auth)
                  src_total = index_total["_all"]["primaries"]["docs"]["count"]
                  src_size = int(index_total["_all"]["primaries"]["store"]["size_in_bytes"]) / 1024 / 1024
                  dest_index = get_index_total(value["dest_ip"], index, dest_auth)
                  if dest_index is 0:
                      print('[failure] not found, index: %-20s, source total: %-10s size %6sM'
                            % (str(index), str(src_total), src_size))
                      continue
                  dest_total = dest_index["_all"]["primaries"]["docs"]["count"]
                  if src_total != dest_total:
                      print('[failure] not consistent, '
                            'index: %-20s, source total: %-10s size %6sM destination total: %-10s '
                            % (str(index), str(src_total), src_size, str(dest_total)))
                      continue
                  print('[success] compare index total equal : index : %-20s,  total: %-20s '
                        % (str(index), str(dest_total)))
      
      
      if __name__ == '__main__':
          main(sys.argv)

创建Logstash集群

当ECS中的迁移环境准备完成后,在CSS服务创建一个Logstash集群用于迁移数据。

  1. 登录云搜索服务管理控制台
  2. 在左侧菜单栏选择集群管理 > Logstash
  3. 单击右上角的创建集群,进入创建集群页面。
  4. 在创建集群页面,根据指导完成集群配置。

    关键配置参数请参见表3,其他参数可以保持默认值。创建集群的详细说明请参见创建Logstash集群

    表3 Logstash集群的关键配置

    参数

    说明

    计费模式

    选择“按需计费”,按实际使用时长计费,计费周期为一小时,不足一小时按一小时计费。

    集群类型

    选择“Logstash”

    集群版本

    选择“7.10.0”

    集群名称

    自定义集群名称,可输入的字符范围为4~32个字符,只能包含数字、字母、中划线和下划线,且必须以字母开头。

    此处以“Logstash-ES”为例。

    虚拟私有云

    VPC即虚拟私有云,是通过逻辑方式进行网络隔离,提供安全、隔离的网络环境。

    此处选择和目标Elasticsearch集群同一个虚拟私有云(VPC)。

    子网

    通过子网提供与其他网络隔离的、可以独享的网络资源,以提高网络安全。

    选择创建集群需要的子网,可进入VPC服务查看VPC下已创建的子网名称和ID。

    安全组

    安全组是一个逻辑上的分组,为同一个VPC内具有相同安全保护需求并相互信任的弹性云服务器提供访问策略。

    此处选择和目标Elasticsearch集群同一个安全组。

  5. 单击“下一步:高级配置”,此配置保持默认配置即可。
  6. 单击“下一步:确认配置”,确认完成后单击“立即创建”开始创建集群。
  7. 单击“返回集群列表”,系统将跳转到“集群管理”页面。您创建的集群将展现在集群列表中,且集群状态为“创建中”,创建成功后集群状态会变为可用

验证集群间的网络连通性

在启动迁移任务前,需要先验证Logstash和源Elasticsearch集群的网络连通性。

  1. 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“配置中心”,进入配置中心页面。
  2. 在配置中心页面,单击“连通性测试”
  3. 在弹窗中输入源集群的IP地址和端口号,单击“测试”
    图4 连通性测试

    当显示“可用”时,表示集群间网络连通。

使用Logstash全量迁移集群数据

在集群迁移初期或需要确保数据完整性的场景,推荐使用Logstash全量迁移集群数据,该方法会一次迁移整个Elasticsearch集群的数据。

  1. 使用Putty登录准备迁移环境中创建的Linux虚拟机。
  2. 在虚拟机中,执行命令python migrateTemplate.py迁移索引模板。
  3. 在虚拟机中,执行命令 python migrateMapping.py迁移索引结构。
  4. 登录云搜索服务管理控制台
  5. 在左侧菜单栏选择集群管理 > Logstash
  6. 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“配置中心”,进入配置中心页面。
  7. 在配置中心页面,单击右上角“创建”,进入创建配置文件页面,编辑Elasticsearch集群的全量迁移配置文件。
    1. 选择集群模板:展开系统模板,选择“elasticsearch”,单击操作列的“应用”
    2. 设置配置文件名称:在“名称”处自定义配置文件名称,例如“es-es-all”
    3. 修改配置文件内容:在“配置文件内容”处填写Elasticsearch集群的迁移配置方案,配置文件示例如下。集群信息的获取方式请参见获取Elasticsearch集群信息
      input{
           elasticsearch{
              # 源Elasticsearch的访问地址,不需要添加协议,添加HTTPS协议会导致报错。
              hosts =>  ["xx.xx.xx.xx:9200", "xx.xx.xx.xx:9200"]
              # 访问源集群的用户名和密码,非安全集群无需配置。
              # user => "css_logstash"
              # password => "*****"
              # 配置待迁移的索引信息,多个索引以逗号隔开,可以使用通配符设置,例如“index*”。
              index => "*_202102"
              docinfo => true
              slices => 3
              size => 3000
              # 当源集群是HTTPS访问方式时,则设置ssl => false。
              # ssl => false
           }
       }
      
       
      # 移除一些logstash增加的字段。
       filter {
         mutate {
           remove_field => ["@metadata", "@version"]
         }
       }
      
       output{
           elasticsearch{
             # 目标Elasticsearch集群的访问地址
             hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"]
             # 访问目标集群的用户名和密码,非安全集群无需配置。
             # user => "css_logstash"
             # password => "*****"
             # 配置目标集群的索引,以下配置为索引名称和源端保持一致,保持默认。
             index => "%{[@metadata][_index]}"
             document_type => "%{[@metadata][_type]}"
             document_id => "%{[@metadata][_id]}"
             # 当目标集群是HTTPS访问方式时,则需额外配置以下信息。
             # 配置集群HTTPS访问证书,CSS集群保持以下不变。     
             #cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs/CloudSearchService.cer" 
             # 是否开启HTTPS通信,HTTPS访问集群则设置为true。
             #ssl => true
             # 是否验证服务端Elasticsearch证书,设置为false表示不验证证书。
             #ssl_certificate_verification => false
           }
       }
      表4 全量迁移配置项说明

      配置项名称

      说明

      input

      hosts

      源集群的访问地址,如果集群有多个访问节点请分别填写,使用逗号隔开。

      user

      访问集群的用户名,如果是非安全集群此项使用“#”注释掉。

      password

      访问集群的密码,如果是非安全集群此项使用“#”注释掉。

      index

      需要全量迁移的源端索引信息,使用逗号隔开,可以使用通配符设置,例如“index*”

      docinfo

      是否重新索引文档,必须为true。

      slices

      配置该参数可以使用切片滚动同时对查询的不同切片,提高整体吞吐量。建议在2-8内。

      size

      每次查询返回的最大命中数。

      output

      hosts

      目标集群访问地址,如果集群有多个节点,请分别填写,使用逗号隔开。

      user

      访问集群的用户名,如果是非安全集群此项使用“#”注释掉。

      password

      访问集群的密码,如果是非安全集群此项使用“#”注释掉。

      index

      迁移到目标集群的索引名称,支持扩展修改,如“Logstash-%{+yyyy.MM.dd}”

      document_type

      使目标端文档类型与源端保持一致。

      document_id

      索引中的文档ID,建议与源端保持一致,如需要自动生成,使用“#”注释掉即可。

    4. 编辑完成后,单击“下一页”配置Logstash配置文件运行参数。

      此示例保持默认值即可,如需设置请参见创建Logstash配置文件

    5. 配置完成后,单击“创建”。

      在配置中心页面可以看到创建的配置文件,状态为“可用”,表示创建成功。

  8. 启动全量迁移任务。
    1. 在配置文件列表,选择配置文件“es-es-all”,单击左上角的“启动”。
    2. “启动Logstash服务”对话框中,根据业务需要选择“是否保持常驻”。此示例不开启保持常驻。

      开启“保持常驻”以后,将会在每个节点上面配置一个守护进程,当logstash服务出现故障的时候,会主动拉起并修复。“保持常驻”适用于需要长期运行的业,不适用于短期运行的业务,短期业务如果源端无数据,开启保持常驻会导致任务失败。

    3. 单击“确定”,开始启动配置文件启动Logstash全量迁移任务。

      可以在管道列表看到启动的配置文件。

  9. 数据迁移完毕检查数据一致性。

    使用Putty登录linux虚拟机,执行命令python checkIndices.py对比数据结果。

使用Logstash增量迁移集群数据

在需要持续同步数据或对数据实时性有较高要求的场景,推荐使用Logstash增量迁移集群数据,该方法通过Logstash配置增量查询,仅支持迁移有增量字段的索引数据。

  1. 使用Putty登录准备迁移环境中创建的Linux虚拟机。
  2. 在虚拟机中,执行命令python migrateTemplate.py迁移索引模板。
  3. 在虚拟机中,执行命令 python migrateMapping.py迁移索引结构。
  4. 登录云搜索服务管理控制台
  5. 在左侧菜单栏选择集群管理 > Logstash
  6. 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“配置中心”,进入配置中心页面。
  7. 在配置中心页面,单击右上角“创建”,进入创建配置文件页面,编辑Elasticsearch集群的增量迁移配置文件。
    1. 选择集群模板:展开系统模板,选择“elasticsearch”,单击操作列的“应用”
    2. 设置配置文件名称:在“名称”处自定义配置文件名称,例如“es-es-inc”
    3. 修改配置文件内容:在“配置文件内容”处填写Elasticsearch集群的迁移配置方案,配置文件示例如下。

      不同的索引的增量迁移配置不同,必须基于索引分析给出增量配置文件迁移命令。集群信息的获取方式请参见获取Elasticsearch集群信息

      input{
           elasticsearch{
               # 源Elasticsearch的访问地址,不需要添加协议,添加HTTPS协议会导致报错。
               hosts =>  ["xx.xx.xx.xx:9200"]
               # 访问源集群的用户名和密码,非安全集群无需配置。
               user => "css_logstash"
               password => "******"
               # 配置增量迁移索引。
               index => "*_202102"
               # 配置增量迁移查询语句。
               query => '{"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}'
               docinfo => true
               size => 1000
               # 当源集群是HTTPS访问方式时,则设置ssl => false。
               # ssl => false
           }
       }
      
       filter {
         mutate {
           remove_field => ["@timestamp", "@version"]
         }
       }
      
       output{
           elasticsearch{
               # 目标集群的访问地址。
               hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"]
               # 访问目标集群的用户名和密码,非安全集群无需配置。
               #user => "admin"
               #password => "******"
               # 配置目标集群的索引,以下配置为索引名称和源端保持一致,保持默认。
               index => "%{[@metadata][_index]}"
               document_type => "%{[@metadata][_type]}"
               document_id => "%{[@metadata][_id]}"
               # 当目标集群是HTTPS访问方式时,则需额外配置以下信息。
               # 配置集群HTTPS访问证书,CSS集群保持默认。      
               #cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs/CloudSearchService.cer" 
               # 是否开启HTTPS通信,HTTPS访问集群则设置为true。
               #ssl => true
               # 是否验证服务端Elasticsearch证书,设置为false表示不验证证书。
               #ssl_certificate_verification => false
           }
      
           #stdout { codec => rubydebug { metadata => true }}
       }
      表5 增量迁移配置项说明

      配置

      说明

      hosts

      分别填写源集群和目标集群的访问地址,如果集群有多个访问节点请分别填写。

      user

      访问集群的用户名,如果是非安全集群此项使用“#”注释掉。

      password

      访问集群的密码,如果是非安全集群此项使用“#”注释掉。

      index

      需要增加迁移的索引信息,一个配置文件只支持一个索引的增量迁移。

      query

      增量数据的识别标识,一般是Elasticsearch的DLS语句,需要提前分析。其中postsDate为业务中的时间字段。

      {"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}

      此处命令是迁移2021-05-25之后新增加的数据,在多次增量迁移过程中需要修改此处的日志值,如果日期是时间戳方式请转换为时间戳。此处命令需要提前验证有效性。

      scroll

      当源端数据量过大,为了防止Logstash内存溢出,可以使用scroll分批次获取数据。默认为"1m"。间隔时间不要太长,否则可能会丢失数据。

  8. 启动增量迁移任务。
    1. 在配置文件列表,选择配置文件“es-es-inc”,单击左上角的“启动”。
    2. “启动Logstash服务”对话框中,根据业务需要选择“是否保持常驻”。此示例不开启保持常驻。

      开启“保持常驻”以后,将会在每个节点上面配置一个守护进程,当logstash服务出现故障的时候,会主动拉起并修复。“保持常驻”适用于需要长期运行的业,不适用于短期运行的业务,短期业务如果源端无数据,开启保持常驻会导致任务失败。

    3. 单击“确定”,开始启动配置文件启动Logstash增量迁移任务。

      可以在管道列表看到启动的配置文件。

  9. 数据迁移完毕检查数据一致性。

    使用Putty登录linux虚拟机,执行命令python checkIndices.py对比数据结果。

释放Logstash集群

当集群迁移完成后,请及时释放Logstash集群,可以节约资源,避免产生不必要的费用。

  1. 登录云搜索服务管理控制台
  2. 在左侧菜单栏选择集群管理 > Logstash
  3. 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“更多 > 删除”,在弹出的确认提示框中,输入“DELETE”,单击“确定”完成集群删除。

相关文档