
Using Logstash to Perform Full Data Migration

Logstash supports both full and incremental data migration. You can perform a full migration the first time you migrate data, and incremental migrations afterwards. This section describes how to use a CSS Logstash cluster to fully migrate data from one Elasticsearch cluster to another.

Prepare for the migration by referring to Restrictions and Preparations, and then follow the procedure below.

Restrictions

  • Logstash version restrictions:

    CSS supports clusters of versions 5.5.1, 6.3.2, 6.5.4, 7.1.1, 7.6.2, and 7.10.2. Ensure that the source and destination clusters have the same major version.

    If the Elasticsearch cluster version is 5.x, select Logstash 5.6.16. If the Elasticsearch cluster version is 7.x, select Logstash 7.10.0.

  • Do not modify indexes during cluster migration. Otherwise, the original data will be inconsistent with the migrated data.
  • If the indexes to be migrated total less than 100 GB, you do not need to analyze the indexes separately before the migration.

Preparations

  • Create a VM for data migration.
    1. Create a VM to migrate the metadata of the source cluster.
      1. Create a Linux ECS with 2 vCPUs and 4 GB memory.
      2. Run the curl http://{IP_address}:{port} command to test the connectivity between the VM and the source cluster, and between the VM and the destination cluster.

        {IP_address} is the access address of the source or destination cluster, and {port} is the actual port number of the cluster. The default port is 9200.

        The following example applies only to non-security clusters.

        curl http://10.234.73.128:9200
        {
          "name" : "voc_es_cluster_new-ess-esn-1-1",
          "cluster_name" : "voc_es_cluster_new",
          "cluster_uuid" : "1VbP7-39QNOx_R-llXKKtA",
          "version" : {
            "number" : "6.5.4",
            "build_flavor" : "default",
            "build_type" : "tar",
            "build_hash" : "d2ef93d",
            "build_date" : "2018-12-17T21:17:40.758843Z",
            "build_snapshot" : false,
            "lucene_version" : "7.5.0",
            "minimum_wire_compatibility_version" : "5.6.0",
            "minimum_index_compatibility_version" : "5.0.0"
          },
          "Tagline" : "You Know, for Search"
        }
  • Prepare the tools and software.

    The installation method depends on whether the VM can connect to the Internet. If the VM can connect to the Internet, use yum and pip to install the software. If it cannot, download the installation packages to the VM and run the installation commands manually.

    Table 1 Tools and software

    • Python 2: Used to execute the data migration scripts. Download Python 2.7.18 from https://www.python.org/downloads/release/python-2718/.
    • WinSCP: A cross-platform file transfer tool used to upload the scripts to the Linux VM. Download it from the official WinSCP website.

    The online installation procedure is as follows:

    1. Run yum install python2 to install python2.
      [root@ecs opt]# yum install python2
    2. Run yum install python-pip to install pip.
      [root@ecs opt]# yum install python-pip
    3. Run pip install pyyaml to install the YAML dependency.
    4. Run pip install requests to install the requests dependency.

    The offline installation procedure is as follows:

    1. Download the Python 2 source package (Python-2.7.18.tgz) from https://www.python.org/downloads/release/python-2718/.
      Figure 1 Downloading the python2 package
    2. Use WinSCP to upload the Python installation package to the opt directory and install Python.
      # Decompress the Python package.
      [root@ecs-52bc opt]# tar -xvf Python-2.7.18.tgz
      Python-2.7.18/Modules/zlib/crc32.c
      Python-2.7.18/Modules/zlib/gzlib.c
      Python-2.7.18/Modules/zlib/inffast.c
      Python-2.7.18/Modules/zlib/example.c
      Python-2.7.18/Modules/python.c
      Python-2.7.18/Modules/nismodule.c
      Python-2.7.18/Modules/Setup.config.in
      ...
      # After the decompression, go to the directory.
      [root@ecs-52bc opt]# cd Python-2.7.18
      # Configure the build and specify the installation path.
      [root@ecs-52bc Python-2.7.18]# ./configure --prefix=/usr/local/python2
      ...
      checking for build directories... checking for --with-computed-gotos... no value specified
      checking whether gcc -pthread supports computed gotos... yes
      done
      checking for ensurepip... no
      configure: creating ./config.status
      config.status: creating Makefile.pre
      config.status: creating Modules/Setup.config
      config.status: creating Misc/python.pc
      config.status: creating Modules/ld_so_aix
      config.status: creating pyconfig.h
      creating Modules/Setup
      creating Modules/Setup.local
      creating Makefile
      # Compile Python.
      [root@ecs-52bc Python-2.7.18]# make
      # Install Python.
      [root@ecs-52bc Python-2.7.18]# make install
    3. Check the Python installation result.
      # Check the Python version.
      [root@ecs-52bc Python-2.7.18]# python --version
      Python 2.7.5
      # Check the pip version.
      [root@ecs-52bc Python-2.7.18]# pip --version
      pip 7.1.2 from /usr/lib/python2.7/site-packages/pip-7.1.2-py2.7.egg (python 2.7)
      [root@ecs-52bc Python-2.7.18]#
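      NOTE: Because the build was configured with --prefix=/usr/local/python2, the python command may still point to the pre-installed system interpreter (Python 2.7.5 in the output above). If the version shown is not the one you installed, run the migration scripts with /usr/local/python2/bin/python instead, or add /usr/local/python2/bin to the PATH environment variable.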
  • Prepare the execution script.
    1. Run the vi migrateConfig.yaml command to create the configuration file and add the following content:
      es_cluster_new:
        # Cluster name
        clustername: es_cluster_new
        # Address of the source Elasticsearch cluster; it must start with http://
        src_ip: http://x.x.x.x:9200
        # If security authentication is not enabled, set the username and password to "".
        src_username: ""
        src_password: ""
        # Address of the destination Elasticsearch cluster; it must start with http://
        dest_ip: http://x.x.x.x:9200
        # If security authentication is not enabled, set the username and password to "".
        dest_username: ""
        dest_password: ""
        # Optional. The default value is false. Used by migrateMapping.py.
        # Indicates whether to process only the indexes listed in mapping below.
        # If true, only the indexes listed in mapping are read from the source and created in the destination.
        # If false, all indexes of the source cluster are processed, excluding system indexes (.kibana, .*).
        # Index names are matched against mapping: if a source index name matches a key, the corresponding value is used as the index name in the destination. If no match is found, the original source index name is used.
        only_mapping: false
        # Indexes to be migrated. The key is the index name in the source cluster; the value is the index name in the destination cluster.
        mapping:
            test_index_1: test_index_1

        # Optional. The default value is false. Used by checkIndices.py.
        # If false, both the index lists and the document counts are compared. If true, only the index lists are compared.
        only_compare_index: false

      Configuration file parameters

      • clustername: Cluster name.
      • src_ip: Address for accessing the source cluster. Only one address is required. The default port is 9200; if the cluster uses a different port, specify the actual port.
      • src_username: Username for accessing the source cluster. Set it to "" if not required.
      • src_password: Password for accessing the source cluster. Set it to "" if not required.
      • dest_ip: Address for accessing the destination cluster. Only one address is required. The default port is 9200; if the cluster uses a different port, specify the actual port.
      • dest_username: Username for accessing the destination cluster. Set it to "" if not required.
      • dest_password: Password for accessing the destination cluster. Set it to "" if not required.
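      Before running the migration scripts, you can optionally verify that the addresses and credentials in migrateConfig.yaml are usable. The following is a minimal sketch (checkConfig.py is a hypothetical helper, not part of the script set below) that loads the configuration file and sends a test request to each cluster:

      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests


      def main(argv):
          requests.packages.urllib3.disable_warnings()
          # Use the same default configuration file as the migration scripts.
          config_yaml = argv[1] if len(argv) == 2 else "migrateConfig.yaml"
          config = yaml.load(open(config_yaml))
          for name, value in config.items():
              for side in ("src", "dest"):
                  url = value[side + "_ip"]
                  user = value[side + "_username"]
                  auth = (user, value[side + "_password"]) if user != "" else None
                  # An HTTP 200 response means the address and credentials work.
                  resp = requests.get(url, auth=auth, verify=False)
                  print("cluster %s, %s %s -> HTTP %d" % (name, side, url, resp.status_code))


      if __name__ == '__main__':
          main(sys.argv)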

    2. Run the vi migrateTemplate.py command. Copy the following script into the file to create the index template migration script:
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # config = yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def put_template_to_target(url, template, cluster, template_name, dest_auth=None):
          headers = {'Content-Type': 'application/json'}
          create_resp = requests.put(url, headers=headers, data=json.dumps(template), auth=dest_auth, verify=False)
          if not os.path.exists("templateLogs"):
              os.makedirs("templateLogs")
          if create_resp.status_code != 200:
              print(
                  "create template " + url + " failed with response: " + str(
                      create_resp) + ", source template is " + template_name)
              print(create_resp.text)
              filename = "templateLogs/" + str(cluster) + "#" + template_name
              with open(filename + ".json", "w") as f:
                  json.dump(template, f)
              return False
          else:
              return True
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migration template!")
          config = loadConfig(argv)
          src_clusters = config.keys()
          print("process cluster name:")
          for name in src_clusters:
              print(name)
          print("cluster total number:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
      
              print("start to process cluster name:" + name)
              source_url = value["src_ip"] + "/_template"
      
              response = requests.get(source_url, auth=source_auth, verify=False)
              if response.status_code != 200:
                  print("*** get all template failed. resp statusCode:" + str(
                      response.status_code) + " response is " + response.text)
                  continue
              all_template = response.json()
              migrate_itemplate = []
      
              for template in all_template.keys():
                  if template.startswith(".") or template == "logstash":
                      continue
                  if "index_patterns" in all_template[template]:
                      for t in all_template[template]["index_patterns"]:
                          # if "kibana" in template:
                          if t.startswith("."):
                              continue
                          migrate_itemplate.append(template)
      
              for template in migrate_itemplate:
                  dest_index_url = value["dest_ip"] + "/_template/" + template
                  result = put_template_to_target(dest_index_url, all_template[template], name, template, dest_auth)
                  if result is True:
                      print('[success] create template success, cluster: %-10s, template %-10s ' % (str(name), str(template)))
                  else:
                      print('[failure] create template failure, cluster: %-10s, template %-10s ' % (str(name), str(template)))
      
      
      if __name__ == '__main__':
          main(sys.argv)
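      By default, the script reads migrateConfig.yaml from the current directory. To use a different configuration file, pass its path as the only argument, for example, python migrateTemplate.py /opt/migrateConfig.yaml. Any template that fails to be created is saved as a JSON file in the templateLogs directory for troubleshooting.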
    3. Run the vi migrateMapping.py command. Copy the following script into the file to create the index structure (mapping) migration script:
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import re
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # config = yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def get_cluster_version(url, auth=None):
          response = requests.get(url, auth=auth)
          if response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              return False
          cluster = response.json()
          version = cluster["version"]["number"]
      
          return True
      
      
      def process_mapping(index_mapping, dest_index):
          # remove unnecessary keys
          del index_mapping["settings"]["index"]["provided_name"]
          del index_mapping["settings"]["index"]["uuid"]
          del index_mapping["settings"]["index"]["creation_date"]
          del index_mapping["settings"]["index"]["version"]
      
          if "lifecycle" in index_mapping["settings"]["index"]:
              del index_mapping["settings"]["index"]["lifecycle"]
      
          # check alias
          aliases = index_mapping["aliases"]
          for alias in list(aliases.keys()):
              if alias == dest_index:
                  print(
                      "source index " + dest_index + " alias " + alias + " is the same as dest_index name, will remove this alias.")
                  del index_mapping["aliases"][alias]
          # if index_mapping["settings"]["index"].has_key("lifecycle"):
          if "lifecycle" in index_mapping["settings"]["index"]:
              lifecycle = index_mapping["settings"]["index"]["lifecycle"]
              opendistro = {"opendistro": {"index_state_management":
                                               {"policy_id": lifecycle["name"],
                                                "rollover_alias": lifecycle["rollover_alias"]}}}
              index_mapping["settings"].update(opendistro)
              # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
              del index_mapping["settings"]["index"]["lifecycle"]
      
          # replace synonyms_path
          if "analysis" in index_mapping["settings"]["index"]:
              analysis = index_mapping["settings"]["index"]["analysis"]
              if "filter" in analysis:
                  filter = analysis["filter"]
                  if "my_synonym_filter" in filter:
                      my_synonym_filter = filter["my_synonym_filter"]
                      if "synonyms_path" in my_synonym_filter:
                          index_mapping["settings"]["index"]["analysis"]["filter"]["my_synonym_filter"][
                              "synonyms_path"] = "/rds/datastore/elasticsearch/v7.10.2/package/elasticsearch-7.10.2/plugins/analysis-dynamic-synonym/config/synonyms.txt"
          return index_mapping
      
      
      def getAlias(source, source_auth):
          # get all indices
          response = requests.get(source + "/_alias", auth=source_auth)
          if response.status_code != 200:
              print("*** get all index failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              exit()
      
          all_index = response.json()
          system_index = []
          create_index = []
          for index in list(all_index.keys()):
              if (index.startswith(".")):
                  system_index.append(index)
              else:
                  create_index.append(index)
      
          return system_index, create_index
      
      
      def put_mapping_to_target(url, mapping, cluster, source_index, dest_auth=None):
          headers = {'Content-Type': 'application/json'}
          create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth, verify=False)
          if not os.path.exists("mappingLogs"):
              os.makedirs("mappingLogs")
          if create_resp.status_code != 200:
              print(
                  "create index " + url + " failed with response: " + str(create_resp) +
                  ", source index is " + str(source_index))
              print(create_resp.text)
              filename = "mappingLogs/" + str(cluster) + "#" + str(source_index)
              with open(filename + ".json", "w") as f:
                  json.dump(mapping, f)
              return False
          else:
              return True
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migrate index mapping!")
          config = loadConfig(argv)
          src_clusters = config.keys()
      
          print("begin to process cluster name :")
          for name in src_clusters:
              print(name)
          print("cluster count:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source = value["src_ip"]
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest = value["dest_ip"]
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
      
              print("start to process cluster:   " + name)
              # only deal with mapping list
              if 'only_mapping' in value and value["only_mapping"]:
                  for source_index, dest_index in value["mapping"].iteritems():
                      print("start to process source index" + source_index + ", target index: " + dest_index)
                      source_url = source + "/" + source_index
                      response = requests.get(source_url, auth=source_auth)
                      if response.status_code != 200:
                          print("*** get ElasticSearch message failed. resp statusCode:" + str(
                              response.status_code) + " response is " + response.text)
                          continue
                      mapping = response.json()
                      index_mapping = process_mapping(mapping[source_index], dest_index)
                      dest_url = dest + "/" + dest_index
                      result = put_mapping_to_target(dest_url, index_mapping, name, source_index, dest_auth)
                      if result is False:
                          print("cluster name:" + name + ", " + source_index + ":failure")
                          continue
                      print("cluster name:" + name + ", " + source_index + ":success")
              else:
                  # get all indices
                  system_index, create_index = getAlias(source, source_auth)
                  success_index = 0
                  for index in create_index:
                      source_url = source + "/" + index
                      index_response = requests.get(source_url, auth=source_auth)
                      if index_response.status_code != 200:
                          print("*** get ElasticSearch message failed. resp statusCode:" + str(
                              index_response.status_code) + " response is " + index_response.text)
                          continue
                      mapping = index_response.json()
      
                      dest_index = index
                      if 'mapping' in value:
                          if index in value["mapping"].keys():
                              dest_index = value["mapping"][index]
                      index_mapping = process_mapping(mapping[index], dest_index)
      
                      dest_url = dest + "/" + dest_index
                      result = put_mapping_to_target(dest_url, index_mapping, name, index, dest_auth)
                      if result is False:
                          print("[failure]: migrate mapping cluster name: " + name + ", " + index)
                          continue
                      print("[success]: migrate mapping cluster name: " + name + ", " + index)
                      success_index = success_index + 1
                  print("create index mapping success total: " + str(success_index))
      
      
      if __name__ == '__main__':
          main(sys.argv)
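      Like migrateTemplate.py, this script accepts an optional configuration file path as its only argument. System indexes (whose names start with a period) are skipped automatically, and any index that fails to be created is saved as a JSON file in the mappingLogs directory.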
    4. Run the vi checkIndices.py command. Copy the following script to generate the index data comparison script:
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import re
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def get_cluster_version(url, auth=None):
          response = requests.get(url, auth=auth)
          if response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              return False
          cluster = response.json()
          version = cluster["version"]["number"]
          return True
      
      
      # get all indices
      def get_indices(url, source_auth):
          response = requests.get(url + "/_alias", auth=source_auth)
          if response.status_code != 200:
              print("*** get all index failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              exit()
          all_index = response.json()
          system_index = []
          create_index = []
          for index in list(all_index.keys()):
              if (index.startswith(".")):
                  system_index.append(index)
              else:
                  create_index.append(index)
          return create_index
      
      
      def get_mapping(url, _auth, index):
          source_url = url + "/" + index
          index_response = requests.get(source_url, auth=_auth)
          if index_response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  index_response.status_code) + " response is " + index_response.text)
              return "[failure] --- index is not exist in destination es. ---"
          mapping = index_response.json()
          return mapping
      
      
      def get_index_total(url, index, es_auth):
          stats_url = url + "/" + index + "/_stats"
          index_response = requests.get(stats_url, auth=es_auth, verify=False)
          if index_response.status_code != 200:
              print("*** get ElasticSearch stats message failed. resp statusCode:" + str(
                  index_response.status_code) + " response is " + index_response.text)
              return 0
          return index_response.json()
      
      
      def get_indices_stats(url, es_auth):
          endpoint = url + "/_cat/indices"
          # Pass the credentials via auth and parse the text body of the response.
          indicesResult = requests.get(endpoint, auth=es_auth, verify=False)
          indicesList = indicesResult.text.split("\n")
          indexList = []
          for indices in indicesList:
              # Skip empty lines; the index name is the third column of the _cat output.
              if indices.strip() == "":
                  continue
              indexList.append(indices.split()[2])
          return indexList
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # python3
          # return yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migrate index mapping!")
          config = loadConfig(argv)
          src_clusters = config.keys()
      
          print("begin to process cluster name :")
          for name in src_clusters:
              print(name)
          print("cluster count:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source = value["src_ip"]
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest = value["dest_ip"]
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
              cluster_name = name
              if "clustername" in value:
                  cluster_name = value["clustername"]
      
              print("start to process cluster :" + cluster_name)
              # get all indices
              all_source_index = get_indices(source, source_auth)
              all_dest_index = get_indices(dest, dest_auth)
      
              if not os.path.exists("mappingLogs"):
                  os.makedirs("mappingLogs")
              filename = "mappingLogs/" + str(cluster_name) + "#indices_stats"
              with open(filename + ".json", "w") as f:
                  json.dump("cluster name: " + cluster_name, f)
                  f.write("\n")
                  json.dump("source indices: ", f)
                  f.write("\n")
                  json.dump(all_source_index, f, indent=4)
                  f.write("\n")
                  json.dump("destination indices : ", f)
                  f.write("\n")
                  json.dump(all_dest_index, f, indent=4)
                  f.write("\n")
      
              print("source indices total     : " + str(all_source_index.__len__()))
              print("destination index total  : " + str(all_dest_index.__len__()))
      
              filename_src = "mappingLogs/" + str(cluster_name) + "#indices_source_mapping"
              filename_dest = "mappingLogs/" + str(cluster_name) + "#indices_dest_mapping"
              with open(filename_src + ".json", "a") as f_src:
                  json.dump("cluster name: " + cluster_name, f_src)
                  f_src.write("\n")
              with open(filename_dest + ".json", "a") as f_dest:
                  json.dump("cluster name: " + cluster_name, f_dest)
                  f_dest.write("\n")
              for index in all_source_index:
                  mapping = get_mapping(source, source_auth, index)
                  with open(filename + ".json", "a") as f_src:
                      json.dump("========================", f_src)
                      f_src.write("\n")
                      json.dump(mapping, f_src, indent=4)
                      f_src.write("\n")
                  with open(filename_src + ".json", "a") as f_src:
                      json.dump("========================", f_src)
                      f_src.write("\n")
                      json.dump(mapping, f_src, indent=4)
                      f_src.write("\n")
      
                  mapping = get_mapping(dest, dest_auth, index)
                  with open(filename + ".json", "a") as f_dest:
                      json.dump("========================", f_dest)
                      f_dest.write("\n")
                      json.dump(mapping, f_dest, indent=4)
                      f_dest.write("\n")
                  with open(filename_dest + ".json", "a") as f_src:
                      json.dump("========================", f_src)
                      f_src.write("\n")
                      json.dump(mapping, f_src, indent=4)
                      f_src.write("\n")
      
              print("source indices write file success,      file: " + filename_src)
              print("destination indices write file success, file: " + filename_dest)
      
              if "only_compare_index" in value and value["only_compare_index"]:
                  print("[success] only compare mapping, not compare index count.")
                  continue
      
              for index in all_source_index:
                  index_total = get_index_total(value["src_ip"], index, source_auth)
                  src_total = index_total["_all"]["primaries"]["docs"]["count"]
                  src_size = int(index_total["_all"]["primaries"]["store"]["size_in_bytes"]) / 1024 / 1024
                  dest_index = get_index_total(value["dest_ip"], index, dest_auth)
                  if dest_index == 0:
                      print('[failure] not found, index: %-20s, source total: %-10s size %6sM'
                            % (str(index), str(src_total), src_size))
                      continue
                  dest_total = dest_index["_all"]["primaries"]["docs"]["count"]
                  if src_total != dest_total:
                      print('[failure] not consistent, '
                            'index: %-20s, source total: %-10s size %6sM destination total: %-10s '
                            % (str(index), str(src_total), src_size, str(dest_total)))
                      continue
                  print('[success] compare index total equal : index : %-20s,  total: %-20s '
                        % (str(index), str(dest_total)))
      
      
      if __name__ == '__main__':
          main(sys.argv)
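      The comparison results are printed to the console: [success] lines indicate that the document counts of the source and destination indexes match, and [failure] lines flag indexes that are missing from the destination or have inconsistent counts. The script also writes the full index lists and mappings of both clusters to files in the mappingLogs directory for later review.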

Step 1: Creating a Logstash Cluster

  • Logstash clusters are used to migrate data. By default, Logstash clusters are billed in pay-per-use mode. After the data migration is complete, you are advised to delete the Logstash cluster to reduce costs.
  • If the source cluster has many indexes, you can create multiple Logstash clusters and configure a different migration task for each.
  1. Log in to the CSS management console.
  2. On the Dashboard or Clusters page, choose Logstash in the navigation pane on the left.
  3. Click Create Cluster. The Create Cluster page is displayed.
  4. Specify Region and AZ.
  5. Specify the basic cluster information, select the cluster type and cluster version, and enter the cluster name.
    Table 2 Basic parameters

    • Cluster Type: Select Logstash.
    • Version: 5.6.16 and 7.10.0 are supported. If the Elasticsearch cluster version is 5.x, select Logstash 5.6.16. If the Elasticsearch cluster version is 7.x, select Logstash 7.10.0.
    • Name: Cluster name, which contains 4 to 32 characters. Only letters, numbers, hyphens (-), and underscores (_) are allowed, and the value must start with a letter.

    Figure 2 Configuring basic information
  6. Set host specifications of the cluster. Set the number of Nodes to 1. Set Node Specifications to 8 vCPUs | 16 GB and retain the default values for other parameters.
    Figure 3 Configuring host specifications
  7. Set the enterprise project. Retain the default value.
  8. Click Next: Configure Network. Configure the cluster network.
    Table 3 Parameter description

    • VPC: A VPC is a secure, isolated, logical network environment. Select the target VPC. Click View VPC to enter the VPC management console and view the created VPC names and IDs. If no VPCs are available, create one. NOTE: The VPC must have CIDR blocks configured; otherwise, cluster creation will fail. By default, a VPC has CIDR blocks.
    • Subnet: A subnet provides dedicated network resources that are isolated from other networks, improving network security. Select the target subnet. You can access the VPC management console to view the names and IDs of the existing subnets in the VPC.
    • Security Group: A security group implements access control for ECSs that have the same security protection requirements in a VPC. To view details about the security group, click View Security Group. NOTE: Ensure that the selected security group's Port Range/ICMP Type is Any or includes port 9200.

    Figure 4 Configuring network specifications
  9. Click Next: Configure Advanced Settings. You can select Default or Custom for Advanced Settings. Retain the default settings in this example.
  10. Click Next: Confirm. Check the configuration and click Next to create a cluster.
  11. Click Back to Cluster List to switch to the Clusters page. The cluster you created is listed on the displayed page and its status is Creating. If the cluster is successfully created, its status will change to Available.

Step 2: Verifying Cluster Connectivity

Verify the connectivity between Logstash and the source and destination clusters.

  1. On the Logstash clusters page, click the name of the Logstash cluster created in Step 1: Creating a Logstash Cluster. The Cluster Information page is displayed. Choose Configuration Center in the navigation pane on the left. Alternatively, click Configuration Center in the Operation column of the target cluster. The Configuration Center page is displayed.
  2. On the Configuration Center page, click Test Connectivity.
  3. Enter the IP addresses or domain names and port numbers of the source and destination clusters, and click Test.
    Figure 5 Testing the connectivity

Step 3: Configuring a Logstash Full Data Migration Task

  1. On the Logstash clusters page, click the name of the Logstash cluster created in Step 1: Creating a Logstash Cluster. The Cluster Information page is displayed. Choose Configuration Center, or click Configuration Center in the Operation column of the target cluster. The Configuration Center page is displayed.
  2. Click Create in the upper right corner. On the configuration file creation page that is displayed, select a cluster template and modify the migration configuration file of the Elasticsearch cluster.

    In this example, HTTPS is not enabled for the two Elasticsearch clusters.

    • Select a cluster template: In this example, data is imported from one Elasticsearch cluster to another. Locate the elasticsearch row and click Apply in the Operation column, then adjust the configuration to match your clusters.
    • Modify the configuration file: enter a configuration name, for example, es-es-all, and compile the migration configuration for the Elasticsearch clusters. The following is an example configuration file:
      input {
          elasticsearch {
              # Access address of the source cluster
              hosts => ["xx.xx.xx.xx:9200", "xx.xx.xx.xx:9200"]
              # Username and password for accessing the source cluster. Keep these lines commented out if there is no username or password.
              # user => "css_logstash"
              # password => "*****"
              # Indexes to be migrated
              index => "*_202102"
              docinfo => true
              slices => 3
              size => 3000
          }
      }

      # Remove the fields added by Logstash.
      filter {
          mutate {
              remove_field => ["@metadata", "@version"]
          }
      }

      output {
          elasticsearch {
              # Access address of the destination cluster
              hosts => ["xx.xx.xx.xx:9200", "xx.xx.xx.xx:9200"]
              # Username and password for accessing the destination cluster. Keep these lines commented out if there is no username or password.
              # user => "css_logstash"
              # password => "*****"
              index => "%{[@metadata][_index]}"
              document_type => "%{[@metadata][_type]}"
              document_id => "%{[@metadata][_id]}"
          }
      }

      Modify the following configurations:

      Table 4 Modification of cluster configurations

      input

      • hosts: Access address of the source cluster. If the cluster has multiple access nodes, separate them with commas (,).
      • user: Username for accessing the cluster. If there is no username, comment this line out with a number sign (#).
      • password: Password for accessing the cluster. If there is no password, comment this line out with a number sign (#).
      • index: Source indexes to be fully migrated. Use commas (,) to separate multiple indexes. Wildcards are supported, for example, index*.
      • docinfo: Whether to include document metadata (the source index name, type, and document ID) in each event. The value must be true so that the destination cluster can reuse the source index names and document IDs.
      • slices: In some cases, overall throughput can be improved by consuming multiple distinct slices of a query simultaneously using sliced scrolls. A value from 2 to 8 is recommended.
      • size: Maximum number of hits returned by each query.

      output

      • hosts: Access address of the destination Huawei Cloud CSS cluster. If the cluster has multiple nodes, separate them with commas (,).
      • user: Username for accessing the cluster. If there is no username, comment this line out with a number sign (#).
      • password: Password for accessing the cluster. If there is no password, comment this line out with a number sign (#).
      • index: Name of the index in the destination cluster. It can be modified and expanded, for example, logstash-%{+yyyy.MM.dd}.
      • document_type: Keeps the document type in the destination the same as in the source.
      • document_id: Document ID in the index. It is recommended that it remain the same as in the source. To have document IDs generated automatically, comment this line out with a number sign (#).
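      In the output block, index => "%{[@metadata][_index]}" writes each document to a destination index with the same name as its source index; this works because docinfo => true stores the source index name, type, and document ID in the event metadata. To rename indexes during migration, you can embed the metadata field in a different pattern, for example, index => "migrated-%{[@metadata][_index]}" (a hypothetical naming scheme).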

Step 4: Performing a Full Data Migration

  1. Use PuTTY to log in to the Linux VM created in Preparations.
  2. Run the python migrateTemplate.py command to migrate the index templates.
  3. Run the python migrateMapping.py command to migrate the index structures (mappings).
  4. On the Logstash clusters page, click the name of the Logstash cluster created in Step 1: Creating a Logstash Cluster. The Cluster Information page is displayed. Choose Configuration Center, or click Configuration Center in the Operation column of the target cluster. The Configuration Center page is displayed.
  5. Select the configuration file created in section Step 3: Configuring a Logstash Full Data Migration Task and click Start in the upper left corner.
  6. Choose whether to start the data migration immediately when the Logstash service starts.
  7. After Logstash starts, you can view the running configuration file in the pipeline list.
  8. After the data migration is complete, use PuTTY to log in to the Linux VM and run the python checkIndices.py command to compare the data.
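While the migration task is running, you can watch the document counts grow on the destination cluster. The following is a minimal monitoring sketch (a hypothetical helper; DEST_URL is a placeholder for your destination address, and the example assumes a non-security cluster):

    # -*- coding:UTF-8 -*-
    import time
    import requests

    # Placeholder: replace with the access address of the destination cluster.
    DEST_URL = "http://x.x.x.x:9200"

    requests.packages.urllib3.disable_warnings()
    # Poll the _cat/indices API every 30 seconds; stop with Ctrl+C.
    while True:
        resp = requests.get(DEST_URL + "/_cat/indices?format=json", verify=False)
        for idx in resp.json():
            # Skip system indexes, whose names start with a period.
            if not idx["index"].startswith("."):
                print("%-30s docs: %s" % (idx["index"], idx["docs.count"]))
        print("-" * 50)
        time.sleep(30)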

Step 5: Deleting the Logstash Cluster

After the cluster migration is complete, delete the Logstash cluster.

  1. Log in to the CSS management console.
  2. Choose Clusters > Logstash. On the displayed page, locate the row that contains the target cluster and click More > Delete in the Operation column.
  3. In the displayed dialog box, enter the name of the cluster to confirm the deletion, and click OK.