
Migrating Data Between Elasticsearch Clusters Using Huawei Cloud Logstash

You can use a Logstash cluster created using Huawei Cloud's Cloud Search Service (CSS) to migrate data between Elasticsearch clusters.

Scenarios

Huawei Cloud Logstash is a fully managed data ingestion and processing service. It is compatible with open-source Logstash and can be used for data migration between Elasticsearch clusters.

You can use Huawei Cloud Logstash to migrate data from Huawei Cloud Elasticsearch, self-built Elasticsearch, or third-party Elasticsearch to Huawei Cloud Elasticsearch. This solution applies to the following scenarios:

  • Cross-version migration: Migrate data between different versions, leveraging the compatibility and flexibility of Logstash to maintain data availability and consistency in the new version. This approach suits migrations with a large version gap between Elasticsearch clusters, for example, from 6.x to 7.x.
  • Cluster merging: Utilize Logstash to transfer and consolidate data from multiple Elasticsearch clusters into a single Elasticsearch cluster, enabling unified management and analysis of data from different Elasticsearch clusters.
  • Cloud migration: Migrate an on-premises Elasticsearch service to the cloud to enjoy the benefits of cloud services, such as scalability, ease of maintenance, and cost-effectiveness.
  • Changing the service provider: An enterprise currently using a third-party Elasticsearch service wishes to switch to Huawei Cloud for reasons such as cost, performance, or other strategic considerations.

Solution Architecture

Figure 1 Migration process

Figure 1 shows how to migrate data between Elasticsearch clusters using Huawei Cloud Logstash.

  1. Input: Huawei Cloud Logstash receives data from Huawei Cloud Elasticsearch, self-built Elasticsearch, or third-party Elasticsearch.

    The procedure for migrating data from Huawei Cloud Elasticsearch, self-built Elasticsearch, or third-party Elasticsearch to Huawei Cloud Elasticsearch remains the same. The only difference is the access address of the source cluster. For details, see Obtaining Elasticsearch Cluster Information.

  2. Filter: Huawei Cloud Logstash cleanses and converts data.
  3. Output: Huawei Cloud Logstash outputs data to the destination system, for example, Huawei Cloud Elasticsearch.
You can choose between full data migration and incremental data migration based on service requirements:
  • Full data migration: Utilize Logstash to perform a complete data migration. This method is suitable for the initial phase of migration or scenarios where data integrity must be ensured.
  • Incremental data migration: Configure Logstash to perform incremental queries and migrate only index data with incremental fields. This method is suitable for scenarios requiring continuous data synchronization or real-time data updates.

Advantages

  • Cross-version compatibility: This solution supports data migration between Elasticsearch clusters of different versions.
  • Efficient data processing: Logstash supports batch read and write operations, significantly enhancing data migration efficiency.
  • Concurrent synchronization technology: The slice concurrent synchronization technology can be utilized to boost data migration speed and performance, especially when handling large volumes of data.
  • Simple configuration: Huawei Cloud Logstash offers a straightforward and intuitive configuration process, allowing you to input, process, and output data through configuration files.
  • Powerful data processing: Logstash includes various built-in filters to clean, convert, and enrich data during migration.
  • Flexible migration policies: You can choose between full migration or incremental migration based on service requirements to optimize storage usage and migration time.

Impact on Performance

Using Logstash for data migration between clusters relies on the Scroll API, which efficiently retrieves index data from the source cluster and synchronizes it to the destination cluster in batches. This process may affect the performance of the source cluster. The extent of the impact depends on how fast data is retrieved from the source cluster, which in turn depends on the size and slices settings of the Scroll API. For details, see the Reindex API documentation.
  • If resource usage in the source cluster is high, reduce the size parameter to slow down data retrieval, or perform the migration during off-peak hours to limit the impact on the source cluster.
  • If resource usage in the source cluster is low, keep the default Scroll API settings, monitor the load of the source cluster, and tune the size and slices parameters based on its load to balance migration efficiency and resource utilization.
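
To gauge the retrieval pressure before a migration, you can reproduce what the Logstash elasticsearch input does with a sliced scroll query. The following Python sketch is for illustration only and is not part of the official procedure; the cluster address, index name, and credentials are placeholders. It reads one slice of an index with a given batch size:

  import requests

  SOURCE = "http://x.x.x.x:9200"  # placeholder: source cluster access address
  INDEX = "test_index_1"          # placeholder: index to sample
  AUTH = None                     # e.g. ("css_logstash", "password") for a security-mode cluster

  def scroll_slice(slice_id, max_slices, size):
      # Open a scroll context for one slice; slice_id/max_slices and size
      # mirror the slices and size settings of the Logstash elasticsearch input.
      body = {"slice": {"id": slice_id, "max": max_slices},
              "size": size,
              "query": {"match_all": {}}}
      result = requests.post(SOURCE + "/" + INDEX + "/_search?scroll=1m",
                             json=body, auth=AUTH, verify=False).json()
      total = 0
      while result["hits"]["hits"]:
          total += len(result["hits"]["hits"])
          # Fetch the next batch; a larger size means fewer but heavier requests.
          result = requests.post(SOURCE + "/_search/scroll",
                                 json={"scroll": "1m", "scroll_id": result["_scroll_id"]},
                                 auth=AUTH, verify=False).json()
      return total

  print(scroll_slice(slice_id=0, max_slices=3, size=1000))

Running one slice with a small size during off-peak hours gives a feel for how quickly documents can be drained without overloading the source cluster.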

Constraints

During cluster migration, do not add, delete, or modify indexes in the source cluster; otherwise, the source and destination clusters will have inconsistent data after the migration.
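
If you cannot fully guarantee that writes will stop during the migration, one extra precaution (an option beyond the official procedure, shown here as a sketch with placeholder address and credentials) is to temporarily make the source indexes read-only using the index.blocks.write setting:

  import requests

  SOURCE = "http://x.x.x.x:9200"  # placeholder: source cluster access address
  AUTH = None                     # e.g. ("username", "password") for a security-mode cluster

  def set_write_block(index, blocked):
      # index.blocks.write=true rejects writes but keeps the index readable,
      # so queries and the migration itself continue to work.
      resp = requests.put(SOURCE + "/" + index + "/_settings",
                          json={"index.blocks.write": blocked},
                          auth=AUTH, verify=False)
      print(index + " -> " + str(resp.status_code))

  set_write_block("test_index_1", True)    # before the migration
  # set_write_block("test_index_1", False) # after the migration completes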

Prerequisites

  • The source and destination Elasticsearch clusters are available.
  • Ensure that the network between the clusters is connected.
    • If the source cluster, Logstash, and destination cluster are in different VPCs, establish a VPC peering connection between them. For details, see VPC Peering Connection Overview.
    • To migrate an on-premises Elasticsearch cluster to Huawei Cloud, you can configure public network access for the on-premises Elasticsearch cluster.
    • To migrate a third-party Elasticsearch cluster to Huawei Cloud, you need to establish a VPN or Direct Connect connection between the third party's internal data center and Huawei Cloud.
  • Ensure that _source has been enabled for indexes in the cluster.

    By default, _source is enabled. You can run the GET {index}/_search command to check whether it is enabled: if the returned documents contain the _source field, it is enabled.
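
    For example, the following Python sketch (illustrative only; the address, index name, and credentials are placeholders) checks the first search hit for the _source field:

      import requests

      CLUSTER = "http://x.x.x.x:9200"  # placeholder: cluster access address
      AUTH = None                      # e.g. ("username", "password") for a security-mode cluster

      resp = requests.get(CLUSTER + "/test_index_1/_search?size=1", auth=AUTH, verify=False)
      hits = resp.json()["hits"]["hits"]
      if hits and "_source" in hits[0]:
          print("_source is enabled")
      else:
          print("_source is missing; document content cannot be migrated")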

Procedure

  1. Obtaining Elasticsearch Cluster Information
  2. (Optional) Migrating the Index Structure: Migrate an Elasticsearch cluster's index template and index structure using scripts.
  3. Creating a Logstash Cluster: Create a Logstash cluster for data migration.
  4. Verifying Connectivity Between Clusters: Verify the connectivity between the Logstash cluster and the source Elasticsearch cluster.
  5. Using Logstash to migrate data: Perform a full or incremental migration (see Using Logstash to Perform Full Data Migration and Using Logstash to Incrementally Migrate Cluster Data).
  6. Deleting a Logstash Cluster: After the cluster migration is complete, release the Logstash cluster in a timely manner.

Obtaining Elasticsearch Cluster Information

Before migrating a cluster, you need to obtain necessary cluster information for configuring a migration task.

Table 1 Required Elasticsearch cluster information

| Cluster | Type | Required Information | How to Obtain |
| --- | --- | --- | --- |
| Source cluster | Huawei Cloud Elasticsearch cluster | Name and access address of the source cluster; username and password for accessing it (only for security-mode clusters) | For the cluster name and access address, see step 3 below. Contact the service administrator for the username and password. |
| Source cluster | Self-built Elasticsearch cluster | Name and public network address of the source cluster; username and password for accessing it (only for security-mode clusters) | Contact the service administrator to obtain the information. |
| Source cluster | Third-party Elasticsearch cluster | Name and access address of the source cluster; username and password for accessing it (only for security-mode clusters) | Contact the service administrator to obtain the information. |
| Destination cluster | Huawei Cloud Elasticsearch cluster | Access address of the destination cluster; username and password for accessing it (only for security-mode clusters) | For the access address, see step 3 below. Contact the service administrator for the username and password. |

The method of obtaining the cluster information varies depending on the source cluster. This section describes how to obtain information about the Huawei Cloud Elasticsearch cluster.

  1. Log in to the CSS management console.
  2. In the navigation pane on the left, choose Clusters > Elasticsearch.
  3. In the Elasticsearch cluster list, obtain the cluster name and access address.
    Figure 2 Obtaining cluster information

(Optional) Migrating the Index Structure

If you plan to manually create an index structure in the destination Elasticsearch cluster, skip this section. This section describes how to migrate an Elasticsearch cluster's index template and index structure using scripts.

  1. Create an ECS to migrate the metadata of the source cluster.
    1. Create an ECS. Select CentOS as the OS and a flavor with 2 vCPUs and 4 GB of memory (2U4G). The ECS must be in the same VPC and security group as the CSS cluster.
    2. Test the connectivity between the ECS and the source and destination clusters.

      Run the curl http://{ip}:{port} command on the ECS to test the connectivity. If the cluster information is returned (HTTP status code 200), the connection is successful.

      {ip} indicates the access address of the source or destination cluster, and {port} indicates the port number. The default port is 9200; use the actual port of your cluster.

      curl http://10.234.73.128:9200 # Replace with the actual IP address. A non-security-mode cluster is used here as an example.
      {
        "name" : "es_cluster_migrate-ess-esn-1-1",
        "cluster_name" : "es_cluster_migrate",
        "cluster_uuid" : "1VbP7-39QNOx_R-llXKKtA",
        "version" : {
          "number" : "6.5.4",
          "build_flavor" : "default",
          "build_type" : "tar",
          "build_hash" : "d2ef93d",
          "build_date" : "2018-12-17T21:17:40.758843Z",
          "build_snapshot" : false,
          "lucene_version" : "7.5.0",
          "minimum_wire_compatibility_version" : "5.6.0",
          "minimum_index_compatibility_version" : "5.0.0"
        },
        "Tagline" : "You Know, for Search"
      }
  2. Prepare the required tools and software. Depending on whether the ECS has Internet access, choose an appropriate installation method.
    • If the ECS has Internet access, install the tools using yum and pip. For details, see step 3.
    • If the ECS has no Internet access, download the installation packages to the ECS and run commands on the ECS to install them. For details, see step 4.
    Table 2 Tools and software

    | Tool | Purpose | How to Obtain |
    | --- | --- | --- |
    | Python 2 | Used to execute the data migration scripts. | Python 2 download page. Select Python 2.7.18. |
    | WinSCP | Cross-platform file transfer tool, used to upload the scripts to the Linux ECS. | WinSCP |

  3. The online installation procedure is as follows:
    1. Run yum install python2 to install python2.
      [root@ecs opt]# yum install python2
    2. Run yum install python-pip to install pip.
      [root@ecs opt]# yum install python-pip
    3. Run pip install pyyaml to install the YAML dependency.
    4. Run pip install requests to install the requests dependency.
  4. The offline installation procedure is as follows:
    1. Download the Python 2 source package (Python-2.7.18.tgz) from https://www.python.org/downloads/release/python-2718/.
      Figure 3 Downloading the python2 package
    2. Use WinSCP to upload the Python installation package to the opt directory and install Python.
      # Extract the Python package.
      [root@ecs-52bc opt]# tar -xvf Python-2.7.18.tgz
      Python-2.7.18/Modules/zlib/crc32.c
      Python-2.7.18/Modules/zlib/gzlib.c
      Python-2.7.18/Modules/zlib/inffast.c
      Python-2.7.18/Modules/zlib/example.c
      Python-2.7.18/Modules/python.c
      Python-2.7.18/Modules/nismodule.c
      Python-2.7.18/Modules/Setup.config.in
      ...
      # Enter the installation directory.
      [root@ecs-52bc opt]# cd Python-2.7.18
      # Configure the build and set the installation path.
      [root@ecs-52bc Python-2.7.18]# ./configure --prefix=/usr/local/python2
      ...
      checking for build directories... done
      checking for --with-computed-gotos... no value specified
      checking whether gcc -pthread supports computed gotos... yes
      checking for ensurepip... no
      configure: creating ./config.status
      config.status: creating Makefile.pre
      config.status: creating Modules/Setup.config
      config.status: creating Misc/python.pc
      config.status: creating Modules/ld_so_aix
      config.status: creating pyconfig.h
      creating Modules/Setup
      creating Modules/Setup.local
      creating Makefile
      # Compile Python.
      [root@ecs-52bc Python-2.7.18]# make
      # Install Python.
      [root@ecs-52bc Python-2.7.18]# make install
    3. Check the Python installation result.
      # Check the Python version.
      [root@ecs-52bc Python-2.7.18]# python --version
      Python 2.7.5
      # Check the pip version.
      [root@ecs-52bc Python-2.7.18]# pip --version
      pip 7.1.2 from /usr/lib/python2.7/site-packages/pip-7.1.2-py2.7.egg (python 2.7)
      [root@ecs-52bc Python-2.7.18]#
  5. Prepare the index migration scripts for the source Elasticsearch cluster.
    1. Run the vi migrateConfig.yaml command, modify the following content based on site requirements, and run :wq to save the file. This file is the configuration used by the migration scripts. For details about how to obtain the cluster information, see Obtaining Elasticsearch Cluster Information.
      es_cluster_new:
      # Name of the source cluster
        clustername: es_cluster_new
      # Access address of the source cluster, plus http://.
        src_ip: http://x.x.x.x:9200
      # Username and password for accessing the source cluster. For a non-security-mode cluster, set the value to "".
        src_username: ""
        src_password: ""
      # Access address of the destination Elasticsearch cluster, plus http://.
        dest_ip: http://x.x.x.x:9200
      # Username and password for accessing the destination Elasticsearch cluster. For a non-security-mode cluster, set the value to "".
        dest_username: ""
        dest_password: ""
      # only_mapping is an optional parameter, false by default. It is used together with migrateMapping.py to specify whether to process only the indexes listed under mapping below. When set to true, only the source cluster indexes that match a mapping key are migrated. When set to false, all indexes except system indexes (.kibana, .*) are migrated from the source cluster.
      # During migration, the index name is compared with the provided mapping. If a match is found, the mapping value is used as the index name in the destination cluster. If no match is found, the original index name from the source cluster is retained.
        only_mapping: false
      # Set the index to be migrated. key indicates the index name of the source cluster, and value indicates the index name of the destination cluster.
        mapping:
            test_index_1: test_index_1
      # only_compare_index is an optional parameter with a default value of false. It must be used in conjunction with checkIndices.py. When set to false, both the number of indexes and documents are compared. When set to true, only the number of indexes is compared.
        only_compare_index: false
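
      Before running the migration scripts, you can optionally sanity-check this file using the PyYAML and requests dependencies installed earlier. The following sketch (not part of the official scripts) loads migrateConfig.yaml and verifies that every configured cluster is reachable:

      # -*- coding:UTF-8 -*-
      import yaml
      import requests

      requests.packages.urllib3.disable_warnings()
      config = yaml.load(open("migrateConfig.yaml"))
      for name, value in config.items():
          for side in ["src", "dest"]:
              auth = None
              if value[side + "_username"] != "":
                  auth = (value[side + "_username"], value[side + "_password"])
              # A reachable cluster answers 200 with its name and version.
              resp = requests.get(value[side + "_ip"], auth=auth, verify=False)
              print(name + " " + side + " -> " + str(resp.status_code))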
    2. Run the vi migrateTemplate.py command, copy the following script into the file, and run :wq to save it as the index template migration script.
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # config = yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def put_template_to_target(url, template, cluster, template_name, dest_auth=None):
          headers = {'Content-Type': 'application/json'}
          create_resp = requests.put(url, headers=headers, data=json.dumps(template), auth=dest_auth, verify=False)
          if not os.path.exists("templateLogs"):
              os.makedirs("templateLogs")
          if create_resp.status_code != 200:
              print(
                  "create template " + url + " failed with response: " + str(
                      create_resp) + ", source template is " + template_name)
              print(create_resp.text)
              filename = "templateLogs/" + str(cluster) + "#" + template_name
              with open(filename + ".json", "w") as f:
                  json.dump(template, f)
              return False
          else:
              return True
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migration template!")
          config = loadConfig(argv)
          src_clusters = config.keys()
          print("process cluster name:")
          for name in src_clusters:
              print(name)
          print("cluster total number:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
      
              print("start to process cluster name:" + name)
              source_url = value["src_ip"] + "/_template"
      
              response = requests.get(source_url, auth=source_auth, verify=False)
              if response.status_code != 200:
                  print("*** get all template failed. resp statusCode:" + str(
                      response.status_code) + " response is " + response.text)
                  continue
              all_template = response.json()
              migrate_itemplate = []
      
              for template in all_template.keys():
                  if template.startswith(".") or template == "logstash":
                      continue
                  if "index_patterns" in all_template[template]:
                      for t in all_template[template]["index_patterns"]:
                          # if "kibana" in template:
                          if t.startswith("."):
                              continue
                          migrate_itemplate.append(template)
      
              for template in migrate_itemplate:
                  dest_index_url = value["dest_ip"] + "/_template/" + template
                  result = put_template_to_target(dest_index_url, all_template[template], name, template, dest_auth)
                  if result is True:
                       print('[success] create template success, cluster: %-10s, template %-10s ' % (str(name), str(template)))
                  else:
                       print('[failure] create template failure, cluster: %-10s, template %-10s ' % (str(name), str(template)))
      
      
      if __name__ == '__main__':
          main(sys.argv)
    3. Run the vi migrateMapping.py command, copy the following script into the file, and run :wq to save it as the index structure migration script.
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import re
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # config = yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def get_cluster_version(url, auth=None):
          response = requests.get(url, auth=auth)
          if response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              return False
          cluster = response.json()
          version = cluster["version"]["number"]
      
          return True
      
      
      def process_mapping(index_mapping, dest_index):
          # remove unnecessary keys
          del index_mapping["settings"]["index"]["provided_name"]
          del index_mapping["settings"]["index"]["uuid"]
          del index_mapping["settings"]["index"]["creation_date"]
          del index_mapping["settings"]["index"]["version"]
      
          if "lifecycle" in index_mapping["settings"]["index"]:
              del index_mapping["settings"]["index"]["lifecycle"]
      
          # check alias
          aliases = index_mapping["aliases"]
          for alias in list(aliases.keys()):
              if alias == dest_index:
                  print(
                      "source index " + dest_index + " alias " + alias + " is the same as dest_index name, will remove this alias.")
                  del index_mapping["aliases"][alias]
          # if index_mapping["settings"]["index"].has_key("lifecycle"):
          if "lifecycle" in index_mapping["settings"]["index"]:
              lifecycle = index_mapping["settings"]["index"]["lifecycle"]
              opendistro = {"opendistro": {"index_state_management":
                                               {"policy_id": lifecycle["name"],
                                                "rollover_alias": lifecycle["rollover_alias"]}}}
              index_mapping["settings"].update(opendistro)
              # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
              del index_mapping["settings"]["index"]["lifecycle"]
      
          # replace synonyms_path
          if "analysis" in index_mapping["settings"]["index"]:
              analysis = index_mapping["settings"]["index"]["analysis"]
              if "filter" in analysis:
                  filter = analysis["filter"]
                  if "my_synonym_filter" in filter:
                      my_synonym_filter = filter["my_synonym_filter"]
                      if "synonyms_path" in my_synonym_filter:
                          index_mapping["settings"]["index"]["analysis"]["filter"]["my_synonym_filter"][
                              "synonyms_path"] = "/rds/datastore/elasticsearch/v7.10.2/package/elasticsearch-7.10.2/plugins/analysis-dynamic-synonym/config/synonyms.txt"
          return index_mapping
      
      
      def getAlias(source, source_auth):
          # get all indices
          response = requests.get(source + "/_alias", auth=source_auth)
          if response.status_code != 200:
              print("*** get all index failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              exit()
      
          all_index = response.json()
          system_index = []
          create_index = []
          for index in list(all_index.keys()):
              if (index.startswith(".")):
                  system_index.append(index)
              else:
                  create_index.append(index)
      
          return system_index, create_index
      
      
      def put_mapping_to_target(url, mapping, cluster, source_index, dest_auth=None):
          headers = {'Content-Type': 'application/json'}
          create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth, verify=False)
          if not os.path.exists("mappingLogs"):
              os.makedirs("mappingLogs")
          if create_resp.status_code != 200:
              print(
                  "create index " + url + " failed with response: " + str(create_resp) +
                  ", source index is " + str(source_index))
              print(create_resp.text)
              filename = "mappingLogs/" + str(cluster) + "#" + str(source_index)
              with open(filename + ".json", "w") as f:
                  json.dump(mapping, f)
              return False
          else:
              return True
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migrate index mapping!")
          config = loadConfig(argv)
          src_clusters = config.keys()
      
          print("begin to process cluster name :")
          for name in src_clusters:
              print(name)
          print("cluster count:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source = value["src_ip"]
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest = value["dest_ip"]
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
      
              print("start to process cluster:   " + name)
              # only deal with mapping list
              if 'only_mapping' in value and value["only_mapping"]:
                  for source_index, dest_index in value["mapping"].iteritems():
                      print("start to process source index" + source_index + ", target index: " + dest_index)
                      source_url = source + "/" + source_index
                      response = requests.get(source_url, auth=source_auth)
                      if response.status_code != 200:
                          print("*** get ElasticSearch message failed. resp statusCode:" + str(
                              response.status_code) + " response is " + response.text)
                          continue
                      mapping = response.json()
                      index_mapping = process_mapping(mapping[source_index], dest_index)
                      dest_url = dest + "/" + dest_index
                      result = put_mapping_to_target(dest_url, index_mapping, name, source_index, dest_auth)
                      if result is False:
                          print("cluster name:" + name + ", " + source_index + ":failure")
                          continue
                      print("cluster name:" + name + ", " + source_index + ":success")
              else:
                  # get all indices
                  system_index, create_index = getAlias(source, source_auth)
                  success_index = 0
                  for index in create_index:
                      source_url = source + "/" + index
                      index_response = requests.get(source_url, auth=source_auth)
                      if index_response.status_code != 200:
                          print("*** get ElasticSearch message failed. resp statusCode:" + str(
                              index_response.status_code) + " response is " + index_response.text)
                          continue
                      mapping = index_response.json()
      
                      dest_index = index
                      if 'mapping' in value:
                          if index in value["mapping"].keys():
                              dest_index = value["mapping"][index]
                      index_mapping = process_mapping(mapping[index], dest_index)
      
                      dest_url = dest + "/" + dest_index
                      result = put_mapping_to_target(dest_url, index_mapping, name, index, dest_auth)
                      if result is False:
                          print("[failure]: migrate mapping cluster name: " + name + ", " + index)
                          continue
                      print("[success]: migrate mapping cluster name: " + name + ", " + index)
                      success_index = success_index + 1
                  print("create index mapping success total: " + str(success_index))
      
      
      if __name__ == '__main__':
          main(sys.argv)
    4. Run the vi checkIndices.py command, copy the following script into the file, and run :wq to save it as the index data comparison script.
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import re
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def get_cluster_version(url, auth=None):
          response = requests.get(url, auth=auth)
          if response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              return False
          cluster = response.json()
          version = cluster["version"]["number"]
          return True
      
      
      # get all indices
      def get_indices(url, source_auth):
          response = requests.get(url + "/_alias", auth=source_auth)
          if response.status_code != 200:
              print("*** get all index failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              exit()
          all_index = response.json()
          system_index = []
          create_index = []
          for index in list(all_index.keys()):
              if (index.startswith(".")):
                  system_index.append(index)
              else:
                  create_index.append(index)
          return create_index
      
      
      def get_mapping(url, _auth, index):
          source_url = url + "/" + index
          index_response = requests.get(source_url, auth=_auth)
          if index_response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  index_response.status_code) + " response is " + index_response.text)
              return "[failure] --- index is not exist in destination es. ---"
          mapping = index_response.json()
          return mapping
      
      
      def get_index_total(url, index, es_auth):
          stats_url = url + "/" + index + "/_stats"
          index_response = requests.get(stats_url, auth=es_auth, verify=False)
          if index_response.status_code != 200:
              print("*** get ElasticSearch stats message failed. resp statusCode:" + str(
                  index_response.status_code) + " response is " + index_response.text)
              return 0
          return index_response.json()
      
      
       def get_indices_stats(url, es_auth):
           endpoint = url + "/_cat/indices"
           indicesResult = requests.get(endpoint, auth=es_auth, verify=False)
           indicesList = indicesResult.text.split("\n")
           indexList = []
           for indices in indicesList:
               # Skip blank lines; the index name is the third column of the _cat/indices output.
               if indices.strip():
                   indexList.append(indices.split()[2])
           return indexList
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # python3
          # return yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migrate index mapping!")
          config = loadConfig(argv)
          src_clusters = config.keys()
      
          print("begin to process cluster name :")
          for name in src_clusters:
              print(name)
          print("cluster count:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source = value["src_ip"]
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest = value["dest_ip"]
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
              cluster_name = name
              if "clustername" in value:
                  cluster_name = value["clustername"]
      
              print("start to process cluster :" + cluster_name)
              # get all indices
              all_source_index = get_indices(source, source_auth)
              all_dest_index = get_indices(dest, dest_auth)
      
              if "only_compare_index" in value and value["only_compare_index"]:
                  print("[success] only compare mapping, not compare index count.")
                  continue
      
              for index in all_source_index:
                  index_total = get_index_total(value["src_ip"], index, source_auth)
                  src_total = index_total["_all"]["primaries"]["docs"]["count"]
                  src_size = int(index_total["_all"]["primaries"]["store"]["size_in_bytes"]) / 1024 / 1024
                  dest_index = get_index_total(value["dest_ip"], index, dest_auth)
                   if dest_index == 0:
                      print('[failure] not found, index: %-20s, source total: %-10s size %6sM'
                            % (str(index), str(src_total), src_size))
                      continue
                  dest_total = dest_index["_all"]["primaries"]["docs"]["count"]
                  if src_total != dest_total:
                      print('[failure] not consistent, '
                            'index: %-20s, source total: %-10s size %6sM destination total: %-10s '
                            % (str(index), str(src_total), src_size, str(dest_total)))
                      continue
                  print('[success] compare index total equal : index : %-20s,  total: %-20s '
                        % (str(index), str(dest_total)))
      
      
      if __name__ == '__main__':
          main(sys.argv)
  6. Run the following commands to migrate the index template and index structure of the Elasticsearch cluster:
    python migrateTemplate.py
    python migrateMapping.py
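
    After the scripts finish, you can optionally confirm the results with a quick check (a sketch, not part of the official procedure; the destination address and credentials are placeholders matching migrateConfig.yaml):

      import requests

      DEST = "http://x.x.x.x:9200"  # placeholder: destination cluster access address
      AUTH = None                   # e.g. ("username", "password") for a security-mode cluster

      # Non-system templates that migrateTemplate.py created on the destination.
      templates = requests.get(DEST + "/_template", auth=AUTH, verify=False).json()
      print([t for t in templates if not t.startswith(".")])

      # Non-system indexes whose structures migrateMapping.py created.
      indexes = requests.get(DEST + "/_alias", auth=AUTH, verify=False).json()
      print([i for i in indexes if not i.startswith(".")])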

Creating a Logstash Cluster

After the migration environment in the ECS is prepared, create a Logstash cluster in CSS for data migration.

  1. Log in to the CSS management console.
  2. In the navigation pane on the left, choose Clusters > Logstash.
  3. Click Create Cluster in the upper right corner. The Create Cluster page is displayed.
  4. On the cluster creation page, configure the cluster as prompted.

    Table 3 describes the key parameters. Use the default values for other parameters. For details about how to create a cluster, see Creating a Logstash Cluster.

    Table 3 Key Logstash cluster configurations

    | Parameter | Description |
    | --- | --- |
    | Billing Mode | Select Pay-per-use. In this billing mode, you are billed by the actual duration of use, with a billing cycle of one hour. For example, 58 minutes of usage is rounded up to an hour and billed. |
    | Cluster Type | Select Logstash. |
    | Version | Select 7.10.0. |
    | Name | Cluster name, which contains 4 to 32 characters. Only letters, digits, hyphens (-), and underscores (_) are allowed, and the value must start with a letter. Logstash-ES is used as an example. |
    | VPC | A VPC is a secure, isolated logical network environment. Select the same VPC as the destination Elasticsearch cluster. |
    | Subnet | A subnet provides dedicated network resources that are isolated from other networks, improving network security. Select the destination subnet. You can access the VPC management console to view the names and IDs of the existing subnets in the VPC. |
    | Security Group | A security group is a collection of access control rules for ECSs that have the same security protection requirements and are mutually trusted in a VPC. Select the same security group as the destination Elasticsearch cluster. |

  5. Click Next: Advanced Settings and retain the default settings.
  6. Click Next: Confirm Configuration. Confirm the settings, and click Create Now to create the cluster.
  7. Click Back to Cluster List to switch to the Clusters page. The cluster you created is now in the cluster list and its status is Creating. If the cluster is successfully created, its status changes to Available.

Verifying Connectivity Between Clusters

Before starting a migration task, verify the network connectivity between Logstash and the source Elasticsearch cluster.

  1. In the Logstash cluster list, select the created Logstash cluster Logstash-ES and click Configuration Center in the Operation column.
  2. On the Configuration Center page, click Test Connectivity.
  3. In the dialog box that is displayed, enter the IP address and port number of the source cluster and click Test.
    Figure 4 Testing connectivity

    If Available is displayed, the network is connected. If the network is disconnected, configure routes for the Logstash cluster to connect the clusters. For details, see Configuring Routes for a Logstash Cluster.

Using Logstash to Perform Full Data Migration

At the initial stage of cluster migration, or in scenarios where data integrity takes top priority, you are advised to use Logstash for a full data migration, which migrates all data of the source Elasticsearch cluster at once.

  1. Log in to the CSS management console.
  2. In the navigation pane on the left, choose Clusters > Logstash.
  3. In the Logstash cluster list, select the created Logstash cluster Logstash-ES and click Configuration Center in the Operation column.
  4. On the Configuration Center page, click Create in the upper right corner. On the Create Configuration File page, edit the configuration file for the full Elasticsearch cluster migration.
    1. Selecting a cluster template: Expand the system template list, select elasticsearch, and click Apply in the Operation column.
    2. Setting the name of the configuration file: Set Name, for example, es-es-all.
    3. Editing the configuration file: Enter the migration configuration plan of the Elasticsearch cluster in Configuration File Content. The following is an example of the configuration file: For details about how to obtain the cluster information, see Obtaining Elasticsearch Cluster Information.
      input{
           elasticsearch{
      # Access address of the source Elasticsearch cluster. You do not need to add a protocol. If you add the HTTPS protocol, an error will be reported.
              hosts =>  ["xx.xx.xx.xx:9200", "xx.xx.xx.xx:9200"]
      # Username and password for accessing the source cluster. You do not need to configure them for a non-security-mode cluster.
              # user => "css_logstash"
              # password => "*****"
      # Configure the indexes to be migrated. Use commas (,) to separate multiple indexes. You can use wildcard characters, for example, index*.
              index => "*_202102"
              docinfo => true
              slices => 3
              size => 3000
      # If the destination cluster is accessed through HTTPS, you need to configure the following information:
      # HTTPS access certificate of the cluster. Retain the following for the CSS cluster:
              # ca_file => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs"  # for 7.10.0
      # Whether to enable HTTPS communication. Set this parameter to true for a cluster accessed through HTTPS.
              #ssl => true
           }
       }
      
       
      # Remove specified fields added by Logstash.
       filter {
         mutate {
           remove_field => ["@version"]
         }
       }
      
       output{
           elasticsearch{
      # Access address of the destination Elasticsearch cluster
             hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"]
      # Username and password for accessing the destination cluster. You do not need to configure them for a non-security-mode cluster.
             # user => "css_logstash"
             # password => "*****"
      # Configure the index of the target cluster. The following configuration indicates that the index name is the same as that of the source end.
             index => "%{[@metadata][_index]}"
             document_type => "%{[@metadata][_type]}"
             document_id => "%{[@metadata][_id]}"
      # If the destination cluster is accessed through HTTPS, you need to configure the following information:
      # HTTPS access certificate of the cluster. Retain the following for the CSS cluster:
             #cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs"  # for 7.10.0
      # Whether to enable HTTPS communication. Set this parameter to true for a cluster accessed through HTTPS.
             #ssl => true
      # Whether to verify the elasticsearch certificate on the server. Set this parameter to false, indicating that the certificate is not verified.
             #ssl_certificate_verification => false
           }
       }
      Table 4 Configuration items for full migration

      | Section | Item | Description |
      | --- | --- | --- |
      | input | hosts | Access address of the source cluster. If the cluster has multiple access nodes, separate them with commas (,). |
      | input | user | Username for accessing the cluster. For a non-security-mode cluster, use # to comment out this parameter. |
      | input | password | Password for accessing the cluster. For a non-security-mode cluster, use # to comment out this parameter. |
      | input | index | Source indexes to be fully migrated. Use commas (,) to separate multiple indexes. Wildcards are supported, for example, index*. |
      | input | docinfo | Whether to include document metadata (index, type, and document ID) so that documents can be re-indexed in the destination cluster with their original IDs. The value must be true. |
      | input | slices | In some cases, overall throughput can be improved by consuming multiple distinct slices of a query simultaneously using sliced scrolls. A value between 2 and 8 is recommended. |
      | input | size | Maximum number of hits returned by each scroll query. |
      | output | hosts | Access address of the destination cluster. If the cluster has multiple nodes, separate them with commas (,). |
      | output | user | Username for accessing the cluster. For a non-security-mode cluster, use # to comment out this parameter. |
      | output | password | Password for accessing the cluster. For a non-security-mode cluster, use # to comment out this parameter. |
      | output | index | Name of the index in the destination cluster. It can be modified and expanded, for example, Logstash-%{+yyyy.MM.dd}. |
      | output | document_type | Ensures that the document type on the destination is the same as that on the source. |
      | output | document_id | Document ID in the index. Keep document IDs consistent between the source and destination clusters. To have document IDs generated automatically, use # to comment out this parameter. |

    4. Click Next to configure Logstash pipeline parameters.

      In this example, retain the default values for the pipeline parameters.

    5. Click OK.

      On the Configuration Center page, you can check the created configuration file. If its status changes to Available, it has been successfully created.

  5. Execute the full migration task.
    1. In the configuration file list, select configuration file es-es-all and click Start in the upper left corner.
    2. In the Start Logstash dialog box, select Keepalive if necessary. In this example, Keepalive is not enabled.

      When Keepalive is enabled, a daemon process is configured on each node. If the Logstash service becomes faulty, the daemon process attempts to rectify the fault and restart the service. You are advised to enable Keepalive for long-running services. Do not enable it for short-term migration tasks; otherwise, the task may fail once the source data has been exhausted.

    3. Click OK to start the configuration file and hence the Logstash full migration task.

      You can view the started configuration file in the pipeline list.

  6. After data migration is complete, check data consistency.
    • Method 1: Use PuTTY to log in to the VM used for the migration and run the python checkIndices.py command to compare the data.
    • Method 2: Run the GET _cat/indices command on the Kibana console of the source and destination clusters, separately, to check whether their indexes are consistent.
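
    As a scripted variant of method 2 (a sketch with placeholder addresses and credentials), the _cat/indices API can be queried on both clusters and the per-index document counts compared automatically:

      import requests

      SRC = "http://x.x.x.x:9200"   # placeholder: source cluster access address
      DEST = "http://x.x.x.x:9200"  # placeholder: destination cluster access address
      AUTH = None                   # e.g. ("username", "password") for security-mode clusters

      def doc_counts(url):
          # format=json returns one record per index, including its "docs.count".
          rows = requests.get(url + "/_cat/indices?format=json", auth=AUTH, verify=False).json()
          return dict((r["index"], r["docs.count"]) for r in rows if not r["index"].startswith("."))

      src, dest = doc_counts(SRC), doc_counts(DEST)
      for index, count in src.items():
          if dest.get(index) == count:
              print("[success] " + index + ": " + str(count))
          else:
              print("[failure] " + index + ": source=" + str(count) + ", destination=" + str(dest.get(index)))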

Using Logstash to Incrementally Migrate Cluster Data

In scenarios requiring continuous data synchronization or real-time data updates, you are advised to use Logstash for an incremental data migration. This method configures incremental queries in Logstash, so that only index data containing the incremental field is migrated.

  1. Log in to the CSS management console.
  2. In the navigation pane on the left, choose Clusters > Logstash.
  3. In the Logstash cluster list, select the created Logstash cluster Logstash-ES and click Configuration Center in the Operation column.
  4. On the Configuration Center page, click Create in the upper right corner. On the Create Configuration File page, edit the configuration file for the incremental migration.
    1. Selecting a cluster template: Expand the system template list, select elasticsearch, and click Apply in the Operation column.
    2. Setting the name of the configuration file: Set Name, for example, es-es-inc.
    3. Editing the configuration file: Enter the migration configuration plan of the Elasticsearch cluster in Configuration File Content. The following is an example of the configuration file:

      The incremental migration configuration varies according to the index and must be provided based on the index analysis. For details about how to obtain the cluster information, see Obtaining Elasticsearch Cluster Information.

      input{
           elasticsearch{
      # Access address of the source Elasticsearch cluster. You do not need to add a protocol. If you add the HTTPS protocol, an error will be reported.
               hosts =>  ["xx.xx.xx.xx:9200"]
      # Username and password for accessing the source cluster. You do not need to configure them for a non-security-mode cluster.
               user => "css_logstash"
               password => "******"
      # Configure incremental migration indexes.
               index => "*_202102"
      # Configure incremental migration query statements.
               query => '{"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}'
               docinfo => true
               size => 1000
      # If the destination cluster is accessed through HTTPS, you need to configure the following information:
               # HTTPS access certificate of the cluster. Retain the following for the CSS cluster:
               # ca_file => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs"  # for 7.10.0
      # Whether to enable HTTPS communication. Set this parameter to true for HTTPS-based cluster access.
               #ssl => true
           }
       }
      
       filter {
         mutate {
           remove_field => ["@timestamp", "@version"]
         }
       }
      
       output{
           elasticsearch{
      # Access address of the destination cluster.
               hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"]
      # Username and password for accessing the destination cluster. You do not need to configure them for a non-security-mode cluster.
               #user => "admin"
               #password => "******"
      # Configure the index of the target cluster. The following configuration indicates that the index name is the same as that of the source end.
               index => "%{[@metadata][_index]}"
               document_type => "%{[@metadata][_type]}"
               document_id => "%{[@metadata][_id]}"
      # If the destination cluster is accessed through HTTPS, you need to configure the following information:
      # HTTPS access certificate of the cluster. Retain the default value for the CSS cluster.
               #cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs"  # for 7.10.0 
      # Whether to enable HTTPS communication. Set this parameter to true for HTTPS-based cluster access.
               #ssl => true
      # Whether to verify the elasticsearch certificate on the server. Set this parameter to false, indicating that the certificate is not verified.
               #ssl_certificate_verification => false
           }
      
           #stdout { codec => rubydebug { metadata => true }}
       }
      Table 5 Incremental migration configuration items

      | Item | Description |
      | --- | --- |
      | hosts | Access addresses of the source and destination clusters. If a cluster has multiple nodes, enter all their access addresses. |
      | user | Username for accessing the cluster. For a non-security-mode cluster, use # to comment out this parameter. |
      | password | Password for accessing the cluster. For a non-security-mode cluster, use # to comment out this parameter. |
      | index | Index to be incrementally migrated. One configuration file supports the incremental migration of only one index. |
      | query | Identifier of the incremental data, typically an Elasticsearch DSL query statement that must be worked out in advance. In the example {"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}, postsDate is the time field in the service, and the query migrates only data added after 2021-05-25 00:00:00. For repeated incremental migrations, update the time value accordingly. If the time field in the source indexes uses the timestamp format, convert this time to a timestamp (see the sketch after this table). Verify the validity of the query statement in advance. |
      | scroll | If there is massive data on the source end, use the scroll function to obtain data in batches to prevent Logstash memory overflow. The default value is 1m. The interval cannot be too long; otherwise, data may be lost. |
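
      The example query uses a date string. If the time field in the source index is stored as an epoch timestamp instead, convert the boundary time first. A minimal Python sketch (the UTC time zone and field format are assumptions; adjust them to your data):

      import time
      import calendar

      # Convert "2021-05-25 00:00:00" (assumed UTC) to epoch milliseconds,
      # the unit commonly used by timestamp fields in Elasticsearch.
      boundary = "2021-05-25 00:00:00"
      epoch_ms = calendar.timegm(time.strptime(boundary, "%Y-%m-%d %H:%M:%S")) * 1000
      print(epoch_ms)  # 1621900800000; use this value in the range clause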

  5. Execute the incremental migration task.
    1. In the configuration file list, select configuration file es-es-inc and click Start in the upper left corner.
    2. In the Start Logstash dialog box, select Keepalive if necessary. In this example, Keepalive is not enabled.

      When Keepalive is enabled, a daemon process is configured on each node. If the Logstash service becomes faulty, the daemon process attempts to rectify the fault and restart the service. You are advised to enable Keepalive for long-running services. Do not enable it for short-term migration tasks; otherwise, the task may fail once the source data has been exhausted.

    3. Click OK to start the configuration file and hence the Logstash incremental migration task.

      You can view the started configuration file in the pipeline list.

  6. After data migration is complete, check data consistency.
    • Method 1: Use PuTTY to log in to the VM used for the migration and run the python checkIndices.py command to compare the data.
    • Method 2: Run the GET _cat/indices command on the Kibana console of the source and destination clusters, separately, to check whether their indexes are consistent.

Deleting a Logstash Cluster

After the migration is complete, release the Logstash cluster in a timely manner to save resources and avoid unnecessary fees.

  1. Log in to the CSS management console.
  2. In the navigation pane on the left, choose Clusters > Logstash.
  3. In the Logstash cluster list, select the created Logstash cluster Logstash-ES and click More > Delete in the Operation column. In the confirmation dialog box, manually type in DELETE, and click OK.