
Using Logstash to Perform Incremental Data Migration

Logstash supports both full and incremental data migration. You can perform a full migration for the first data migration, and incremental migrations afterwards. This section describes how to use Logstash to incrementally migrate cluster data. Incremental migration requires that indexes contain a time field (timestamp) that identifies the incremental data.

Prepare for the migration by referring to Restrictions and Preparations, and then complete the migration by following Step 1 through Step 5 below.

Restrictions

  • Logstash version restrictions:

    CSS supports clusters of versions 5.5.1, 6.3.2, 6.5.4, 7.1.1, 7.6.2, and 7.10.2. Ensure that the major versions of the clusters whose data you want to migrate are the same.

    If the Elasticsearch cluster version is 5.x, select Logstash 5.6.16. If the Elasticsearch cluster version is 7.x, select Logstash 7.10.0.

  • Do not modify indexes during cluster migration. Otherwise, the original data will be inconsistent with the migrated data.
  • If the indexes to be migrated contain less than 100 GB of data, separate index analysis is not required.

Preparations

  • Create a VM for data migration.
    1. Create a VM to migrate the metadata of the source cluster.
      1. Create a Linux ECS with 2 vCPUs and 4 GB memory.
      2. Run the curl http://{IP_address}:{port} command to test the connectivity between the VM and the source cluster, and between the VM and the destination cluster.

        IP_address indicates the access address of the source or destination cluster, and port indicates the actual port number of the cluster. The default port is 9200.

        The following example applies only to non-security clusters.

        curl http://10.234.73.128:9200
        {
          "name" : "voc_es_cluster_new-ess-esn-1-1",
          "cluster_name" : "voc_es_cluster_new",
          "cluster_uuid" : "1VbP7-39QNOx_R-llXKKtA",
          "version" : {
            "number" : "6.5.4",
            "build_flavor" : "default",
            "build_type" : "tar",
            "build_hash" : "d2ef93d",
            "build_date" : "2018-12-17T21:17:40.758843Z",
            "build_snapshot" : false,
            "lucene_version" : "7.5.0",
            "minimum_wire_compatibility_version" : "5.6.0",
            "minimum_index_compatibility_version" : "5.0.0"
          },
          "Tagline" : "You Know, for Search"
        }
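
        If you prefer to script this connectivity check, the following minimal Python sketch uses the requests library (installed later in Preparations). The addresses are placeholders that you should replace with your actual cluster endpoints, and like the curl command above, this sketch applies only to non-security clusters.

        # -*- coding:UTF-8 -*-
        # Probe the source and destination clusters and print their versions.
        import requests

        clusters = {
            "source": "http://10.234.73.128:9200",       # placeholder address
            "destination": "http://10.234.73.129:9200",  # placeholder address
        }

        for name, url in clusters.items():
            try:
                response = requests.get(url, timeout=10)
                # A reachable cluster returns HTTP 200 together with its version information.
                print(name + " reachable, version: " + response.json()["version"]["number"])
            except requests.exceptions.RequestException as error:
                print(name + " unreachable: " + str(error))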
  • Prepare the tools and software.

    The installation method depends on whether the VM can connect to the Internet. If the VM has Internet access, use yum and pip to install the software online. If the VM cannot connect to the Internet, download the installation packages to the VM and install them offline.

    The online installation procedure is as follows:

    1. Run yum install python2 to install python2.
      [root@ecs opt]# yum install python2
    2. Run yum install python-pip to install pip.
      [root@ecs opt]# yum install python-pip
    3. Run pip install pyyaml to install the YAML dependency.
    4. Run pip install requests to install the requests dependency.

    The offline installation procedure is as follows:

    1. Download the Python 2 source package (Python-2.7.18.tgz) from https://www.python.org/downloads/release/python-2718/.
      Figure 1 Downloading the python2 package
    2. Use WinSCP to upload the Python installation package to the opt directory and install Python.
      # Decompress the Python package.
      [root@ecs-52bc opt]# tar -xvf Python-2.7.18.tgz
      Python-2.7.18/Modules/zlib/crc32.c
      Python-2.7.18/Modules/zlib/gzlib.c
      Python-2.7.18/Modules/zlib/inffast.c
      Python-2.7.18/Modules/zlib/example.c
      Python-2.7.18/Modules/python.c
      Python-2.7.18/Modules/nismodule.c
      Python-2.7.18/Modules/Setup.config.in
      ...
      # After the decompression, go to the directory.
      [root@ecs-52bc opt]# cd Python-2.7.18
      # Configure the build and specify the installation path.
      [root@ecs-52bc Python-2.7.18]# ./configure --prefix=/usr/local/python2
      ...
      checking for build directories... checking for --with-computed-gotos... no value specified
      checking whether gcc -pthread supports computed gotos... yes
      done
      checking for ensurepip... no
      configure: creating ./config.status
      config.status: creating Makefile.pre
      config.status: creating Modules/Setup.config
      config.status: creating Misc/python.pc
      config.status: creating Modules/ld_so_aix
      config.status: creating pyconfig.h
      creating Modules/Setup
      creating Modules/Setup.local
      creating Makefile
      # Compile Python.
      [root@ecs-52bc Python-2.7.18]# make
      # Install Python.
      [root@ecs-52bc Python-2.7.18]# make install
    3. Check the Python installation result.
      # Check the Python version.
      [root@ecs-52bc Python-2.7.18]# python --version
      Python 2.7.5
      # Check the pip version.
      [root@ecs-52bc Python-2.7.18]# pip --version
      pip 7.1.2 from /usr/lib/python2.7/site-packages/pip-7.1.2-py2.7.egg (python 2.7)
      [root@ecs-52bc Python-2.7.18]#
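
    After the installation, you can optionally confirm that the pyyaml and requests dependencies are importable. This check is not part of the official procedure, just a quick sanity test:

      # Verify that the yaml and requests modules can be imported.
      [root@ecs-52bc opt]# python -c "import yaml, requests; print('dependencies OK')"
      dependencies OK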
  • Prepare the execution script.
    1. Run the vi migrateConfig.yaml command and create the following configuration file:
      es_cluster_new:
        # Cluster name
        clustername: es_cluster_new
        # Address of the source Elasticsearch cluster, starting with http://
        src_ip: http://x.x.x.x:9200
        # If the cluster has no username or password, set both to "".
        src_username: ""
        src_password: ""
        # Address of the destination Elasticsearch cluster, starting with http://
        dest_ip: http://x.x.x.x:9200
        # If the cluster has no username or password, set both to "".
        dest_username: ""
        dest_password: ""
        # Optional. The default value is false. Used by migrateMapping.py.
        # Whether to process only the indexes listed in the mapping below.
        # If set to true, only the indexes listed in the mapping below are read from the source and created in the destination.
        # If set to false, all indexes of the source cluster are processed, except system indexes (.kibana, .*).
        # Index names are matched against the mapping below. If an index name matches a key, the mapped value is used as the index name in the destination; otherwise, the original source index name is kept.
        only_mapping: false
        # Indexes to be migrated. The key is the index name in the source cluster, and the value is the index name in the destination cluster.
        mapping:
            test_index_1: test_index_1

        # Optional. The default value is false. Used by checkIndices.py.
        # If set to false, both the index list and the document counts are compared. If set to true, only the index list is compared.
        only_compare_index: false

      Configuration file parameters:

        clustername: Cluster name.
        src_ip: IP address for accessing the source cluster. Only one address is required. The default port is 9200; if the cluster uses a different port, specify the actual port number.
        src_username: Username for accessing the source cluster. If not required, set it to "".
        src_password: Password for accessing the source cluster. If not required, set it to "".
        dest_ip: IP address for accessing the destination cluster. Only one address is required. The default port is 9200; if the cluster uses a different port, specify the actual port number.
        dest_username: Username for accessing the destination cluster. If not required, set it to "".
        dest_password: Password for accessing the destination cluster. If not required, set it to "".
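
      Optionally, before moving on, you can check that the configuration file parses and that every cluster in it responds. The following Python sketch is not part of the official procedure; it only assumes the migrateConfig.yaml format shown above.

      # -*- coding:UTF-8 -*-
      # Optional sanity check: parse migrateConfig.yaml and probe each configured cluster.
      import yaml
      import requests

      with open("migrateConfig.yaml") as config_file:
          config = yaml.load(config_file)  # on Python 3, use yaml.load(config_file, Loader=yaml.FullLoader)

      for name, value in config.items():
          for side in ("src", "dest"):
              url = value[side + "_ip"]
              auth = None
              if value[side + "_username"] != "":
                  auth = (value[side + "_username"], value[side + "_password"])
              response = requests.get(url, auth=auth, timeout=10)
              print(name + " " + side + " " + url + " -> HTTP " + str(response.status_code))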

    2. Run the vi checkIndices.py command and copy the following script into the file to create the index data comparison script:
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import re
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def get_cluster_version(url, auth=None):
          response = requests.get(url, auth=auth)
          if response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              return False
          cluster = response.json()
          version = cluster["version"]["number"]
          return version
      
      
      # get all indices
      def get_indices(url, source_auth):
          response = requests.get(url + "/_alias", auth=source_auth)
          if response.status_code != 200:
              print("*** get all index failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              exit()
          all_index = response.json()
          system_index = []
          create_index = []
          for index in list(all_index.keys()):
              if (index.startswith(".")):
                  system_index.append(index)
              else:
                  create_index.append(index)
          return create_index
      
      
      def get_mapping(url, _auth, index):
          source_url = url + "/" + index
          index_response = requests.get(source_url, auth=_auth)
          if index_response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  index_response.status_code) + " response is " + index_response.text)
              return "[failure] --- index is not exist in destination es. ---"
          mapping = index_response.json()
          return mapping
      
      
      def get_index_total(url, index, es_auth):
          stats_url = url + "/" + index + "/_stats"
          index_response = requests.get(stats_url, auth=es_auth, verify=False)
          if index_response.status_code != 200:
              print("*** get ElasticSearch stats message failed. resp statusCode:" + str(
                  index_response.status_code) + " response is " + index_response.text)
              return 0
          return index_response.json()
      
      
      def get_indices_stats(url, es_auth):
          endpoint = url + "/_cat/indices"
          indices_result = requests.get(endpoint, auth=es_auth)
          indices_list = indices_result.text.split("\n")
          index_list = []
          for indices in indices_list:
              # Each non-empty line of the _cat/indices output has the index name in the third column.
              if indices.strip() != "":
                  index_list.append(indices.split()[2])
          return index_list
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # python3
          # return yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migrate index mapping!")
          config = loadConfig(argv)
          src_clusters = config.keys()
      
          print("begin to process cluster name :")
          for name in src_clusters:
              print(name)
          print("cluster count:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source = value["src_ip"]
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest = value["dest_ip"]
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
              cluster_name = name
              if "clustername" in value:
                  cluster_name = value["clustername"]
      
              print("start to process cluster :" + cluster_name)
              # get all indices
              all_source_index = get_indices(source, source_auth)
              all_dest_index = get_indices(dest, dest_auth)
      
              if not os.path.exists("mappingLogs"):
                  os.makedirs("mappingLogs")
              filename = "mappingLogs/" + str(cluster_name) + "#indices_stats"
              with open(filename + ".json", "w") as f:
                  json.dump("cluster name: " + cluster_name, f)
                  f.write("\n")
                  json.dump("source indices: ", f)
                  f.write("\n")
                  json.dump(all_source_index, f, indent=4)
                  f.write("\n")
                  json.dump("destination indices : ", f)
                  f.write("\n")
                  json.dump(all_dest_index, f, indent=4)
                  f.write("\n")
      
              print("source indices total     : " + str(all_source_index.__len__()))
              print("destination index total  : " + str(all_dest_index.__len__()))
      
              filename_src = "mappingLogs/" + str(cluster_name) + "#indices_source_mapping"
              filename_dest = "mappingLogs/" + str(cluster_name) + "#indices_dest_mapping"
              with open(filename_src + ".json", "a") as f_src:
                  json.dump("cluster name: " + cluster_name, f_src)
                  f_src.write("\n")
              with open(filename_dest + ".json", "a") as f_dest:
                  json.dump("cluster name: " + cluster_name, f_dest)
                  f_dest.write("\n")
              for index in all_source_index:
                  mapping = get_mapping(source, source_auth, index)
                  with open(filename + ".json", "a") as f_src:
                      json.dump("========================", f_src)
                      f_src.write("\n")
                      json.dump(mapping, f_src, indent=4)
                      f_src.write("\n")
                  with open(filename_src + ".json", "a") as f_src:
                      json.dump("========================", f_src)
                      f_src.write("\n")
                      json.dump(mapping, f_src, indent=4)
                      f_src.write("\n")
      
                  mapping = get_mapping(dest, dest_auth, index)
                  with open(filename + ".json", "a") as f_dest:
                      json.dump("========================", f_dest)
                      f_dest.write("\n")
                      json.dump(mapping, f_dest, indent=4)
                      f_dest.write("\n")
                  with open(filename_dest + ".json", "a") as f_src:
                      json.dump("========================", f_src)
                      f_src.write("\n")
                      json.dump(mapping, f_src, indent=4)
                      f_src.write("\n")
      
              print("source indices write file success,      file: " + filename_src)
              print("destination indices write file success, file: " + filename_dest)
      
              if "only_compare_index" in value and value["only_compare_index"]:
                  print("[success] only compare mapping, not compare index count.")
                  continue
      
              for index in all_source_index:
                  index_total = get_index_total(value["src_ip"], index, source_auth)
                  src_total = index_total["_all"]["primaries"]["docs"]["count"]
                  src_size = int(index_total["_all"]["primaries"]["store"]["size_in_bytes"]) / 1024 / 1024
                  dest_index = get_index_total(value["dest_ip"], index, dest_auth)
                  if dest_index == 0:
                      print('[failure] not found, index: %-20s, source total: %-10s size %6sM'
                            % (str(index), str(src_total), src_size))
                      continue
                  dest_total = dest_index["_all"]["primaries"]["docs"]["count"]
                  if src_total != dest_total:
                      print('[failure] not consistent, '
                            'index: %-20s, source total: %-10s size %6sM destination total: %-10s '
                            % (str(index), str(src_total), src_size, str(dest_total)))
                      continue
                  print('[success] compare index total equal : index : %-20s,  total: %-20s '
                        % (str(index), str(dest_total)))
      
      
      if __name__ == '__main__':
          main(sys.argv)
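
    By default, checkIndices.py reads migrateConfig.yaml from the current directory. To compare against a different configuration file, pass its path as the first argument:

      python checkIndices.py migrateConfig.yaml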

Step 1: Creating a Logstash Cluster

  • Logstash clusters are used to migrate data. By default, Logstash clusters are billed in pay-per-use mode. To reduce costs, you are advised to delete the Logstash cluster once the data migration is complete.
  • If multiple indexes need to be migrated, you can create multiple Logstash clusters and configure a different migration task for each.
  1. Log in to the CSS management console.
  2. On the Dashboard or Clusters page, choose Logstash in the navigation pane on the left.
  3. Click Create Cluster. The Create Cluster page is displayed.
  4. Specify Region and AZ.
  5. Specify the basic cluster information, select the cluster type and cluster version, and enter the cluster name.
    Table 1 Basic parameters

      Cluster Type: Select Logstash.
      Version: 5.6.16 and 7.10.0 are supported. If the Elasticsearch cluster version is 5.x, select Logstash 5.6.16. If the Elasticsearch cluster version is 7.x, select Logstash 7.10.0.
      Name: Cluster name, which contains 4 to 32 characters. Only letters, digits, hyphens (-), and underscores (_) are allowed, and the value must start with a letter.

    Figure 2 Configuring basic information
  6. Set host specifications of the cluster. Set the number of Nodes to 1. Set Node Specifications to 8 vCPUs | 16 GB and retain the default values for other parameters.
    Figure 3 Configuring host specifications
  7. Set the enterprise project. Retain the default value.
  8. Click Next: Configure Network. Configure the cluster network.
    Table 2 Parameter description

      VPC: A VPC is a secure, isolated, logical network environment. Select the target VPC. Click View VPC to open the VPC management console and view the names and IDs of the created VPCs. If no VPC is available, create one.
        NOTE: The VPC must contain CIDR blocks. Otherwise, cluster creation will fail. By default, a VPC contains CIDR blocks.
      Subnet: A subnet provides dedicated network resources that are isolated from other networks, improving network security. Select the target subnet. You can view the names and IDs of the existing subnets in the VPC on the VPC management console.
      Security Group: A security group implements access control for ECSs that have the same security protection requirements in a VPC. To view details about a security group, click View Security Group.
        NOTE: Ensure that the selected security group allows access to port 9200, that is, its Port Range/ICMP Type is Any or includes port 9200.

    Figure 4 Configuring network specifications
  9. Click Next: Configure Advanced Settings. You can select Default or Custom for Advanced Settings. Retain the default settings in this example.
  10. Click Next: Confirm. Check the configuration and click Next to create a cluster.
  11. Click Back to Cluster List to switch to the Clusters page. The cluster you created is listed on the displayed page and its status is Creating. If the cluster is successfully created, its status will change to Available.

Step 2: Verifying Cluster Connectivity

Verify the connectivity between Logstash and the source and destination clusters.

  1. On the Logstash clusters page, click the name of the Logstash cluster created in Step 1: Creating a Logstash Cluster. The Cluster Information page is displayed. Choose Configuration Center in the navigation pane on the left, or click Configuration Center in the Operation column of the target cluster, to open the Configuration Center page.
  2. On the Configuration Center page, click Test Connectivity.
  3. Enter the IP addresses or domain names and port numbers of the source and destination clusters, and click Test.
    Figure 5 Testing the connectivity

Step 3: Configuring a Logstash Incremental Data Migration Task

  1. On the Logstash clusters page, click the name of the Logstash cluster created in Step 1: Creating a Logstash Cluster. The Cluster Information page is displayed. Choose Configuration Center, or click Configuration Center in the Operation column of the target cluster. The Configuration Center page is displayed.
  2. Click Create in the upper right corner. On the configuration file creation page that is displayed, select a cluster template and modify the migration configuration file of the Elasticsearch cluster.
    In this example, HTTPS is not enabled for the two Elasticsearch clusters.
    • Select a cluster template: In this example, data is imported from an Elasticsearch cluster to an Elasticsearch cluster. Locate the elasticsearch row and click Apply in the Operation column. Add cluster configurations as required.
    • Modify the configuration file: set the configuration name, for example, es-es-inc, and edit the migration configuration for the Elasticsearch clusters. The following is an example of the configuration file:
      input {
          elasticsearch {
              hosts => ["xx.xx.xx.xx:9200"]
              user => "css_logstash"
              password => "******"
              index => "*_202102"
              query => '{"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}'
              docinfo => true
              size => 1000
              #scroll => "5m"
          }
      }

      filter {
          mutate {
              remove_field => ["@timestamp", "@version"]
          }
      }

      output {
          elasticsearch {
              hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"]
              user => "admin"
              password => "******"
              index => "%{[@metadata][_index]}"
              document_type => "%{[@metadata][_type]}"
              document_id => "%{[@metadata][_id]}"
          }

          #stdout { codec => rubydebug { metadata => true }}
      }

      Modify the following configurations:

      Table 3 Modification of cluster configurations

        hosts: Access addresses of the source and destination clusters. If a cluster has multiple access nodes, enter all of their addresses.
        user: Username for accessing the cluster. If the cluster does not use a username, comment out this item with a number sign (#).
        password: Password for accessing the cluster. If the cluster does not use a username or password, comment out this item with a number sign (#).
        index: Index to be incrementally migrated. One configuration file supports the incremental migration of only one index.
        query: Identifier of the incremental data, usually an Elasticsearch DSL statement that needs to be worked out in advance. In this example, postsDate is the time field in the service data.
          {"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}
          This query migrates data added after 2021-05-25. For repeated incremental migrations, update the time value each time. If the time field in the source indexes is stored in timestamp format, convert the boundary value to a timestamp here. Verify the validity of the query in advance, for example, with the sketch after this table.
        scroll: If the source cluster holds massive data, you can use the scroll function to read data in batches and prevent Logstash memory overflow. The default value is "1m". Do not set an overlong interval; otherwise, data may be lost.

      The incremental migration configuration varies from index to index and must be worked out based on the index analysis.
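
      The following Python sketch is one optional way to verify the query in advance using the Elasticsearch _count API. The field name postsDate and the index pattern *_202102 come from the example above; the epoch-millisecond conversion applies only if your time field is stored as a timestamp rather than a date string.

      # -*- coding:UTF-8 -*-
      # Optional: run the incremental query through _count on the source cluster
      # to confirm it is valid and to see how many documents it matches.
      import json
      import time
      import requests

      src = "http://x.x.x.x:9200"         # source cluster address (placeholder)
      auth = ("css_logstash", "******")   # set auth = None for a non-security cluster

      start = "2021-05-25 00:00:00"
      # If the time field is stored as an epoch-millisecond timestamp, convert the
      # boundary value before putting it into the query.
      epoch_ms = int(time.mktime(time.strptime(start, "%Y-%m-%d %H:%M:%S"))) * 1000

      query = {"query": {"bool": {"should": [{"range": {"postsDate": {"from": start}}}]}}}

      response = requests.get(src + "/*_202102/_count", auth=auth,
                              headers={"Content-Type": "application/json"},
                              data=json.dumps(query))
      # A valid query returns HTTP 200 and a JSON body containing the matched document count.
      print(str(response.status_code) + " " + response.text)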

Step 4: Performing an Incremental Data Migration

  1. Use PuTTY to log in to the Linux VM created in Preparations.
  2. On the Logstash clusters page, click the name of the Logstash cluster created in Step 1: Creating a Logstash Cluster. The Cluster Information page is displayed. Choose Configuration Center, or click Configuration Center in the Operation column of the target cluster. The Configuration Center page is displayed.
  3. Select the configuration file created in Step 3: Configuring a Logstash Incremental Data Migration Task and click Start in the upper left corner.
  4. When prompted, confirm that data migration starts immediately after the Logstash service starts.
  5. The started configuration file is displayed in the pipeline list.
  6. After the data migration is complete, use PuTTY to log in to the Linux VM and run the python checkIndices.py command to compare the data.
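
    If the clusters are consistent, the script prints a [success] line for each index; missing or inconsistent indexes are reported with [failure] lines instead. For example (the index name and count here are illustrative):

      [success] compare index total equal : index : test_index_1,  total: 12345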

Step 5: Deleting the Logstash Cluster

After the cluster migration is complete, delete the Logstash cluster.

  1. Log in to the CSS management console.
  2. Choose Clusters > Logstash. On the displayed page, locate the row that contains the target cluster and click More > Delete in the Operation column.
  3. In the displayed dialog box, enter the name of the cluster to confirm the deletion, and click OK.