文档首页> 云搜索服务 CSS> 最佳实践> 迁移集群> 源端为Elasticsearch> 使用云服务Logstash全量迁移集群数据
更新时间:2024-06-17 GMT+08:00
分享

使用云服务Logstash全量迁移集群数据

Logstash支持全量迁移和增量迁移,首次迁移使用全量迁移,后续增加数据选择增量迁移。本章节介绍如何使用CSS服务的Logstash全量迁移集群数据。

首先请根据约束和限制准备工作提前完成迁移前的准备工作。具体步骤如下所示:

约束和限制

  • Logstash版本约束:

    CSS 支持5.5.1,6.3.2,6.5.4,7.1.1,7.6.2,7.10.2多个版本,迁移集群尽量保持大版本一致。

    对应ES集群是5.x, 6.x 选择logstash版本5.6.16, 对应ES版本是7.X 选择logstash版本7.10.0。

  • 集群迁移过程禁止修改索引,修改索引会导致原数据和目标数据内容不一致。
  • 索引大小小于100G可以使用迁移任务不用单独分析索引,简化分析工作。

准备工作

  • 创建迁移虚拟机。
    创建迁移虚拟机,用于迁移源集群的元数据。
    1. 创建ECS虚拟机,虚拟机需要创建linux系统,规格选择2U4G。
    2. 测试虚拟机和源集群和目标集群保持连通性,执行命令curl http:// {ip}:{port}可以测试结果。

      IP是源集群和目的集群访问地址,端口默认是9200,如果不是9200使用集群实际端口。

      如下示例仅适用于非安全集群。

      curl http://10.234.73.128:9200
      {
        "name" : "voc_es_cluster_new-ess-esn-1-1",
        "cluster_name" : "voc_es_cluster_new",
        "cluster_uuid" : "1VbP7-39QNOx_R-llXKKtA",
        "version" : {
          "number" : "6.5.4",
          "build_flavor" : "default",
          "build_type" : "tar",
          "build_hash" : "d2ef93d",
          "build_date" : "2018-12-17T21:17:40.758843Z",
          "build_snapshot" : false,
          "lucene_version" : "7.5.0",
          "minimum_wire_compatibility_version" : "5.6.0",
          "minimum_index_compatibility_version" : "5.0.0"
        },
        "Tagline" : "You Know, for Search"
      }
  • 准备工具和软件

    在线安装和离线安装以虚拟机是否可以联网判断。如果可以联网直接使用yum和pip安装即可。离线安装需要下载安装包到虚拟机上执行安装命令。

    表1 准备工具和软件

    类型

    目的

    获取位置

    Python2

    主要用户执行迁移脚本。

    下载地址,版本选择python 2.7.18。

    winscp

    Linux上传脚本,跨平台文件传输工具。

    下载Winscp

    在线安装步骤如下:

    1. 执行yum install python2安装python2。
      [root@ecs opt]# yum install python2
    2. 执行yum install python-pip安装pip。
      [root@ecs opt]# yum install python-pip
    3. 执行pip install pyyaml安装yaml依赖。
    4. 执行pip install requests安装requests依赖。

    离线安装步骤如下:

    1. 下载python2安装包,下载地址https://www.python.org/downloads/release/python-2718/。选择源码下载安装。
      图1 下载python2安装包
    2. 使用winscp工具上传python安装包到opt目录下,安装python。
      # 解压python压缩包
      [root@ecs-52bc opt]# tar -xvf Python-2.7.18.tgz
      Python-2.7.18/Modules/zlib/crc32.c
      Python-2.7.18/Modules/zlib/gzlib.c
      Python-2.7.18/Modules/zlib/inffast.c
      Python-2.7.18/Modules/zlib/example.c
      Python-2.7.18/Modules/python.c
      Python-2.7.18/Modules/nismodule.c
      Python-2.7.18/Modules/Setup.config.in
      …
      # 解压完成进入目录
      [root@ecs-52bc opt]# cd Python-2.7.18
      # 检查文件配置安装路径
      [root@ecs-52bc Python-2.7.18]# ./configure --prefix=/usr/local/python2
      …
      checking for build directories... checking for --with-computed-gotos... no value specified
      checking whether gcc -pthread supports computed gotos... yes
      done
      checking for ensurepip... no
      configure: creating ./config.status
      config.status: creating Makefile.pre
      config.status: creating Modules/Setup.config
      config.status: creating Misc/python.pc
      config.status: creating Modules/ld_so_aix
      config.status: creating pyconfig.h
      creating Modules/Setup
      creating Modules/Setup.local
      creating Makefile
      # 编译python
      [root@ecs-52bc Python-2.7.18]# make
      # 安装python
      [root@ecs-52bc Python-2.7.18]# make install
    3. 安装完成检查,检查python安装结果。
      # 检查python版本
      [root@ecs-52bc Python-2.7.18]# python --version
      Python 2.7.5
      # 检查pip版本
      [root@ecs-52bc Python-2.7.18]# pip --version
      pip 7.1.2 from /usr/lib/python2.7/site-packages/pip-7.1.2-py2.7.egg (python 2.7)
      [root@ecs-52bc Python-2.7.18]#
  • 准备执行脚本
    1. vi migrateConfig.yaml增加配置文件。
      es_cluster_new:
      # 集群名称
        clustername: es_cluster_new
        # 源端ES集群地址,加上http://,
        src_ip: http://x.x.x.x:9200
        # 没有用户名密码设置为""
        src_username: ""
        src_password: ""
        # 目的端ES集群地址,加上http://
        dest_ip: http://x.x.x.x:9200
        # 没有用户名密码设置为""
        dest_username: ""
        dest_password: ""
        # 可有不定义,默认为false, migrateMapping.py使用
        # 是否只处理这个文件中mapping地址的索引
        # 如果设置成true,则只会将下面的mapping中的索引获取到并在目的端创建
        # 如果设置成false,则会取源端集群的所有索引,除去(.kibana, .*)
        # 并且将索引名称与下面的mapping匹配,如果匹配到使用mapping的value作为目的端的索引名称
        # 如果匹配不到,则使用源端原始的索引名称
        only_mapping: false
        # 要迁移的索引,key为源端的索引名字,value为目的端的索引名字
        mapping:
            test_index_1: test_index_1
      
        # 可以不定义,默认false, checkIndices.py使用
        # 设置为false会比较所有的索引和文档数量,设置为true只比较索引数量,
        only_compare_index: false

      配置文件说明:

      配置

      说明

      clustername

      集群名称,集群标识。

      src_ip

      源集群访问的ip地址,只需要集群一个地址,端口默认是9200,如果集群的访问的端口不是9200,以实际的配置端口为准。

      src_username

      源集群访问的用户名,如果不需要设置为""。

      src_password

      源集群访问的密码,如果不需要设置为""。

      dest_ip

      目标集群访问的ip地址, 只需要集群一个地址,端口默认是9200,如果集群的访问的端口不是9200,以实际的配置端口为准。

      dest_username

      目标集群访问的用户名,如果不需要设置为""。

      dest_password

      目标集群访问的密码,如果不需要设置为""。

    2. 执行vi migrateTemplate.py命令,复制以下脚本到文件生成索引结构迁移脚本。
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # config = yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def put_tempalte_to_target(url, template, cluster, template_name, dest_auth=None):
          headers = {'Content-Type': 'application/json'}
          create_resp = requests.put(url, headers=headers, data=json.dumps(template), auth=dest_auth, verify=False)
          if not os.path.exists("templateLogs"):
              os.makedirs("templateLogs")
          if create_resp.status_code != 200:
              print(
                  "create template " + url + " failed with response: " + str(
                      create_resp) + ", source template is " + template_name)
              print(create_resp.text)
              filename = "templateLogs/" + str(cluster) + "#" + template_name
              with open(filename + ".json", "w") as f:
                  json.dump(template, f)
              return False
          else:
              return True
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migration template!")
          config = loadConfig(argv)
          src_clusters = config.keys()
          print("process cluster name:")
          for name in src_clusters:
              print(name)
          print("cluster total number:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
      
              print("start to process cluster name:" + name)
              source_url = value["src_ip"] + "/_template"
      
              response = requests.get(source_url, auth=source_auth, verify=False)
              if response.status_code != 200:
                  print("*** get all template failed. resp statusCode:" + str(
                      response.status_code) + " response is " + response.text)
                  continue
              all_template = response.json()
              migrate_itemplate = []
      
              for template in all_template.keys():
                  if template.startswith(".") or template == "logstash":
                      continue
                  if "index_patterns" in all_template[template]:
                      for t in all_template[template]["index_patterns"]:
                          # if "kibana" in template:
                          if t.startswith("."):
                              continue
                          migrate_itemplate.append(template)
      
              for template in migrate_itemplate:
                  dest_index_url = value["dest_ip"] + "/_template/" + template
                  result = put_tempalte_to_target(dest_index_url, all_template[template], name, template, dest_auth)
                  if result is True:
                      print('[success] delete success, cluster: %-10s, template %-10s ' % (str(name), str(template)))
                  else:
                      print('[failure] delete failure, cluster: %-10s, template %-10s ' % (str(name), str(template)))
      
      
      if __name__ == '__main__':
          main(sys.argv)
    3. 执行vi migrateMapping.py命令,复制以下脚本到文件生成索引结构迁移脚本。
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import re
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # config = yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def get_cluster_version(url, auth=None):
          response = requests.get(url, auth=auth)
          if response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              return False
          cluster = response.json()
          version = cluster["version"]["number"]
      
          return True
      
      
      def process_mapping(index_mapping, dest_index):
          # remove unnecessary keys
          del index_mapping["settings"]["index"]["provided_name"]
          del index_mapping["settings"]["index"]["uuid"]
          del index_mapping["settings"]["index"]["creation_date"]
          del index_mapping["settings"]["index"]["version"]
      
          if "lifecycle" in index_mapping["settings"]["index"]:
              del index_mapping["settings"]["index"]["lifecycle"]
      
          # check alias
          aliases = index_mapping["aliases"]
          for alias in list(aliases.keys()):
              if alias == dest_index:
                  print(
                      "source index " + dest_index + " alias " + alias + " is the same as dest_index name, will remove this alias.")
                  del index_mapping["aliases"][alias]
          # if index_mapping["settings"]["index"].has_key("lifecycle"):
          if "lifecycle" in index_mapping["settings"]["index"]:
              lifecycle = index_mapping["settings"]["index"]["lifecycle"]
              opendistro = {"opendistro": {"index_state_management":
                                               {"policy_id": lifecycle["name"],
                                                "rollover_alias": lifecycle["rollover_alias"]}}}
              index_mapping["settings"].update(opendistro)
              # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
              del index_mapping["settings"]["index"]["lifecycle"]
      
          # replace synonyms_path
          if "analysis" in index_mapping["settings"]["index"]:
              analysis = index_mapping["settings"]["index"]["analysis"]
              if "filter" in analysis:
                  filter = analysis["filter"]
                  if "my_synonym_filter" in filter:
                      my_synonym_filter = filter["my_synonym_filter"]
                      if "synonyms_path" in my_synonym_filter:
                          index_mapping["settings"]["index"]["analysis"]["filter"]["my_synonym_filter"][
                              "synonyms_path"] = "/rds/datastore/elasticsearch/v7.10.2/package/elasticsearch-7.10.2/plugins/analysis-dynamic-synonym/config/synonyms.txt"
          return index_mapping
      
      
      def getAlias(source, source_auth):
          # get all indices
          response = requests.get(source + "/_alias", auth=source_auth)
          if response.status_code != 200:
              print("*** get all index failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              exit()
      
          all_index = response.json()
          system_index = []
          create_index = []
          for index in list(all_index.keys()):
              if (index.startswith(".")):
                  system_index.append(index)
              else:
                  create_index.append(index)
      
          return system_index, create_index
      
      
      def put_mapping_to_target(url, mapping, cluster, source_index, dest_auth=None):
          headers = {'Content-Type': 'application/json'}
          create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth, verify=False)
          if not os.path.exists("mappingLogs"):
              os.makedirs("mappingLogs")
          if create_resp.status_code != 200:
              print(
                  "create index " + url + " failed with response: " + str(create_resp) +
                  ", source index is " + str(source_index))
              print(create_resp.text)
              filename = "mappingLogs/" + str(cluster) + "#" + str(source_index)
              with open(filename + ".json", "w") as f:
                  json.dump(mapping, f)
              return False
          else:
              return True
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migrate index mapping!")
          config = loadConfig(argv)
          src_clusters = config.keys()
      
          print("begin to process cluster name :")
          for name in src_clusters:
              print(name)
          print("cluster count:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source = value["src_ip"]
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest = value["dest_ip"]
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
      
              print("start to process cluster:   " + name)
              # only deal with mapping list
              if 'only_mapping' in value and value["only_mapping"]:
                  for source_index, dest_index in value["mapping"].iteritems():
                      print("start to process source index" + source_index + ", target index: " + dest_index)
                      source_url = source + "/" + source_index
                      response = requests.get(source_url, auth=source_auth)
                      if response.status_code != 200:
                          print("*** get ElasticSearch message failed. resp statusCode:" + str(
                              response.status_code) + " response is " + response.text)
                          continue
                      mapping = response.json()
                      index_mapping = process_mapping(mapping[source_index], dest_index)
                      dest_url = dest + "/" + dest_index
                      result = put_mapping_to_target(dest_url, index_mapping, name, source_index, dest_auth)
                      if result is False:
                          print("cluster name:" + name + ", " + source_index + ":failure")
                          continue
                      print("cluster name:" + name + ", " + source_index + ":success")
              else:
                  # get all indices
                  system_index, create_index = getAlias(source, source_auth)
                  success_index = 0
                  for index in create_index:
                      source_url = source + "/" + index
                      index_response = requests.get(source_url, auth=source_auth)
                      if index_response.status_code != 200:
                          print("*** get ElasticSearch message failed. resp statusCode:" + str(
                              index_response.status_code) + " response is " + index_response.text)
                          continue
                      mapping = index_response.json()
      
                      dest_index = index
                      if 'mapping' in value:
                          if index in value["mapping"].keys():
                              dest_index = value["mapping"][index]
                      index_mapping = process_mapping(mapping[index], dest_index)
      
                      dest_url = dest + "/" + dest_index
                      result = put_mapping_to_target(dest_url, index_mapping, name, index, dest_auth)
                      if result is False:
                          print("[failure]: migrate mapping cluster name: " + name + ", " + index)
                          continue
                      print("[success]: migrate mapping cluster name: " + name + ", " + index)
                      success_index = success_index + 1
                  print("create index mapping success total: " + str(success_index))
      
      
      if __name__ == '__main__':
          main(sys.argv)
    4. 执行vi checkIndices.py命令,复制以下脚本到文件生成索引数据对比脚本。
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import re
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def get_cluster_version(url, auth=None):
          response = requests.get(url, auth=auth)
          if response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              return False
          cluster = response.json()
          version = cluster["version"]["number"]
          return True
      
      
      # get all indices
      def get_indices(url, source_auth):
          response = requests.get(url + "/_alias", auth=source_auth)
          if response.status_code != 200:
              print("*** get all index failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              exit()
          all_index = response.json()
          system_index = []
          create_index = []
          for index in list(all_index.keys()):
              if (index.startswith(".")):
                  system_index.append(index)
              else:
                  create_index.append(index)
          return create_index
      
      
      def get_mapping(url, _auth, index):
          source_url = url + "/" + index
          index_response = requests.get(source_url, auth=_auth)
          if index_response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  index_response.status_code) + " response is " + index_response.text)
              return "[failure] --- index is not exist in destination es. ---"
          mapping = index_response.json()
          return mapping
      
      
      def get_index_total(url, index, es_auth):
          stats_url = url + "/" + index + "/_stats"
          index_response = requests.get(stats_url, auth=es_auth, verify=False)
          if index_response.status_code != 200:
              print("*** get ElasticSearch stats message failed. resp statusCode:" + str(
                  index_response.status_code) + " response is " + index_response.text)
              return 0
          return index_response.json()
      
      
      def get_indices_stats(url, es_auth):
          endpoint = url + "/_cat/indices"
          indicesResult = requests.get(endpoint, es_auth)
          indicesList = indicesResult.split("\n")
          indexList = []
          for indices in indicesList:
              indexList.append(indices.split()[2])
          return indexList
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # python3
          # return yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migrate index mapping!")
          config = loadConfig(argv)
          src_clusters = config.keys()
      
          print("begin to process cluster name :")
          for name in src_clusters:
              print(name)
          print("cluster count:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source = value["src_ip"]
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest = value["dest_ip"]
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
              cluster_name = name
              if "clustername" in value:
                  cluster_name = value["clustername"]
      
              print("start to process cluster :" + cluster_name)
              # get all indices
              all_source_index = get_indices(source, source_auth)
              all_dest_index = get_indices(dest, dest_auth)
      
              if not os.path.exists("mappingLogs"):
                  os.makedirs("mappingLogs")
              filename = "mappingLogs/" + str(cluster_name) + "#indices_stats"
              with open(filename + ".json", "w") as f:
                  json.dump("cluster name: " + cluster_name, f)
                  f.write("\n")
                  json.dump("source indices: ", f)
                  f.write("\n")
                  json.dump(all_source_index, f, indent=4)
                  f.write("\n")
                  json.dump("destination indices : ", f)
                  f.write("\n")
                  json.dump(all_dest_index, f, indent=4)
                  f.write("\n")
      
              print("source indices total     : " + str(all_source_index.__len__()))
              print("destination index total  : " + str(all_dest_index.__len__()))
      
              filename_src = "mappingLogs/" + str(cluster_name) + "#indices_source_mapping"
              filename_dest = "mappingLogs/" + str(cluster_name) + "#indices_dest_mapping"
              with open(filename_src + ".json", "a") as f_src:
                  json.dump("cluster name: " + cluster_name, f_src)
                  f_src.write("\n")
              with open(filename_dest + ".json", "a") as f_dest:
                  json.dump("cluster name: " + cluster_name, f_dest)
                  f_dest.write("\n")
              for index in all_source_index:
                  mapping = get_mapping(source, source_auth, index)
                  with open(filename + ".json", "a") as f_src:
                      json.dump("========================", f_src)
                      f_src.write("\n")
                      json.dump(mapping, f_src, indent=4)
                      f_src.write("\n")
                  with open(filename_src + ".json", "a") as f_src:
                      json.dump("========================", f_src)
                      f_src.write("\n")
                      json.dump(mapping, f_src, indent=4)
                      f_src.write("\n")
      
                  mapping = get_mapping(dest, dest_auth, index)
                  with open(filename + ".json", "a") as f_dest:
                      json.dump("========================", f_dest)
                      f_dest.write("\n")
                      json.dump(mapping, f_dest, indent=4)
                      f_dest.write("\n")
                  with open(filename_dest + ".json", "a") as f_src:
                      json.dump("========================", f_src)
                      f_src.write("\n")
                      json.dump(mapping, f_src, indent=4)
                      f_src.write("\n")
      
              print("source indices write file success,      file: " + filename_src)
              print("destination indices write file success, file: " + filename_dest)
      
              if "only_compare_index" in value and value["only_compare_index"]:
                  print("[success] only compare mapping, not compare index count.")
                  continue
      
              for index in all_source_index:
                  index_total = get_index_total(value["src_ip"], index, source_auth)
                  src_total = index_total["_all"]["primaries"]["docs"]["count"]
                  src_size = int(index_total["_all"]["primaries"]["store"]["size_in_bytes"]) / 1024 / 1024
                  dest_index = get_index_total(value["dest_ip"], index, dest_auth)
                  if dest_index is 0:
                      print('[failure] not found, index: %-20s, source total: %-10s size %6sM'
                            % (str(index), str(src_total), src_size))
                      continue
                  dest_total = dest_index["_all"]["primaries"]["docs"]["count"]
                  if src_total != dest_total:
                      print('[failure] not consistent, '
                            'index: %-20s, source total: %-10s size %6sM destination total: %-10s '
                            % (str(index), str(src_total), src_size, str(dest_total)))
                      continue
                  print('[success] compare index total equal : index : %-20s,  total: %-20s '
                        % (str(index), str(dest_total)))
      
      
      if __name__ == '__main__':
          main(sys.argv)

步骤一:创建Logstash集群

  • 迁移数据使用Logstash,创建logstash服务需要费用,默认是按需收费,用户迁移完毕数据及时释放Logstash节省费用。
  • 可以基于集群的索引不同创建多个Logstash集群分别配置不同的迁移任务。
  1. 登录云搜索服务管理控制台
  2. “总览”或者“集群管理”页面,选择“Logstash”,进入Logsash类型集群管理页面。
  3. 单击创建集群,进入“创建集群”页面。
  4. 选择“当前区域“可用区”
  5. 指定集群基本信息,选择“集群类型”“集群版本”,并输入“集群名称”
    表2 基本参数说明

    参数

    说明

    集群类型

    选择“Logstash”

    集群版本

    当前支持5.6.16、7.10.0。

    对应ES集群是5.x, 6.x 选择logstash版本5.6.16, 对应ES版本是7.X 选择logstash版本7.10.0。

    集群名称

    自定义的集群名称,可输入的字符范围为4~32个字符,只能包含数字、字母、中划线和下划线,且必须以字母开头。

    图2 基本信息配置
  6. 指定集群的主机规格相关参数。“节点数量”设置为“1”“节点规格”选择“8U16G”,其余参数保持默认值。
    图3 设置主机规格
  7. 设置集群的企业项目,保持默认值即可。
  8. 单击“下一步,网络配置”,设置集群的网络配置。
    表3 参数说明

    参数

    说明

    虚拟私有云

    VPC即虚拟私有云,是通过逻辑方式进行网络隔离,提供安全、隔离的网络环境。

    选择创建集群需要的VPC,单击“查看虚拟私有云”进入VPC服务查看已创建的VPC名称和ID。如果没有VPC,需要创建一个新的VPC。

    说明:

    此处您选择的VPC必须包含网段(CIDR),否则集群将无法创建成功。新建的VPC默认包含网段(CIDR)。

    子网

    通过子网提供与其他网络隔离的、可以独享的网络资源,以提高网络安全。

    选择创建集群需要的子网,可进入VPC服务查看VPC下已创建的子网名称和ID。

    安全组

    安全组是一个逻辑上的分组,为同一个VPC内具有相同安全保护需求并相互信任的弹性云服务器提供访问策略。单击“查看安全组”可了解安全组详情。

    说明:

    请确保安全组的“端口范围/ICMP类型”“Any”或者包含端口9200的端口范围。

    图4 设置网络规格
  9. 单击“下一步:高级配置”,高级配置可选择默认配置和自定义。此样例保持默认配置即可。
  10. 单击“下一步:确认配置”,确认完成后单击“立即创建”开始创建集群。
  11. 单击“返回集群列表”,系统将跳转到“集群管理”页面。您创建的集群将展现在集群列表中,且集群状态为“创建中”,创建成功后集群状态会变为可用

步骤二:验证集群连通性

验证Logstash和源集群、目标集群的连通性。

  1. 在Logstash集群管理页面,单击步骤一:创建Logstash集群中创建的集群名称,进入集群的基本信息页面。选择“配置中心”,进入配置中心页面;或者直接单击目标集群操作列的“配置中心”,进入配置中心页面。
  2. 在配置中心页面,选择“连通性测试”
  3. 输入源集群和目的集群的IP地址或域名和端口号,单击“测试”
    图5 连通性测试

步骤三:配置Logstash全量迁移任务

  1. 在Logstash集群管理页面,单击步骤一:创建Logstash集群中创建的集群名称,进入集群的基本信息页面。选择“配置中心”,或者直接单击目标集群操作列的“配置中心”,进入配置中心页面。
  2. 单击右上角“创建”,进入创建配置文件页面,选择集群模板,修改ES集群迁移配置文件。

    当前案例选用的两个ES集群均未开启https。

    • 选择集群模板:此样例是从Elasticsearch类型集群导入数据到Elasticsearch类型,集群模板选择“elasticsearch”,单击操作列的“应用”。基于不同的集群配置增加配置信息。
    • 修改配置文件:“名称”填写配置名称,例如es-es-all;“配置文件内容”填写ES集群迁移配置文件,配置文件示例如下。
      input{
           elasticsearch{
              # 源集群访问地址信息
              hosts =>  ["xx.xx.xx.xx:9200", "xx.xx.xx.xx:9200"]
              # 源集群访问的用户名和密码,没有user和password不填
              # user => "css_logstash"
              # password => "*****"
               # 待迁移的索引信息
              index => "*_202102"
              docinfo => true
              slices => 3
              size => 3000
           }
       }
      
       
      # 移除一些logstash增加的字段
       filter {
         mutate {
           remove_field => ["@metadata", "@version"]
         }
       }
      
       output{
           elasticsearch{
             # 目标集群访问地址信息
             hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"]
             # 目标集群访问的用户名和密码,没有user和password不填
             # user => "css_logstash"
             # password => "*****"
             index => "%{[@metadata][_index]}"
             document_type => "%{[@metadata][_type]}"
             document_id => "%{[@metadata][_id]}"
           }
       }

      需要修改的配置如下:

      表4 集群配置修改说明

      配置

      说明

      input

      hosts

      源集群的访问地址,如果集群有多个访问节点请分别填写,使用逗号隔开

      user

      集群访问的用户名,如果没有用户名此项使用#注释掉

      password

      集群访问的密码,如果没有用户名密码此项使用#注释掉

      index

      需要全量迁移的源端索引信息,使用逗号隔开,可以使用通配符设置,index*

      docinfo

      是否重新索引文档,必须为true

      slices

      配置该参数可以使用切片滚动同时对查询的不同切片,提高整体吞吐量。建议在2-8内

      size

      每次查询返回的最大命中数

      output

      hosts

      华为云CSS集群访问地址,如果集群有多个节点,请分别填写,使用逗号隔开

      user

      集群访问的用户名,如果没有用户名此项使用 # 注释掉

      password

      集群访问的密码,如果没有密码此项使用 # 注释掉

      index

      迁移到目的端的索引名称,支持扩展修改,如:logstash-%{+yyyy.MM.dd}

      document_type

      使目标端文档类型与源端保持一致

      document_id

      索引中的文档ID,建议与源端保持一致,如需要自动生成,使用 #注释掉即可

步骤四:全量迁移

  1. 使用putty登录准备工作中创建的Linux虚拟机。
  2. 执行python migrateTemplate.py 迁移索引模板。
  3. 执行 python migrateMapping.py迁移索引。
  4. 在Logstash集群管理页面,单击步骤一:创建Logstash集群中创建的集群名称,进入集群的基本信息页面。选择“配置中心”,或者直接单击目标集群操作列的“配置中心”,进入配置中心页面。
  5. 选择步骤三:配置Logstash全量迁移任务中所创建的配置文件,单击左上角的“启动”
  6. 根据界面提示,选择是否启动Logstash服务会立刻开始迁移数据。
  7. 如果选择“是”,则可以在管道下面看到启动的配置文件。
  8. 数据迁移完毕检查数据一致性,使用putty登录linux虚拟机,执行python checkIndices.py 对比数据结果。

步骤五:释放Logstash集群

集群迁移完毕,释放Logstash集群。

  1. 登录云搜索服务管理控制台。
  2. 单击“集群管理”>“Logstash”,进入Logstash类型的集群列表界面,在所创建的迁移集群“操作”列中单击更多>删除
  3. 在弹出的确认提示框中,再次输入需要删除的集群名称,单击“确定”完成集群删除。
分享:

    相关文档

    相关产品