通过华为云Logstash实现Elasticsearch集群间数据迁移
使用华为云CSS服务的Logstash集群可以实现Elasticsearch集群间的数据迁移。
应用场景
华为云Logstash是一款全托管的数据接入处理服务,兼容开源Logstash的能力,支持用于Elasticsearch集群间数据迁移。
通过华为云Logstash可以实现华为云Elasticsearch、自建Elasticsearch或第三方Elasticsearch迁移至华为云Elasticsearch,该方案常用于以下场景:
- 跨版本迁移:利用Logstash的兼容性和灵活性,实现不同版本间的数据迁移,确保数据在新版本中的可用性和一致性。适用于Elasticsearch集群版本跨度较大的迁移场景,例如从6.X版本迁移至7.X版本。
- 集群合并:使用Logstash进行数据迁移,将多个Elasticsearch集群的数据整合到一个Elasticsearch集群中,实现多个Elasticsearch数据的统一管理和分析。
- 服务迁移上云:将自建的Elasticsearch服务迁移到云平台,以利用云服务的可扩展性、维护简便性和成本效益。
- 变更服务提供商:如果企业当前使用的是第三方Elasticsearch服务,但出于成本、性能或其他战略考虑,希望更换服务提供商至华为云。
方案架构
通过华为云Logstash实现Elasticsearch集群间数据迁移的迁移流程如图1所示。
- 输入(Input):华为云Logstash接收来自华为云Elasticsearch、自建Elasticsearch或第三方Elasticsearch的数据。
华为云Elasticsearch、自建Elasticsearch或第三方Elasticsearch数据迁移到华为云Elasticsearch的操作步骤相同,只是获取源集群的访问地址有差异,具体请参见获取Elasticsearch集群信息。
- 过滤(Filter):华为云Logstash对数据进行清洗和转换。
- 输出(Output):华为云Logstash将数据输出到目标设备,如华为云Elasticsearch。
- 全量数据迁移:使用Logstash进行全量数据迁移,适用于迁移初期或需要确保数据完整性的场景。
- 增量数据迁移:通过Logstash配置增量查询,可以只迁移有增量字段的索引数据。此方法适用于需要持续同步数据或对数据实时性有较高要求的场景。
方案优势
- 高版本兼容性:适用于不同版本的Elasticsearch集群迁移。
- 高效的数据处理能力:Logstash支持批量读写操作,可以大幅度提高数据迁移的效率。
- 并发同步技术:利用slice并发同步技术,可以提高数据迁移的速度和性能,尤其是在处理大规模数据时。
- 配置简单:华为云Logstash的配置相对简单直观,通过配置文件即可实现数据的输入、处理和输出。
- 强大的数据处理功能:Logstash内置了丰富的过滤器,可以在迁移过程中对数据进行清洗、转换和丰富。
- 灵活的迁移策略:根据业务需求,可以灵活选择全量迁移或增量迁移,优化存储使用和迁移时间。
性能影响
- 对于资源消耗较高的集群,建议通过调整size参数来减缓迁移速率,或者选择在业务流量低谷时段进行迁移操作,以减轻对集群资源的影响。
- 对于资源消耗较低的集群,在迁移时可以采用默认参数配置,建议同时监控源集群的性能负载,并根据实际情况适时调整size和slice参数,以优化迁移效率和资源使用。
约束限制
集群迁移过程中,源集群的索引数据不能增删改,否则会导致迁移后的源集群数据和目标集群数据内容不一致。
前提条件
- 源Elasticsearch集群和目标Elasticsearch集群处于可用状态。
- 集群间需要保证网络连通。
- 如果源集群、Logstash和目标集群在不同VPC,则需要先打通VPC网络建立对等连接。具体操作请参见对等连接简介。
- 如果是自建Elasticsearch集群迁移至华为云,则可以通过给自建Elasticsearch集群配置公网访问打通网络。
- 如果是第三方Elasticsearch集群迁移至华为云,则需要建立企业内部数据中心到华为云的VPN通道或专线。
- 确认集群的索引已开启“_source”。
集群索引的“_source”默认是开启的。执行命令GET {index}/_search,当返回的索引信息里有“_source”信息时表示已开启。
操作步骤
- 获取Elasticsearch集群信息
- 准备迁移环境:创建ECS并准备必要的迁移工具和脚本。
- 创建Logstash集群:创建一个Logstash集群用于迁移数据。
- 验证集群间的网络连通性:验证Logstash和源Elasticsearch集群的连通性。
- 使用Logstash迁移集群
- 在集群迁移初期或需要确保数据完整性的场景,推荐使用Logstash全量迁移集群数据。
- 在需要持续同步数据或对数据实时性有较高要求的场景,推荐使用Logstash增量迁移集群数据。
- 释放Logstash集群:当集群迁移完成后,请及时释放Logstash集群。
获取Elasticsearch集群信息
在迁移集群前,需要先获取必备的集群信息,用于配置迁移任务。
集群来源 |
要获取的信息 |
获取方式 |
|
---|---|---|---|
源集群 |
华为云Elasticsearch集群 |
|
|
自建Elasticsearch集群 |
|
联系服务管理员获取。 |
|
第三方Elasticsearch集群 |
|
联系服务管理员获取。 |
|
目标集群 |
华为云Elasticsearch集群 |
|
|
源集群的来源不同,获取信息的方式不同,此处仅介绍如何获取华为云Elasticsearch集群的信息。
准备迁移环境
创建ECS并准备必要的迁移工具和脚本。
- 创建弹性云服务器ECS,用于迁移源集群的元数据。
- 创建弹性云服务器ECS,ECS的操作系统选择CentOS,规格选择2U4G,且和CSS服务的集群在同一个虚拟私有云和安全组中。
- 测试ECS和源集群、目标集群的连通性。
在ECS执行命令curl http:// {ip}:{port}测试连通性,当返回200时,则表示已经连通。
IP是源集群和目标集群访问地址;port是端口号,默认是9200,请以集群实际端口号为准。
curl http://10.234.73.128:9200 # 输入实际的IP地址,此处仅以非安全集群举例。 { "name" : "es_cluster_migrate-ess-esn-1-1", "cluster_name" : "es_cluster_migrate", "cluster_uuid" : "1VbP7-39QNOx_R-llXKKtA", "version" : { "number" : "6.5.4", "build_flavor" : "default", "build_type" : "tar", "build_hash" : "d2ef93d", "build_date" : "2018-12-17T21:17:40.758843Z", "build_snapshot" : false, "lucene_version" : "7.5.0", "minimum_wire_compatibility_version" : "5.6.0", "minimum_index_compatibility_version" : "5.0.0" }, "Tagline" : "You Know, for Search" }
- 准备工具和软件,判断ECS是否可以联网,选择不同的安装方式。
- 在线安装工具和软件。
- 执行yum install python2安装python2。
[root@ecs opt]# yum install python2
- 执行yum install python-pip安装pip。
[root@ecs opt]# yum install python-pip
- 执行pip install pyyaml安装yaml依赖。
- 执行pip install requests安装requests依赖。
- 执行yum install python2安装python2。
- 离线安装工具和软件。
- 下载python2安装包,下载地址https://www.python.org/downloads/release/python-2718/。选择源码下载安装。
图3 下载python2安装包
- 使用WinSCP工具上传Python安装包到opt目录下,安装python。
# 解压python压缩包 [root@ecs-52bc opt]# tar -xvf Python-2.7.18.tgz Python-2.7.18/Modules/zlib/crc32.c Python-2.7.18/Modules/zlib/gzlib.c Python-2.7.18/Modules/zlib/inffast.c Python-2.7.18/Modules/zlib/example.c Python-2.7.18/Modules/python.c Python-2.7.18/Modules/nismodule.c Python-2.7.18/Modules/Setup.config.in … # 解压完成进入目录 [root@ecs-52bc opt]# cd Python-2.7.18 # 检查文件配置安装路径 [root@ecs-52bc Python-2.7.18]# ./configure --prefix=/usr/local/python2 … checking for build directories... checking for --with-computed-gotos... no value specified checking whether gcc -pthread supports computed gotos... yes done checking for ensurepip... no configure: creating ./config.status config.status: creating Makefile.pre config.status: creating Modules/Setup.config config.status: creating Misc/python.pc config.status: creating Modules/ld_so_aix config.status: creating pyconfig.h creating Modules/Setup creating Modules/Setup.local creating Makefile # 编译python [root@ecs-52bc Python-2.7.18]# make # 安装python [root@ecs-52bc Python-2.7.18]# make install
- 安装完成检查,检查python安装结果。
# 检查python版本 [root@ecs-52bc Python-2.7.18]# python --version Python 2.7.5 # 检查pip版本 [root@ecs-52bc Python-2.7.18]# pip --version pip 7.1.2 from /usr/lib/python2.7/site-packages/pip-7.1.2-py2.7.egg (python 2.7) [root@ecs-52bc Python-2.7.18]#
- 下载python2安装包,下载地址https://www.python.org/downloads/release/python-2718/。选择源码下载安装。
- 准备Elasticsearch集群的索引迁移脚本。
- 执行“vi migrateConfig.yaml”配置文件,输入并基于实际信息修改以下内容,执行wq保存为Logstash迁移脚本。集群信息的获取方式请参见获取Elasticsearch集群信息。
es_cluster_new: # 源集群的名称 clustername: es_cluster_new # 源Elasticsearch集群的访问地址,加上“http://”。 src_ip: http://x.x.x.x:9200 # 访问源Elasticsearch集群的用户名和密码,如果为非安全集群则设置为""。 src_username: "" src_password: "" # 目标Elasticsearch集群的访问地址,加上“http://”。 dest_ip: http://x.x.x.x:9200 # 访问目标Elasticsearch集群的用户名和密码,如果为非安全集群则设置为""。 dest_username: "" dest_password: "" # “only_mapping”可以不定义,默认值为false,需要搭配“migrateMapping.py”使用,表示是否只处理这个文件中mapping地址的索引。当设置成true时,则只迁移源集群中和下面mapping的key一致的索引数据;当设置成false时,则迁移源集群中除“.kibana”和“.*”之外的所有索引数据。 # 迁移过程中会将索引名称与下面的mapping匹配,如果匹配一致,则使用mapping的value作为目标集群的索引名称;如果匹配不到,则使用源集群原始的索引名称。 only_mapping: false # 设置要迁移的索引,key为源集群的索引名字,value为目标集群的索引名字。 mapping: test_index_1: test_index_1 # “only_compare_index”可以不定义,默认值为false,需要搭配“checkIndices.py”使用,当设置为false会比较所有的索引和文档数量,当设置为true只比较索引数量。 only_compare_index: false
- 执行vi migrateTemplate.py命令,直接复制输入以下内容无需修改,执行wq保存为索引模板迁移脚本。
# -*- coding:UTF-8 -*- import sys import yaml import requests import json import os def printDividingLine(): print("<=============================================================>") def loadConfig(argv): if argv is None or len(argv) != 2: config_yaml = "migrateConfig.yaml" else: config_yaml = argv[1] config_file = open(config_yaml) # config = yaml.load(config_file, Loader=yaml.FullLoader) return yaml.load(config_file) def put_template_to_target(url, template, cluster, template_name, dest_auth=None): headers = {'Content-Type': 'application/json'} create_resp = requests.put(url, headers=headers, data=json.dumps(template), auth=dest_auth, verify=False) if not os.path.exists("templateLogs"): os.makedirs("templateLogs") if create_resp.status_code != 200: print( "create template " + url + " failed with response: " + str( create_resp) + ", source template is " + template_name) print(create_resp.text) filename = "templateLogs/" + str(cluster) + "#" + template_name with open(filename + ".json", "w") as f: json.dump(template, f) return False else: return True def main(argv): requests.packages.urllib3.disable_warnings() print("begin to migration template!") config = loadConfig(argv) src_clusters = config.keys() print("process cluster name:") for name in src_clusters: print(name) print("cluster total number:" + str(src_clusters.__len__())) for name, value in config.items(): printDividingLine() source_user = value["src_username"] source_passwd = value["src_password"] source_auth = None if source_user != "": source_auth = (source_user, source_passwd) dest_user = value["dest_username"] dest_passwd = value["dest_password"] dest_auth = None if dest_user != "": dest_auth = (dest_user, dest_passwd) print("start to process cluster name:" + name) source_url = value["src_ip"] + "/_template" response = requests.get(source_url, auth=source_auth, verify=False) if response.status_code != 200: print("*** get all template failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) continue all_template = response.json() migrate_itemplate = [] for template in all_template.keys(): if template.startswith(".") or template == "logstash": continue if "index_patterns" in all_template[template]: for t in all_template[template]["index_patterns"]: # if "kibana" in template: if t.startswith("."): continue migrate_itemplate.append(template) for template in migrate_itemplate: dest_index_url = value["dest_ip"] + "/_template/" + template result = put_template_to_target(dest_index_url, all_template[template], name, template, dest_auth) if result is True: print('[success] delete success, cluster: %-10s, template %-10s ' % (str(name), str(template))) else: print('[failure] delete failure, cluster: %-10s, template %-10s ' % (str(name), str(template))) if __name__ == '__main__': main(sys.argv)
- 执行vi migrateMapping.py命令,直接复制输入以下内容无需修改,执行wq保存为索引结构迁移脚本。
# -*- coding:UTF-8 -*- import sys import yaml import requests import re import json import os def printDividingLine(): print("<=============================================================>") def loadConfig(argv): if argv is None or len(argv) != 2: config_yaml = "migrateConfig.yaml" else: config_yaml = argv[1] config_file = open(config_yaml) # config = yaml.load(config_file, Loader=yaml.FullLoader) return yaml.load(config_file) def get_cluster_version(url, auth=None): response = requests.get(url, auth=auth) if response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) return False cluster = response.json() version = cluster["version"]["number"] return True def process_mapping(index_mapping, dest_index): # remove unnecessary keys del index_mapping["settings"]["index"]["provided_name"] del index_mapping["settings"]["index"]["uuid"] del index_mapping["settings"]["index"]["creation_date"] del index_mapping["settings"]["index"]["version"] if "lifecycle" in index_mapping["settings"]["index"]: del index_mapping["settings"]["index"]["lifecycle"] # check alias aliases = index_mapping["aliases"] for alias in list(aliases.keys()): if alias == dest_index: print( "source index " + dest_index + " alias " + alias + " is the same as dest_index name, will remove this alias.") del index_mapping["aliases"][alias] # if index_mapping["settings"]["index"].has_key("lifecycle"): if "lifecycle" in index_mapping["settings"]["index"]: lifecycle = index_mapping["settings"]["index"]["lifecycle"] opendistro = {"opendistro": {"index_state_management": {"policy_id": lifecycle["name"], "rollover_alias": lifecycle["rollover_alias"]}}} index_mapping["settings"].update(opendistro) # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"] del index_mapping["settings"]["index"]["lifecycle"] # replace synonyms_path if "analysis" in index_mapping["settings"]["index"]: analysis = index_mapping["settings"]["index"]["analysis"] if "filter" in analysis: filter = analysis["filter"] if "my_synonym_filter" in filter: my_synonym_filter = filter["my_synonym_filter"] if "synonyms_path" in my_synonym_filter: index_mapping["settings"]["index"]["analysis"]["filter"]["my_synonym_filter"][ "synonyms_path"] = "/rds/datastore/elasticsearch/v7.10.2/package/elasticsearch-7.10.2/plugins/analysis-dynamic-synonym/config/synonyms.txt" return index_mapping def getAlias(source, source_auth): # get all indices response = requests.get(source + "/_alias", auth=source_auth) if response.status_code != 200: print("*** get all index failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) exit() all_index = response.json() system_index = [] create_index = [] for index in list(all_index.keys()): if (index.startswith(".")): system_index.append(index) else: create_index.append(index) return system_index, create_index def put_mapping_to_target(url, mapping, cluster, source_index, dest_auth=None): headers = {'Content-Type': 'application/json'} create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth, verify=False) if not os.path.exists("mappingLogs"): os.makedirs("mappingLogs") if create_resp.status_code != 200: print( "create index " + url + " failed with response: " + str(create_resp) + ", source index is " + str(source_index)) print(create_resp.text) filename = "mappingLogs/" + str(cluster) + "#" + str(source_index) with open(filename + ".json", "w") as f: json.dump(mapping, f) return False else: return True def main(argv): requests.packages.urllib3.disable_warnings() print("begin to migrate index mapping!") config = loadConfig(argv) src_clusters = config.keys() print("begin to process cluster name :") for name in src_clusters: print(name) print("cluster count:" + str(src_clusters.__len__())) for name, value in config.items(): printDividingLine() source = value["src_ip"] source_user = value["src_username"] source_passwd = value["src_password"] source_auth = None if source_user != "": source_auth = (source_user, source_passwd) dest = value["dest_ip"] dest_user = value["dest_username"] dest_passwd = value["dest_password"] dest_auth = None if dest_user != "": dest_auth = (dest_user, dest_passwd) print("start to process cluster: " + name) # only deal with mapping list if 'only_mapping' in value and value["only_mapping"]: for source_index, dest_index in value["mapping"].iteritems(): print("start to process source index" + source_index + ", target index: " + dest_index) source_url = source + "/" + source_index response = requests.get(source_url, auth=source_auth) if response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) continue mapping = response.json() index_mapping = process_mapping(mapping[source_index], dest_index) dest_url = dest + "/" + dest_index result = put_mapping_to_target(dest_url, index_mapping, name, source_index, dest_auth) if result is False: print("cluster name:" + name + ", " + source_index + ":failure") continue print("cluster name:" + name + ", " + source_index + ":success") else: # get all indices system_index, create_index = getAlias(source, source_auth) success_index = 0 for index in create_index: source_url = source + "/" + index index_response = requests.get(source_url, auth=source_auth) if index_response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( index_response.status_code) + " response is " + index_response.text) continue mapping = index_response.json() dest_index = index if 'mapping' in value: if index in value["mapping"].keys(): dest_index = value["mapping"][index] index_mapping = process_mapping(mapping[index], dest_index) dest_url = dest + "/" + dest_index result = put_mapping_to_target(dest_url, index_mapping, name, index, dest_auth) if result is False: print("[failure]: migrate mapping cluster name: " + name + ", " + index) continue print("[success]: migrate mapping cluster name: " + name + ", " + index) success_index = success_index + 1 print("create index mapping success total: " + str(success_index)) if __name__ == '__main__': main(sys.argv)
- 执行vi checkIndices.py命令,直接复制输入以下内容无需修改,执行wq保存为索引数据对比脚本。
# -*- coding:UTF-8 -*- import sys import yaml import requests import re import json import os def printDividingLine(): print("<=============================================================>") def get_cluster_version(url, auth=None): response = requests.get(url, auth=auth) if response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) return False cluster = response.json() version = cluster["version"]["number"] return True # get all indices def get_indices(url, source_auth): response = requests.get(url + "/_alias", auth=source_auth) if response.status_code != 200: print("*** get all index failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) exit() all_index = response.json() system_index = [] create_index = [] for index in list(all_index.keys()): if (index.startswith(".")): system_index.append(index) else: create_index.append(index) return create_index def get_mapping(url, _auth, index): source_url = url + "/" + index index_response = requests.get(source_url, auth=_auth) if index_response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( index_response.status_code) + " response is " + index_response.text) return "[failure] --- index is not exist in destination es. ---" mapping = index_response.json() return mapping def get_index_total(url, index, es_auth): stats_url = url + "/" + index + "/_stats" index_response = requests.get(stats_url, auth=es_auth, verify=False) if index_response.status_code != 200: print("*** get ElasticSearch stats message failed. resp statusCode:" + str( index_response.status_code) + " response is " + index_response.text) return 0 return index_response.json() def get_indices_stats(url, es_auth): endpoint = url + "/_cat/indices" indicesResult = requests.get(endpoint, es_auth) indicesList = indicesResult.split("\n") indexList = [] for indices in indicesList: indexList.append(indices.split()[2]) return indexList def loadConfig(argv): if argv is None or len(argv) != 2: config_yaml = "migrateConfig.yaml" else: config_yaml = argv[1] config_file = open(config_yaml) # python3 # return yaml.load(config_file, Loader=yaml.FullLoader) return yaml.load(config_file) def main(argv): requests.packages.urllib3.disable_warnings() print("begin to migrate index mapping!") config = loadConfig(argv) src_clusters = config.keys() print("begin to process cluster name :") for name in src_clusters: print(name) print("cluster count:" + str(src_clusters.__len__())) for name, value in config.items(): printDividingLine() source = value["src_ip"] source_user = value["src_username"] source_passwd = value["src_password"] source_auth = None if source_user != "": source_auth = (source_user, source_passwd) dest = value["dest_ip"] dest_user = value["dest_username"] dest_passwd = value["dest_password"] dest_auth = None if dest_user != "": dest_auth = (dest_user, dest_passwd) cluster_name = name if "clustername" in value: cluster_name = value["clustername"] print("start to process cluster :" + cluster_name) # get all indices all_source_index = get_indices(source, source_auth) all_dest_index = get_indices(dest, dest_auth) if "only_compare_index" in value and value["only_compare_index"]: print("[success] only compare mapping, not compare index count.") continue for index in all_source_index: index_total = get_index_total(value["src_ip"], index, source_auth) src_total = index_total["_all"]["primaries"]["docs"]["count"] src_size = int(index_total["_all"]["primaries"]["store"]["size_in_bytes"]) / 1024 / 1024 dest_index = get_index_total(value["dest_ip"], index, dest_auth) if dest_index is 0: print('[failure] not found, index: %-20s, source total: %-10s size %6sM' % (str(index), str(src_total), src_size)) continue dest_total = dest_index["_all"]["primaries"]["docs"]["count"] if src_total != dest_total: print('[failure] not consistent, ' 'index: %-20s, source total: %-10s size %6sM destination total: %-10s ' % (str(index), str(src_total), src_size, str(dest_total))) continue print('[success] compare index total equal : index : %-20s, total: %-20s ' % (str(index), str(dest_total))) if __name__ == '__main__': main(sys.argv)
- 执行“vi migrateConfig.yaml”配置文件,输入并基于实际信息修改以下内容,执行wq保存为Logstash迁移脚本。集群信息的获取方式请参见获取Elasticsearch集群信息。
创建Logstash集群
当ECS中的迁移环境准备完成后,在CSS服务创建一个Logstash集群用于迁移数据。
- 登录云搜索服务管理控制台。
- 在左侧菜单栏选择 。
- 单击右上角的“创建集群”,进入创建集群页面。
- 在创建集群页面,根据指导完成集群配置。
关键配置参数请参见表3,其他参数可以保持默认值。创建集群的详细说明请参见创建Logstash集群。
表3 Logstash集群的关键配置 参数
说明
计费模式
选择“按需计费”,按实际使用时长计费,计费周期为一小时,不足一小时按一小时计费。
集群类型
选择“Logstash”。
集群版本
选择“7.10.0”。
集群名称
自定义集群名称,可输入的字符范围为4~32个字符,只能包含数字、字母、中划线和下划线,且必须以字母开头。
此处以“Logstash-ES”为例。
虚拟私有云
VPC即虚拟私有云,是通过逻辑方式进行网络隔离,提供安全、隔离的网络环境。
此处选择和目标Elasticsearch集群同一个虚拟私有云(VPC)。
子网
通过子网提供与其他网络隔离的、可以独享的网络资源,以提高网络安全。
选择创建集群需要的子网,可进入VPC服务查看VPC下已创建的子网名称和ID。
安全组
安全组是一个逻辑上的分组,为同一个VPC内具有相同安全保护需求并相互信任的弹性云服务器提供访问策略。
此处选择和目标Elasticsearch集群同一个安全组。
- 单击“下一步:高级配置”,此配置保持默认配置即可。
- 单击“下一步:确认配置”,确认完成后单击“立即创建”开始创建集群。
- 单击“返回集群列表”,系统将跳转到“集群管理”页面。您创建的集群将展现在集群列表中,且集群状态为“创建中”,创建成功后集群状态会变为“可用”。
验证集群间的网络连通性
在启动迁移任务前,需要先验证Logstash和源Elasticsearch集群的网络连通性。
- 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“配置中心”,进入配置中心页面。
- 在配置中心页面,单击“连通性测试”。
- 在弹窗中输入源集群的IP地址和端口号,单击“测试”。
图4 连通性测试
当显示“可用”时,表示集群间网络连通。
使用Logstash全量迁移集群数据
在集群迁移初期或需要确保数据完整性的场景,推荐使用Logstash全量迁移集群数据,该方法会一次迁移整个Elasticsearch集群的数据。
- 使用Putty登录准备迁移环境中创建的Linux虚拟机。
- 在虚拟机中,执行命令python migrateTemplate.py迁移索引模板。
- 在虚拟机中,执行命令 python migrateMapping.py迁移索引结构。
- 登录云搜索服务管理控制台。
- 在左侧菜单栏选择 。
- 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“配置中心”,进入配置中心页面。
- 在配置中心页面,单击右上角“创建”,进入创建配置文件页面,编辑Elasticsearch集群的全量迁移配置文件。
- 选择集群模板:展开系统模板,选择“elasticsearch”,单击操作列的“应用”。
- 设置配置文件名称:在“名称”处自定义配置文件名称,例如“es-es-all”。
- 修改配置文件内容:在“配置文件内容”处填写Elasticsearch集群的迁移配置方案,配置文件示例如下。集群信息的获取方式请参见获取Elasticsearch集群信息。
input{ elasticsearch{ # 源Elasticsearch的访问地址,不需要添加协议,添加HTTPS协议会导致报错。 hosts => ["xx.xx.xx.xx:9200", "xx.xx.xx.xx:9200"] # 访问源集群的用户名和密码,非安全集群无需配置。 # user => "css_logstash" # password => "*****" # 配置待迁移的索引信息,多个索引以逗号隔开,可以使用通配符设置,例如“index*”。 index => "*_202102" docinfo => true slices => 3 size => 3000 # 当源集群是HTTPS访问方式时,则设置ssl => false。 # ssl => false } } # 移除一些logstash增加的字段。 filter { mutate { remove_field => ["@metadata", "@version"] } } output{ elasticsearch{ # 目标Elasticsearch集群的访问地址 hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"] # 访问目标集群的用户名和密码,非安全集群无需配置。 # user => "css_logstash" # password => "*****" # 配置目标集群的索引,以下配置为索引名称和源端保持一致,保持默认。 index => "%{[@metadata][_index]}" document_type => "%{[@metadata][_type]}" document_id => "%{[@metadata][_id]}" # 当目标集群是HTTPS访问方式时,则需额外配置以下信息。 # 配置集群HTTPS访问证书,CSS集群保持以下不变。 #cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs/CloudSearchService.cer" # 是否开启HTTPS通信,HTTPS访问集群则设置为true。 #ssl => true # 是否验证服务端Elasticsearch证书,设置为false表示不验证证书。 #ssl_certificate_verification => false } }
表4 全量迁移配置项说明 配置项名称
说明
input
hosts
源集群的访问地址,如果集群有多个访问节点请分别填写,使用逗号隔开。
user
访问集群的用户名,如果是非安全集群此项使用“#”注释掉。
password
访问集群的密码,如果是非安全集群此项使用“#”注释掉。
index
需要全量迁移的源端索引信息,使用逗号隔开,可以使用通配符设置,例如“index*”。
docinfo
是否重新索引文档,必须为true。
slices
配置该参数可以使用切片滚动同时对查询的不同切片,提高整体吞吐量。建议在2-8内。
size
每次查询返回的最大命中数。
output
hosts
目标集群访问地址,如果集群有多个节点,请分别填写,使用逗号隔开。
user
访问集群的用户名,如果是非安全集群此项使用“#”注释掉。
password
访问集群的密码,如果是非安全集群此项使用“#”注释掉。
index
迁移到目标集群的索引名称,支持扩展修改,如“Logstash-%{+yyyy.MM.dd}”。
document_type
使目标端文档类型与源端保持一致。
document_id
索引中的文档ID,建议与源端保持一致,如需要自动生成,使用“#”注释掉即可。
- 编辑完成后,单击“下一页”配置Logstash配置文件运行参数。
此示例保持默认值即可,如需设置请参见创建Logstash配置文件。
- 配置完成后,单击“创建”。
- 启动全量迁移任务。
- 在配置文件列表,选择配置文件“es-es-all”,单击左上角的“启动”。
- 在“启动Logstash服务”对话框中,根据业务需要选择“是否保持常驻”。此示例不开启保持常驻。
开启“保持常驻”以后,将会在每个节点上面配置一个守护进程,当logstash服务出现故障的时候,会主动拉起并修复。“保持常驻”适用于需要长期运行的业,不适用于短期运行的业务,短期业务如果源端无数据,开启保持常驻会导致任务失败。
- 单击“确定”,开始启动配置文件启动Logstash全量迁移任务。
可以在管道列表看到启动的配置文件。
- 数据迁移完毕检查数据一致性。
使用Logstash增量迁移集群数据
在需要持续同步数据或对数据实时性有较高要求的场景,推荐使用Logstash增量迁移集群数据,该方法通过Logstash配置增量查询,仅支持迁移有增量字段的索引数据。
- 使用Putty登录准备迁移环境中创建的Linux虚拟机。
- 在虚拟机中,执行命令python migrateTemplate.py迁移索引模板。
- 在虚拟机中,执行命令 python migrateMapping.py迁移索引结构。
- 登录云搜索服务管理控制台。
- 在左侧菜单栏选择 。
- 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“配置中心”,进入配置中心页面。
- 在配置中心页面,单击右上角“创建”,进入创建配置文件页面,编辑Elasticsearch集群的增量迁移配置文件。
- 选择集群模板:展开系统模板,选择“elasticsearch”,单击操作列的“应用”。
- 设置配置文件名称:在“名称”处自定义配置文件名称,例如“es-es-inc”。
- 修改配置文件内容:在“配置文件内容”处填写Elasticsearch集群的迁移配置方案,配置文件示例如下。
不同的索引的增量迁移配置不同,必须基于索引分析给出增量配置文件迁移命令。集群信息的获取方式请参见获取Elasticsearch集群信息。
input{ elasticsearch{ # 源Elasticsearch的访问地址,不需要添加协议,添加HTTPS协议会导致报错。 hosts => ["xx.xx.xx.xx:9200"] # 访问源集群的用户名和密码,非安全集群无需配置。 user => "css_logstash" password => "******" # 配置增量迁移索引。 index => "*_202102" # 配置增量迁移查询语句。 query => '{"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}' docinfo => true size => 1000 # 当源集群是HTTPS访问方式时,则设置ssl => false。 # ssl => false } } filter { mutate { remove_field => ["@timestamp", "@version"] } } output{ elasticsearch{ # 目标集群的访问地址。 hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"] # 访问目标集群的用户名和密码,非安全集群无需配置。 #user => "admin" #password => "******" # 配置目标集群的索引,以下配置为索引名称和源端保持一致,保持默认。 index => "%{[@metadata][_index]}" document_type => "%{[@metadata][_type]}" document_id => "%{[@metadata][_id]}" # 当目标集群是HTTPS访问方式时,则需额外配置以下信息。 # 配置集群HTTPS访问证书,CSS集群保持默认。 #cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs/CloudSearchService.cer" # 是否开启HTTPS通信,HTTPS访问集群则设置为true。 #ssl => true # 是否验证服务端Elasticsearch证书,设置为false表示不验证证书。 #ssl_certificate_verification => false } #stdout { codec => rubydebug { metadata => true }} }
表5 增量迁移配置项说明 配置
说明
hosts
分别填写源集群和目标集群的访问地址,如果集群有多个访问节点请分别填写。
user
访问集群的用户名,如果是非安全集群此项使用“#”注释掉。
password
访问集群的密码,如果是非安全集群此项使用“#”注释掉。
index
需要增加迁移的索引信息,一个配置文件只支持一个索引的增量迁移。
query
增量数据的识别标识,一般是Elasticsearch的DLS语句,需要提前分析。其中postsDate为业务中的时间字段。
{"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}
此处命令是迁移2021-05-25之后新增加的数据,在多次增量迁移过程中需要修改此处的日志值,如果日期是时间戳方式请转换为时间戳。此处命令需要提前验证有效性。
scroll
当源端数据量过大,为了防止Logstash内存溢出,可以使用scroll分批次获取数据。默认为"1m"。间隔时间不要太长,否则可能会丢失数据。
- 启动增量迁移任务。
- 在配置文件列表,选择配置文件“es-es-inc”,单击左上角的“启动”。
- 在“启动Logstash服务”对话框中,根据业务需要选择“是否保持常驻”。此示例不开启保持常驻。
开启“保持常驻”以后,将会在每个节点上面配置一个守护进程,当logstash服务出现故障的时候,会主动拉起并修复。“保持常驻”适用于需要长期运行的业,不适用于短期运行的业务,短期业务如果源端无数据,开启保持常驻会导致任务失败。
- 单击“确定”,开始启动配置文件启动Logstash增量迁移任务。
可以在管道列表看到启动的配置文件。
- 数据迁移完毕检查数据一致性。
释放Logstash集群
当集群迁移完成后,请及时释放Logstash集群,可以节约资源,避免产生不必要的费用。
- 登录云搜索服务管理控制台。
- 在左侧菜单栏选择 。
- 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“更多 > 删除”,在弹出的确认提示框中,输入“DELETE”,单击“确定”完成集群删除。