Migrating Data Between Elasticsearch Clusters Using Huawei Cloud Logstash
You can use a Logstash cluster created using Huawei Cloud's Cloud Search Service (CSS) to migrate data between Elasticsearch clusters.
Scenarios
Huawei Cloud Logstash is a fully managed data ingestion and processing service. It is compatible with open-source Logstash and can be used for data migration between Elasticsearch clusters.
You can use Huawei Cloud Logstash to migrate data from Huawei Cloud Elasticsearch, self-built Elasticsearch, or third-party Elasticsearch to Huawei Cloud Elasticsearch. This solution applies to the following scenarios:
- Cross-version migration: Migrate data between different Elasticsearch versions, leveraging Logstash's compatibility and flexibility to keep data available and consistent in the new version. This mode applies to migrations with a large version gap, for example, from 6.x to 7.x.
- Cluster merging: Utilize Logstash to transfer and consolidate data from multiple Elasticsearch clusters into a single Elasticsearch cluster, enabling unified management and analysis of data from different Elasticsearch clusters.
- Cloud migration: Migrate an on-premises Elasticsearch service to the cloud to enjoy the benefits of cloud services, such as scalability, ease-of-maintenance, and cost-effectiveness.
- Changing the service provider: An enterprise currently using a third-party Elasticsearch service wishes to switch to Huawei Cloud for reasons such as cost, performance, or other strategic considerations.
Solution Architecture
Figure 1 shows how to migrate data between Elasticsearch clusters using Huawei Cloud Logstash.
- Input: Huawei Cloud Logstash receives data from Huawei Cloud Elasticsearch, self-built Elasticsearch, or third-party Elasticsearch.
The procedure for migrating data from Huawei Cloud Elasticsearch, self-built Elasticsearch, or third-party Elasticsearch to Huawei Cloud Elasticsearch remains the same. The only difference is the access address of the source cluster. For details, see Obtaining Elasticsearch Cluster Information.
- Filter: Huawei Cloud Logstash cleanses and converts data.
- Output: Huawei Cloud Logstash outputs data to the destination system, for example, Huawei Cloud Elasticsearch.
- Full data migration: Utilize Logstash to perform a complete data migration. This method is suitable for the initial phase of migration or scenarios where data integrity must be ensured.
- Incremental data migration: Configure Logstash to perform incremental queries and migrate only index data with incremental fields. This method is suitable for scenarios requiring continuous data synchronization or real-time data updates.
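Incremental migration relies on an incremental field in the index, typically a timestamp: each run pulls only the documents written after the previous synchronization point. As a minimal illustration (not part of the official procedure; the field name and timestamp are hypothetical), the Elasticsearch query body behind such a pull can be built like this:

```python
import json

def build_incremental_query(field, last_sync):
    # Range query that matches only documents whose incremental field
    # is at or after the last synchronized value.
    return {"query": {"range": {field: {"gte": last_sync}}}}

# Example: pull everything written since the last sync point.
body = build_incremental_query("@timestamp", "2024-01-01T00:00:00Z")
print(json.dumps(body, sort_keys=True))
```

Each subsequent run would advance `last_sync` to the newest value seen, so only new or updated documents are migrated.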
Advantages
- Cross-version compatibility: This solution supports the migration of data between Elasticsearch clusters of different versions.
- Efficient data processing: Logstash supports batch read and write operations, significantly enhancing data migration efficiency.
- Concurrent synchronization technology: The slice concurrent synchronization technology can be utilized to boost data migration speed and performance, especially when handling large volumes of data.
- Simple configuration: Huawei Cloud Logstash offers a straightforward and intuitive configuration process, allowing you to input, process, and output data through configuration files.
- Powerful data processing: Logstash includes various built-in filters to clean, convert, and enrich data during migration.
- Flexible migration policies: You can choose between full migration or incremental migration based on service requirements to optimize storage usage and migration time.
Impact on Performance
- If resource usage in the source cluster is high, reduce the size parameter to slow down data retrieval, or perform the migration during off-peak hours, to limit the impact on the source cluster's performance.
- If resource usage in the source cluster is low, keep the default settings of the Scroll API. Meanwhile, monitor the load of the source cluster and tune the size and slices parameters based on its load conditions to optimize migration efficiency and resource utilization.
Constraints
During the migration, do not add, delete, or modify indexes in the source cluster; otherwise, the source and destination clusters will contain inconsistent data after the migration.
Prerequisites
- The source and destination Elasticsearch clusters are available.
- Ensure that the network between the clusters is connected.
- If the source cluster, Logstash, and destination cluster are in different VPCs, establish a VPC peering connection between them. For details, see VPC Peering Connection Overview.
- To migrate an on-premises Elasticsearch cluster to Huawei Cloud, you can configure public network access for the on-premises Elasticsearch cluster.
- To migrate a third-party Elasticsearch cluster to Huawei Cloud, you need to establish a VPN or Direct Connect connection between the third party's internal data center and Huawei Cloud.
- Ensure that _source has been enabled for indexes in the cluster.
By default, _source is enabled. You can run the GET {index}/_search command to check whether it is enabled. If the returned index information contains _source, it is enabled.
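Programmatically, a hit returned by GET {index}/_search carries the original document under the _source key when _source is enabled. A small sketch of that check (the sample response below is illustrative, not from a real cluster):

```python
def source_enabled(search_response):
    # True if at least one returned hit carries the original document
    # under the _source key.
    hits = search_response.get("hits", {}).get("hits", [])
    return any("_source" in hit for hit in hits)

# Illustrative _search response with _source enabled.
sample = {"hits": {"hits": [{"_id": "1", "_source": {"field": "value"}}]}}
print(source_enabled(sample))  # True
```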
Procedure
- Obtaining Elasticsearch Cluster Information
- (Optional) Migrating the Index Structure: Migrate an Elasticsearch cluster's index template and index structure using scripts.
- Creating a Logstash Cluster: Create a Logstash cluster for data migration.
- Verifying Connectivity Between Clusters: Verify the connectivity between the Logstash cluster and the source Elasticsearch cluster.
- Migrate data from the source Elasticsearch cluster using Logstash. Choose one of the following methods:
- Using Logstash to Perform Full Data Migration is recommended at the initial stage of cluster migration or in scenarios where data integrity needs to be ensured.
- Using Logstash to Incrementally Migrate Cluster Data is recommended for scenarios that require continuous data synchronization or real-time data updates.
- Deleting a Logstash Cluster: After the cluster migration is complete, release the Logstash cluster in a timely manner.
Obtaining Elasticsearch Cluster Information
Before migrating a cluster, you need to obtain necessary cluster information for configuring a migration task.
| Cluster | Type | Required Information | How to Obtain |
|---|---|---|---|
| Source cluster | Huawei Cloud Elasticsearch cluster | Cluster access address; username and password (security-mode clusters only) | See the description below in this section. |
| Source cluster | Self-built Elasticsearch cluster | Cluster access address; username and password (security-mode clusters only) | Contact the service administrator to obtain the information. |
| Source cluster | Third-party Elasticsearch cluster | Cluster access address; username and password (security-mode clusters only) | Contact the service administrator to obtain the information. |
| Destination cluster | Huawei Cloud Elasticsearch cluster | Cluster access address; username and password (security-mode clusters only) | See the description below in this section. |
The method of obtaining the cluster information varies depending on the source cluster. This section describes how to obtain information about the Huawei Cloud Elasticsearch cluster.
(Optional) Migrating the Index Structure
If you plan to manually create an index structure in the destination Elasticsearch cluster, skip this section. This section describes how to migrate an Elasticsearch cluster's index template and index structure using scripts.
- Create an ECS to migrate the metadata of the source cluster.
- Create an ECS. Select CentOS as the OS of the ECS and 2U4G as its flavor. The ECS must be in the same VPC and security group as the CSS cluster.
- Test the connectivity between the ECS and the source and destination clusters.
Run the curl http://{ip}:{port} command on the ECS to test the connectivity. If 200 is returned, the connection is successful.
{ip} indicates the access address of the source or destination cluster, and {port} indicates the port number. The default port is 9200; use the actual port number of the cluster.
curl http://10.234.73.128:9200   # Enter the actual IP address. A non-security-mode cluster is used here as an example.
{
  "name" : "es_cluster_migrate-ess-esn-1-1",
  "cluster_name" : "es_cluster_migrate",
  "cluster_uuid" : "1VbP7-39QNOx_R-llXKKtA",
  "version" : {
    "number" : "6.5.4",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "d2ef93d",
    "build_date" : "2018-12-17T21:17:40.758843Z",
    "build_snapshot" : false,
    "lucene_version" : "7.5.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}
- Prepare tools and software. Depending on whether the ECS is connected to the Internet, choose an appropriate method to install them.
- The online installation procedure is as follows:
- Run yum install python2 to install python2.
[root@ecs opt]# yum install python2
- Run yum install python-pip to install pip.
[root@ecs opt]# yum install python-pip
- Run pip install pyyaml to install the YAML dependency.
- Run pip install requests to install the requests dependency.
- The offline installation procedure is as follows:
- Download the Python 2.7.18 source package from https://www.python.org/downloads/release/python-2718/ for offline installation.
Figure 3 Downloading the python2 package
- Use WinSCP to upload the Python installation package to the opt directory and install Python.
# Extract the Python package.
[root@ecs-52bc opt]# tar -xvf Python-2.7.18.tgz
Python-2.7.18/Modules/zlib/crc32.c
Python-2.7.18/Modules/zlib/gzlib.c
Python-2.7.18/Modules/zlib/inffast.c
Python-2.7.18/Modules/zlib/example.c
Python-2.7.18/Modules/python.c
Python-2.7.18/Modules/nismodule.c
Python-2.7.18/Modules/Setup.config.in
...
# Enter the installation directory.
[root@ecs-52bc opt]# cd Python-2.7.18
# Configure the installation path.
[root@ecs-52bc Python-2.7.18]# ./configure --prefix=/usr/local/python2
...
checking for build directories...
checking for --with-computed-gotos... no value specified
checking whether gcc -pthread supports computed gotos... yes
done
checking for ensurepip... no
configure: creating ./config.status
config.status: creating Makefile.pre
config.status: creating Modules/Setup.config
config.status: creating Misc/python.pc
config.status: creating Modules/ld_so_aix
config.status: creating pyconfig.h
creating Modules/Setup
creating Modules/Setup.local
creating Makefile
# Compile Python.
[root@ecs-52bc Python-2.7.18]# make
# Install Python.
[root@ecs-52bc Python-2.7.18]# make install
- Check the Python installation result.
# Check the Python version.
[root@ecs-52bc Python-2.7.18]# python --version
Python 2.7.5
# Check the pip version.
[root@ecs-52bc Python-2.7.18]# pip --version
pip 7.1.2 from /usr/lib/python2.7/site-packages/pip-7.1.2-py2.7.egg (python 2.7)
- Prepare the index migration script of the source Elasticsearch cluster.
- Run vi migrateConfig.yaml, add the following content modified for your environment, and save the file with :wq. This file is the configuration used by all of the migration scripts. For details about how to obtain the cluster information, see Obtaining Elasticsearch Cluster Information.
es_cluster_new:
  # Name of the source cluster.
  clustername: es_cluster_new
  # Access address of the source cluster, including http://.
  src_ip: http://x.x.x.x:9200
  # Username and password for accessing the source cluster. For a non-security-mode cluster, set them to "".
  src_username: ""
  src_password: ""
  # Access address of the destination Elasticsearch cluster, including http://.
  dest_ip: http://x.x.x.x:9200
  # Username and password for accessing the destination Elasticsearch cluster. For a non-security-mode cluster, set them to "".
  dest_username: ""
  dest_password: ""
  # only_mapping is optional and defaults to false. It is used together with migrateMapping.py to specify whether to process only the indexes listed under mapping.
  # When true, only source indexes that match a key in mapping are migrated. When false, all source indexes except .kibana and .* are migrated.
  # During migration, each index name is compared with mapping. If a match is found, the mapping value is used as the destination index name; otherwise, the source index name is retained.
  only_mapping: false
  # Indexes to be migrated. key is the source index name; value is the destination index name.
  mapping:
    test_index_1: test_index_1
  # only_compare_index is optional and defaults to false. It is used together with checkIndices.py. When false, both the number of indexes and the number of documents are compared. When true, only the number of indexes is compared.
  only_compare_index: false
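Because every migration script reads this file, a malformed entry only fails once a script is already running. A quick sanity check of a parsed cluster entry can catch missing keys earlier. This is a hedged sketch, not part of the official tooling; the key list mirrors the fields defined in migrateConfig.yaml:

```python
# Keys every cluster entry in migrateConfig.yaml must define.
REQUIRED_KEYS = ("src_ip", "src_username", "src_password",
                 "dest_ip", "dest_username", "dest_password")

def missing_keys(cluster_entry):
    # Return the required keys absent from one parsed cluster entry.
    return [k for k in REQUIRED_KEYS if k not in cluster_entry]

entry = {
    "clustername": "es_cluster_new",
    "src_ip": "http://x.x.x.x:9200", "src_username": "", "src_password": "",
    "dest_ip": "http://x.x.x.x:9200", "dest_username": "", "dest_password": "",
    "only_mapping": False, "mapping": {"test_index_1": "test_index_1"},
    "only_compare_index": False,
}
print(missing_keys(entry))  # []
```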
- Run vi migrateTemplate.py, copy the following script into the file, and save it with :wq. This is the index template migration script.
# -*- coding:UTF-8 -*-
import sys
import yaml
import requests
import json
import os


def printDividingLine():
    print("<=============================================================>")


def loadConfig(argv):
    if argv is None or len(argv) != 2:
        config_yaml = "migrateConfig.yaml"
    else:
        config_yaml = argv[1]
    config_file = open(config_yaml)
    # config = yaml.load(config_file, Loader=yaml.FullLoader)
    return yaml.load(config_file)


def put_template_to_target(url, template, cluster, template_name, dest_auth=None):
    headers = {'Content-Type': 'application/json'}
    create_resp = requests.put(url, headers=headers, data=json.dumps(template),
                               auth=dest_auth, verify=False)
    if not os.path.exists("templateLogs"):
        os.makedirs("templateLogs")
    if create_resp.status_code != 200:
        print("create template " + url + " failed with response: " + str(
            create_resp) + ", source template is " + template_name)
        print(create_resp.text)
        filename = "templateLogs/" + str(cluster) + "#" + template_name
        with open(filename + ".json", "w") as f:
            json.dump(template, f)
        return False
    else:
        return True


def main(argv):
    requests.packages.urllib3.disable_warnings()
    print("begin to migrate template!")
    config = loadConfig(argv)
    src_clusters = config.keys()
    print("process cluster name:")
    for name in src_clusters:
        print(name)
    print("cluster total number:" + str(src_clusters.__len__()))
    for name, value in config.items():
        printDividingLine()
        source_user = value["src_username"]
        source_passwd = value["src_password"]
        source_auth = None
        if source_user != "":
            source_auth = (source_user, source_passwd)
        dest_user = value["dest_username"]
        dest_passwd = value["dest_password"]
        dest_auth = None
        if dest_user != "":
            dest_auth = (dest_user, dest_passwd)
        print("start to process cluster name:" + name)
        source_url = value["src_ip"] + "/_template"
        response = requests.get(source_url, auth=source_auth, verify=False)
        if response.status_code != 200:
            print("*** get all template failed. resp statusCode:" + str(
                response.status_code) + " response is " + response.text)
            continue
        all_template = response.json()
        migrate_itemplate = []
        for template in all_template.keys():
            if template.startswith(".") or template == "logstash":
                continue
            if "index_patterns" in all_template[template]:
                for t in all_template[template]["index_patterns"]:
                    # if "kibana" in template:
                    if t.startswith("."):
                        continue
                    migrate_itemplate.append(template)
        for template in migrate_itemplate:
            dest_index_url = value["dest_ip"] + "/_template/" + template
            result = put_template_to_target(dest_index_url, all_template[template],
                                            name, template, dest_auth)
            if result is True:
                print('[success] create success, cluster: %-10s, template %-10s '
                      % (str(name), str(template)))
            else:
                print('[failure] create failure, cluster: %-10s, template %-10s '
                      % (str(name), str(template)))


if __name__ == '__main__':
    main(sys.argv)
- Run vi migrateMapping.py, copy the following script into the file, and save it with :wq. This is the index structure migration script.
# -*- coding:UTF-8 -*-
import sys
import yaml
import requests
import re
import json
import os


def printDividingLine():
    print("<=============================================================>")


def loadConfig(argv):
    if argv is None or len(argv) != 2:
        config_yaml = "migrateConfig.yaml"
    else:
        config_yaml = argv[1]
    config_file = open(config_yaml)
    # config = yaml.load(config_file, Loader=yaml.FullLoader)
    return yaml.load(config_file)


def get_cluster_version(url, auth=None):
    response = requests.get(url, auth=auth)
    if response.status_code != 200:
        print("*** get ElasticSearch message failed. resp statusCode:" + str(
            response.status_code) + " response is " + response.text)
        return False
    cluster = response.json()
    version = cluster["version"]["number"]
    return True


def process_mapping(index_mapping, dest_index):
    # remove unnecessary keys
    del index_mapping["settings"]["index"]["provided_name"]
    del index_mapping["settings"]["index"]["uuid"]
    del index_mapping["settings"]["index"]["creation_date"]
    del index_mapping["settings"]["index"]["version"]
    if "lifecycle" in index_mapping["settings"]["index"]:
        del index_mapping["settings"]["index"]["lifecycle"]
    # check alias
    aliases = index_mapping["aliases"]
    for alias in list(aliases.keys()):
        if alias == dest_index:
            print("source index " + dest_index + " alias " + alias +
                  " is the same as dest_index name, will remove this alias.")
            del index_mapping["aliases"][alias]
    # if index_mapping["settings"]["index"].has_key("lifecycle"):
    if "lifecycle" in index_mapping["settings"]["index"]:
        lifecycle = index_mapping["settings"]["index"]["lifecycle"]
        opendistro = {"opendistro": {"index_state_management":
                                     {"policy_id": lifecycle["name"],
                                      "rollover_alias": lifecycle["rollover_alias"]}}}
        index_mapping["settings"].update(opendistro)
        # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
        del index_mapping["settings"]["index"]["lifecycle"]
    # replace synonyms_path
    if "analysis" in index_mapping["settings"]["index"]:
        analysis = index_mapping["settings"]["index"]["analysis"]
        if "filter" in analysis:
            filter = analysis["filter"]
            if "my_synonym_filter" in filter:
                my_synonym_filter = filter["my_synonym_filter"]
                if "synonyms_path" in my_synonym_filter:
                    index_mapping["settings"]["index"]["analysis"]["filter"]["my_synonym_filter"][
                        "synonyms_path"] = "/rds/datastore/elasticsearch/v7.10.2/package/elasticsearch-7.10.2/plugins/analysis-dynamic-synonym/config/synonyms.txt"
    return index_mapping


def getAlias(source, source_auth):
    # get all indices
    response = requests.get(source + "/_alias", auth=source_auth)
    if response.status_code != 200:
        print("*** get all index failed. resp statusCode:" + str(
            response.status_code) + " response is " + response.text)
        exit()
    all_index = response.json()
    system_index = []
    create_index = []
    for index in list(all_index.keys()):
        if index.startswith("."):
            system_index.append(index)
        else:
            create_index.append(index)
    return system_index, create_index


def put_mapping_to_target(url, mapping, cluster, source_index, dest_auth=None):
    headers = {'Content-Type': 'application/json'}
    create_resp = requests.put(url, headers=headers, data=json.dumps(mapping),
                               auth=dest_auth, verify=False)
    if not os.path.exists("mappingLogs"):
        os.makedirs("mappingLogs")
    if create_resp.status_code != 200:
        print("create index " + url + " failed with response: " + str(create_resp) +
              ", source index is " + str(source_index))
        print(create_resp.text)
        filename = "mappingLogs/" + str(cluster) + "#" + str(source_index)
        with open(filename + ".json", "w") as f:
            json.dump(mapping, f)
        return False
    else:
        return True


def main(argv):
    requests.packages.urllib3.disable_warnings()
    print("begin to migrate index mapping!")
    config = loadConfig(argv)
    src_clusters = config.keys()
    print("begin to process cluster name :")
    for name in src_clusters:
        print(name)
    print("cluster count:" + str(src_clusters.__len__()))
    for name, value in config.items():
        printDividingLine()
        source = value["src_ip"]
        source_user = value["src_username"]
        source_passwd = value["src_password"]
        source_auth = None
        if source_user != "":
            source_auth = (source_user, source_passwd)
        dest = value["dest_ip"]
        dest_user = value["dest_username"]
        dest_passwd = value["dest_password"]
        dest_auth = None
        if dest_user != "":
            dest_auth = (dest_user, dest_passwd)
        print("start to process cluster: " + name)
        # only deal with mapping list
        if 'only_mapping' in value and value["only_mapping"]:
            for source_index, dest_index in value["mapping"].iteritems():
                print("start to process source index " + source_index +
                      ", target index: " + dest_index)
                source_url = source + "/" + source_index
                response = requests.get(source_url, auth=source_auth)
                if response.status_code != 200:
                    print("*** get ElasticSearch message failed. resp statusCode:" + str(
                        response.status_code) + " response is " + response.text)
                    continue
                mapping = response.json()
                index_mapping = process_mapping(mapping[source_index], dest_index)
                dest_url = dest + "/" + dest_index
                result = put_mapping_to_target(dest_url, index_mapping, name,
                                               source_index, dest_auth)
                if result is False:
                    print("cluster name:" + name + ", " + source_index + ":failure")
                    continue
                print("cluster name:" + name + ", " + source_index + ":success")
        else:
            # get all indices
            system_index, create_index = getAlias(source, source_auth)
            success_index = 0
            for index in create_index:
                source_url = source + "/" + index
                index_response = requests.get(source_url, auth=source_auth)
                if index_response.status_code != 200:
                    print("*** get ElasticSearch message failed. resp statusCode:" + str(
                        index_response.status_code) + " response is " + index_response.text)
                    continue
                mapping = index_response.json()
                dest_index = index
                if 'mapping' in value:
                    if index in value["mapping"].keys():
                        dest_index = value["mapping"][index]
                index_mapping = process_mapping(mapping[index], dest_index)
                dest_url = dest + "/" + dest_index
                result = put_mapping_to_target(dest_url, index_mapping, name,
                                               index, dest_auth)
                if result is False:
                    print("[failure]: migrate mapping cluster name: " + name + ", " + index)
                    continue
                print("[success]: migrate mapping cluster name: " + name + ", " + index)
                success_index = success_index + 1
            print("create index mapping success total: " + str(success_index))


if __name__ == '__main__':
    main(sys.argv)
- Run vi checkIndices.py, copy the following script into the file, and save it with :wq. This is the index data comparison script.
# -*- coding:UTF-8 -*-
import sys
import yaml
import requests
import re
import json
import os


def printDividingLine():
    print("<=============================================================>")


def get_cluster_version(url, auth=None):
    response = requests.get(url, auth=auth)
    if response.status_code != 200:
        print("*** get ElasticSearch message failed. resp statusCode:" + str(
            response.status_code) + " response is " + response.text)
        return False
    cluster = response.json()
    version = cluster["version"]["number"]
    return True


# get all indices
def get_indices(url, source_auth):
    response = requests.get(url + "/_alias", auth=source_auth)
    if response.status_code != 200:
        print("*** get all index failed. resp statusCode:" + str(
            response.status_code) + " response is " + response.text)
        exit()
    all_index = response.json()
    system_index = []
    create_index = []
    for index in list(all_index.keys()):
        if index.startswith("."):
            system_index.append(index)
        else:
            create_index.append(index)
    return create_index


def get_mapping(url, _auth, index):
    source_url = url + "/" + index
    index_response = requests.get(source_url, auth=_auth)
    if index_response.status_code != 200:
        print("*** get ElasticSearch message failed. resp statusCode:" + str(
            index_response.status_code) + " response is " + index_response.text)
        return "[failure] --- index is not exist in destination es. ---"
    mapping = index_response.json()
    return mapping


def get_index_total(url, index, es_auth):
    stats_url = url + "/" + index + "/_stats"
    index_response = requests.get(stats_url, auth=es_auth, verify=False)
    if index_response.status_code != 200:
        print("*** get ElasticSearch stats message failed. resp statusCode:" + str(
            index_response.status_code) + " response is " + index_response.text)
        return 0
    return index_response.json()


def get_indices_stats(url, es_auth):
    endpoint = url + "/_cat/indices"
    indicesResult = requests.get(endpoint, auth=es_auth)
    indicesList = indicesResult.text.split("\n")
    indexList = []
    for indices in indicesList:
        if indices.strip() == "":
            continue
        indexList.append(indices.split()[2])
    return indexList


def loadConfig(argv):
    if argv is None or len(argv) != 2:
        config_yaml = "migrateConfig.yaml"
    else:
        config_yaml = argv[1]
    config_file = open(config_yaml)
    # python3
    # return yaml.load(config_file, Loader=yaml.FullLoader)
    return yaml.load(config_file)


def main(argv):
    requests.packages.urllib3.disable_warnings()
    print("begin to compare indices!")
    config = loadConfig(argv)
    src_clusters = config.keys()
    print("begin to process cluster name :")
    for name in src_clusters:
        print(name)
    print("cluster count:" + str(src_clusters.__len__()))
    for name, value in config.items():
        printDividingLine()
        source = value["src_ip"]
        source_user = value["src_username"]
        source_passwd = value["src_password"]
        source_auth = None
        if source_user != "":
            source_auth = (source_user, source_passwd)
        dest = value["dest_ip"]
        dest_user = value["dest_username"]
        dest_passwd = value["dest_password"]
        dest_auth = None
        if dest_user != "":
            dest_auth = (dest_user, dest_passwd)
        cluster_name = name
        if "clustername" in value:
            cluster_name = value["clustername"]
        print("start to process cluster :" + cluster_name)
        # get all indices
        all_source_index = get_indices(source, source_auth)
        all_dest_index = get_indices(dest, dest_auth)
        if "only_compare_index" in value and value["only_compare_index"]:
            print("[success] only compare mapping, not compare index count.")
            continue
        for index in all_source_index:
            index_total = get_index_total(value["src_ip"], index, source_auth)
            src_total = index_total["_all"]["primaries"]["docs"]["count"]
            src_size = int(index_total["_all"]["primaries"]["store"]["size_in_bytes"]) / 1024 / 1024
            dest_index = get_index_total(value["dest_ip"], index, dest_auth)
            if dest_index == 0:
                print('[failure] not found, index: %-20s, source total: %-10s size %6sM'
                      % (str(index), str(src_total), src_size))
                continue
            dest_total = dest_index["_all"]["primaries"]["docs"]["count"]
            if src_total != dest_total:
                print('[failure] not consistent, '
                      'index: %-20s, source total: %-10s size %6sM destination total: %-10s '
                      % (str(index), str(src_total), src_size, str(dest_total)))
                continue
            print('[success] compare index total equal : index : %-20s, total: %-20s '
                  % (str(index), str(dest_total)))


if __name__ == '__main__':
    main(sys.argv)
- Run the following commands to migrate the index template and index structure of the Elasticsearch cluster:
python migrateTemplate.py
python migrateMapping.py
Creating a Logstash Cluster
After the migration environment in the ECS is prepared, create a Logstash cluster in CSS for data migration.
- Log in to the CSS management console.
- In the navigation pane on the left, choose .
- Click Create Cluster in the upper right corner. The Create Cluster page is displayed.
- On the cluster creation page, configure the cluster as prompted.
Table 3 describes the key parameters. Use the default values for other parameters. For details about how to create a cluster, see Creating a Logstash Cluster.
Table 3 Key Logstash cluster configurations

| Parameter | Description |
|---|---|
| Billing Mode | Select Pay-per-use. In this billing mode, you are billed by actual duration of use, with a billing cycle of one hour. For example, 58 minutes of usage is rounded up to an hour and billed. |
| Cluster Type | Select Logstash. |
| Version | Select 7.10.0. |
| Name | Cluster name, which contains 4 to 32 characters. Only letters, digits, hyphens (-), and underscores (_) are allowed, and the name must start with a letter. Logstash-ES is used as an example. |
| VPC | A VPC is a secure, isolated logical network environment. Select the same VPC as the destination Elasticsearch cluster. |
| Subnet | A subnet provides dedicated network resources that are isolated from other networks, improving network security. Select the destination subnet. You can view the names and IDs of the existing subnets in the VPC on the VPC management console. |
| Security Group | A security group is a collection of access control rules for ECSs that have the same security protection requirements and are mutually trusted within a VPC. Select the same security group as the destination Elasticsearch cluster. |
- Click Next: Advanced Settings and retain the default settings.
- Click Next: Confirm Configuration. Confirm the settings, and click Create Now to create the cluster.
- Click Back to Cluster List to switch to the Clusters page. The cluster you created is now in the cluster list and its status is Creating. If the cluster is successfully created, its status changes to Available.
Verifying Connectivity Between Clusters
Before starting a migration task, verify the network connectivity between Logstash and the source Elasticsearch cluster.
- In the Logstash cluster list, select the created Logstash cluster Logstash-ES and click Configuration Center in the Operation column.
- On the Configuration Center page, click Test Connectivity.
- In the dialog box that is displayed, enter the IP address and port number of the source cluster and click Test.
Figure 4 Testing connectivity
If Available is displayed, the network is connected. If the network is disconnected, configure routes for the Logstash cluster to connect the clusters. For details, see Configuring Routes for a Logstash Cluster.
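Under the hood, this test comes down to whether the Logstash nodes can open a TCP connection to the source cluster's address and port. A minimal local sketch of such a check (generic Python, not the CSS implementation):

```python
import socket

def can_connect(host, port, timeout=5.0):
    # True if a TCP connection to host:port succeeds within the timeout.
    try:
        conn = socket.create_connection((host, port), timeout=timeout)
        conn.close()
        return True
    except OSError:
        return False
```

For example, can_connect("10.234.73.128", 9200) mirrors what the Test Connectivity dialog verifies for the source cluster's access address and port.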
Using Logstash to Perform Full Data Migration
At the initial stage of cluster migration, or in scenarios where guaranteeing data integrity takes top priority, it is recommended to use Logstash for full data migration. This approach migrates the entire Elasticsearch cluster's data in one go.
- Log in to the CSS management console.
- In the navigation pane on the left, choose .
- In the Logstash cluster list, select the created Logstash cluster Logstash-ES and click Configuration Center in the Operation column.
- On the Configuration Center page, click Create in the upper right corner. On the Create Configuration File page, edit the configuration file for the full Elasticsearch cluster migration.
- Selecting a cluster template: Expand the system template list, select elasticsearch, and click Apply in the Operation column.
- Setting the name of the configuration file: Set Name, for example, es-es-all.
- Editing the configuration file: Enter the migration configuration plan of the Elasticsearch cluster in Configuration File Content. The following is an example of the configuration file: For details about how to obtain the cluster information, see Obtaining Elasticsearch Cluster Information.
input {
    elasticsearch {
        # Access address of the source Elasticsearch cluster. Do not add a protocol prefix; adding https:// causes an error.
        hosts => ["xx.xx.xx.xx:9200", "xx.xx.xx.xx:9200"]
        # Username and password for accessing the source cluster. Not needed for a non-security-mode cluster.
        # user => "css_logstash"
        # password => "*****"
        # Indexes to be migrated. Separate multiple indexes with commas (,). Wildcard characters are supported, for example, index*.
        index => "*_202102"
        docinfo => true
        slices => 3
        size => 3000
        # If the source cluster is accessed through HTTPS, configure the following:
        # HTTPS access certificate of the cluster. Retain the following for a CSS cluster:
        # ca_file => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs"  # for 7.10.0
        # Whether to enable HTTPS communication. Set this parameter to true for a cluster accessed through HTTPS.
        # ssl => true
    }
}
# Remove the specified fields added by Logstash.
filter {
    mutate {
        remove_field => ["@version"]
    }
}
output {
    elasticsearch {
        # Access address of the destination Elasticsearch cluster.
        hosts => ["xx.xx.xx.xx:9200", "xx.xx.xx.xx:9200"]
        # Username and password for accessing the destination cluster. Not needed for a non-security-mode cluster.
        # user => "css_logstash"
        # password => "*****"
        # Index name in the destination cluster. The following setting keeps the same index name as on the source end.
        index => "%{[@metadata][_index]}"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
        # If the destination cluster is accessed through HTTPS, configure the following:
        # HTTPS access certificate of the cluster. Retain the following for a CSS cluster:
        # cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs"  # for 7.10.0
        # Whether to enable HTTPS communication. Set this parameter to true for a cluster accessed through HTTPS.
        # ssl => true
        # Whether to verify the server certificate. Set this parameter to false to skip verification.
        # ssl_certificate_verification => false
    }
}
Table 4 Configuration items for full migration

input:
- hosts: Access address of the source cluster. If the cluster has multiple access nodes, separate them with commas (,).
- user: Username for accessing the cluster. For a non-security-mode cluster, comment out this parameter with a number sign (#).
- password: Password for accessing the cluster. For a non-security-mode cluster, comment out this parameter with a number sign (#).
- index: Source indexes to be fully migrated. Use commas (,) to separate multiple indexes. Wildcard characters are supported, for example, index*.
- docinfo: Whether to include document metadata (index, type, and ID) in each event. The value must be true so that the destination cluster can reuse the source document IDs.
- slices: Number of slices. In some cases, overall throughput can be improved by consuming multiple distinct slices of a query simultaneously using sliced scrolls. A value between 2 and 8 is recommended.
- size: Maximum number of hits returned by each query.

output:
- hosts: Access address of the destination cluster. If the cluster has multiple nodes, separate them with commas (,).
- user: Username for accessing the cluster. For a non-security-mode cluster, comment out this parameter with a number sign (#).
- password: Password for accessing the cluster. For a non-security-mode cluster, comment out this parameter with a number sign (#).
- index: Name of the index in the destination cluster. It can be modified and extended, for example, Logstash-%{+yyyy.MM.dd}.
- document_type: Document type in the destination cluster. Keep it the same as in the source cluster.
- document_id: Document ID in the index. It is advisable to keep document IDs consistent between the source and destination clusters. To have document IDs generated automatically, comment out this parameter with a number sign (#).
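The index values in Table 4 use Logstash sprintf references. To clarify the two naming strategies mentioned there, here is a small illustrative Python sketch (hypothetical helper names, not part of Logstash itself) of how each destination index name is derived:

```python
from datetime import datetime, timezone

def same_as_source(event_metadata):
    """Mimics index => "%{[@metadata][_index]}": reuse the source index name."""
    return event_metadata["_index"]

def daily_index(ts, prefix="Logstash"):
    """Mimics index => "Logstash-%{+yyyy.MM.dd}": one index per day (UTC assumed)."""
    return f"{prefix}-{ts.strftime('%Y.%m.%d')}"

# Example: a document read from source index "orders_202102" (sample metadata)
meta = {"_index": "orders_202102", "_type": "_doc", "_id": "1"}
print(same_as_source(meta))                                    # orders_202102
print(daily_index(datetime(2021, 2, 5, tzinfo=timezone.utc)))  # Logstash-2021.02.05
```

With the first strategy, documents land in an index with the same name as on the source end; with the second, they are grouped into per-day indexes regardless of origin.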
- Click Next to configure Logstash pipeline parameters.
In this example, retain the default values for the Logstash pipeline parameters.
- Click OK.
On the Configuration Center page, you can check the created configuration file. If its status changes to Available, it has been successfully created.
- Execute the full migration task.
- In the configuration file list, select configuration file es-es-all and click Start in the upper left corner.
- In the Start Logstash dialog box, select Keepalive if necessary. In this example, Keepalive is not enabled.
When Keepalive is enabled, a daemon process is configured on each node. If the Logstash service fails, the daemon process attempts to rectify the fault and restart the service. Enable Keepalive for long-running services. Do not enable it for one-off, short-term tasks, or the restarted migration task may fail because there is no more source data to read.
- Click OK to start the configuration file and hence the Logstash full migration task.
You can view the started configuration file in the pipeline list.
- After data migration is complete, check data consistency.
- Method 1: Use PuTTY to log in to the VM used for the migration and run the python checkIndices.py command to compare the data.
- Method 2: Run the GET _cat/indices command on the Kibana console of the source and destination clusters, separately, to check whether their indexes are consistent.
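As a rough complement to the two methods above, per-index document counts from GET _cat/indices?format=json on both clusters can be compared programmatically. The following is an illustrative sketch, not the checkIndices.py script itself (whose contents are not shown in this document); the cluster addresses are placeholders, and security-mode clusters would additionally need authentication headers:

```python
import json
from urllib.request import urlopen  # add an Authorization header for security-mode clusters

def doc_counts(cat_indices_json):
    """Map index name -> docs.count from a _cat/indices?format=json response."""
    return {row["index"]: int(row["docs.count"]) for row in cat_indices_json}

def diff_counts(src, dst):
    """Return {index: (source_count, destination_count)} for every source index
    whose document count does not match the destination (missing -> None)."""
    mismatches = {}
    for name, count in src.items():
        if dst.get(name) != count:
            mismatches[name] = (count, dst.get(name))
    return mismatches

# Fetching from live clusters would look like this (placeholder addresses):
# src = doc_counts(json.load(urlopen("http://xx.xx.xx.xx:9200/_cat/indices?format=json")))
# dst = doc_counts(json.load(urlopen("http://yy.yy.yy.yy:9200/_cat/indices?format=json")))

# Offline example with sample responses:
src = doc_counts([{"index": "a_202102", "docs.count": "100"}])
dst = doc_counts([{"index": "a_202102", "docs.count": "90"}])
print(diff_counts(src, dst))  # {'a_202102': (100, 90)}
```

An empty result suggests (but does not prove) that the migration is complete; matching counts do not guarantee identical document contents.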
Using Logstash to Incrementally Migrate Cluster Data
In scenarios that require continuous or real-time data synchronization, incremental migration using Logstash is recommended. This method configures an incremental query in Logstash so that only index data containing the incremental field is migrated.
- Log in to the CSS management console.
- In the navigation pane on the left, choose Clusters > Logstash.
- In the Logstash cluster list, select the created Logstash cluster Logstash-ES and click Configuration Center in the Operation column.
- On the Configuration Center page, click Create in the upper right corner. On the Create Configuration File page, edit the configuration file for the incremental migration.
- Selecting a cluster template: Expand the system template list, select elasticsearch, and click Apply in the Operation column.
- Setting the name of the configuration file: Set Name, for example, es-es-inc.
- Editing the configuration file: Enter the migration configuration plan of the Elasticsearch cluster in Configuration File Content. The following is an example of the configuration file:
The incremental migration configuration varies according to the index and must be provided based on the index analysis. For details about how to obtain the cluster information, see Obtaining Elasticsearch Cluster Information.
input {
    elasticsearch {
        # Access address of the source Elasticsearch cluster. Do not add a protocol prefix; specifying https:// causes an error.
        hosts => ["xx.xx.xx.xx:9200"]
        # Username and password for accessing the source cluster. They are not required for a non-security-mode cluster.
        user => "css_logstash"
        password => "******"
        # Indexes for incremental migration.
        index => "*_202102"
        # Query statement for incremental migration.
        query => '{"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}'
        docinfo => true
        size => 1000
        # If the source cluster is accessed through HTTPS, configure the following:
        # HTTPS access certificate of the cluster. Retain the following path for a CSS cluster:
        # ca_file => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs" # for 7.10.0
        # Whether to enable HTTPS communication. Set this parameter to true for a cluster accessed through HTTPS.
        # ssl => true
    }
}
filter {
    mutate {
        remove_field => ["@timestamp", "@version"]
    }
}
output {
    elasticsearch {
        # Access address of the destination cluster.
        hosts => ["xx.xx.xx.xx:9200", "xx.xx.xx.xx:9200"]
        # Username and password for accessing the destination cluster. They are not required for a non-security-mode cluster.
        # user => "admin"
        # password => "******"
        # Index name in the destination cluster. The following configuration keeps the index name the same as in the source cluster.
        index => "%{[@metadata][_index]}"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
        # If the destination cluster is accessed through HTTPS, configure the following:
        # HTTPS access certificate of the cluster. Retain the following path for a CSS cluster:
        # cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs" # for 7.10.0
        # Whether to enable HTTPS communication. Set this parameter to true for a cluster accessed through HTTPS.
        # ssl => true
        # Whether to verify the server's Elasticsearch certificate. Set this parameter to false to skip certificate verification.
        # ssl_certificate_verification => false
    }
    # stdout { codec => rubydebug { metadata => true } }
}
Table 5 Incremental migration configuration items

- hosts: Access addresses of the source and destination clusters. If a cluster has multiple nodes, enter all of their access addresses.
- user: Username for accessing the cluster. For a non-security-mode cluster, comment out this parameter with a number sign (#).
- password: Password for accessing the cluster. For a non-security-mode cluster, comment out this parameter with a number sign (#).
- index: Indexes to be incrementally migrated. One configuration file supports the incremental migration of only one index.
- query: Identifier of incremental data, typically an Elasticsearch DSL statement that must be analyzed in advance. postsDate is the time field in the service. For example:
  {"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}
  This query migrates data added after 2021-05-25 00:00:00. For each subsequent incremental migration, update the from value. If the source indexes store the time field in timestamp format, convert the date to a timestamp here. Verify the validity of this query in advance.
- scroll: If the source cluster holds a large amount of data, you can use the scroll function to fetch data in batches and prevent Logstash memory overflow. The default value is 1m. Do not set an excessively long interval; otherwise, data may be lost.
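Because the from value in the query must be advanced on every incremental run, and converted to epoch milliseconds when the source index stores the time field as a timestamp, it can help to generate the query string rather than edit it by hand. The following is a sketch under these assumptions: the time field is named postsDate (as in the example above) and the cutoff date is interpreted as UTC.

```python
import json
from datetime import datetime, timezone

def incremental_query(field, cutoff, as_epoch_millis=False):
    """Build the Logstash `query` value that migrates data newer than `cutoff`.

    cutoff: "YYYY-MM-DD HH:MM:SS" string, interpreted as UTC (an assumption;
    adjust the timezone to match how the service writes the field).
    """
    ts = datetime.strptime(cutoff, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    value = int(ts.timestamp() * 1000) if as_epoch_millis else cutoff
    return json.dumps(
        {"query": {"bool": {"should": [{"range": {field: {"from": value}}}]}}},
        separators=(",", ":"),
    )

print(incremental_query("postsDate", "2021-05-25 00:00:00"))
# {"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}
print(incremental_query("postsDate", "2021-05-25 00:00:00", as_epoch_millis=True))
# {"query":{"bool":{"should":[{"range":{"postsDate":{"from":1621900800000}}}]}}}
```

The generated string can be pasted into the query parameter of the input section before each incremental run.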
- Execute the incremental migration task.
- In the configuration file list, select configuration file es-es-inc and click Start in the upper left corner.
- In the Start Logstash dialog box, select Keepalive if necessary. In this example, Keepalive is not enabled.
When Keepalive is enabled, a daemon process is configured on each node. If the Logstash service fails, the daemon process attempts to rectify the fault and restart the service. Enable Keepalive for long-running services. Do not enable it for one-off, short-term tasks, or the restarted migration task may fail because there is no more source data to read.
- Click OK to start the configuration file and hence the Logstash incremental migration task.
You can view the started configuration file in the pipeline list.
- After data migration is complete, check data consistency.
- Method 1: Use PuTTY to log in to the VM used for the migration and run the python checkIndices.py command to compare the data.
- Method 2: Run the GET _cat/indices command on the Kibana console of the source and destination clusters, separately, to check whether their indexes are consistent.
Deleting a Logstash Cluster
After the migration is complete, release the Logstash cluster in a timely manner to save resources and avoid unnecessary fees.
- Log in to the CSS management console.
- In the navigation pane on the left, choose Clusters > Logstash.
- In the Logstash cluster list, select the created Logstash cluster Logstash-ES and click More > Delete in the Operation column. In the confirmation dialog box, manually type in DELETE, and click OK.