使用云服务Logstash全量迁移集群数据
Logstash支持全量迁移和增量迁移,首次迁移使用全量迁移,后续增加数据选择增量迁移。本章节介绍如何使用CSS服务的Logstash全量迁移集群数据。
首先请根据约束和限制和准备工作提前完成迁移前的准备工作。具体步骤如下所示:
准备工作
- 创建迁移虚拟机。
创建迁移虚拟机,用于迁移源集群的元数据。
- 创建ECS虚拟机,虚拟机需要创建linux系统,规格选择2U4G。
- 测试虚拟机和源集群和目标集群保持连通性,执行命令curl http:// {ip}:{port}可以测试结果。
IP是源集群和目的集群访问地址,端口默认是9200,如果不是9200使用集群实际端口。
如下示例仅适用于非安全集群。
curl http://10.234.73.128:9200 { "name" : "voc_es_cluster_new-ess-esn-1-1", "cluster_name" : "voc_es_cluster_new", "cluster_uuid" : "1VbP7-39QNOx_R-llXKKtA", "version" : { "number" : "6.5.4", "build_flavor" : "default", "build_type" : "tar", "build_hash" : "d2ef93d", "build_date" : "2018-12-17T21:17:40.758843Z", "build_snapshot" : false, "lucene_version" : "7.5.0", "minimum_wire_compatibility_version" : "5.6.0", "minimum_index_compatibility_version" : "5.0.0" }, "Tagline" : "You Know, for Search" }
- 准备工具和软件
在线安装和离线安装以虚拟机是否可以联网判断。如果可以联网直接使用yum和pip安装即可。离线安装需要下载安装包到虚拟机上执行安装命令。
表1 准备工具和软件 类型
目的
获取位置
Python2
主要用户执行迁移脚本。
下载地址,版本选择python 2.7.18。
winscp
Linux上传脚本,跨平台文件传输工具。
在线安装步骤如下:
- 执行yum install python2安装python2。
[root@ecs opt]# yum install python2
- 执行yum install python-pip安装pip。
[root@ecs opt]# yum install python-pip
- 执行pip install pyyaml安装yaml依赖。
- 执行pip install requests安装requests依赖。
离线安装步骤如下:
- 下载python2安装包,下载地址https://www.python.org/downloads/release/python-2718/。选择源码下载安装。
图1 下载python2安装包
- 使用winscp工具上传python安装包到opt目录下,安装python。
# 解压python压缩包 [root@ecs-52bc opt]# tar -xvf Python-2.7.18.tgz Python-2.7.18/Modules/zlib/crc32.c Python-2.7.18/Modules/zlib/gzlib.c Python-2.7.18/Modules/zlib/inffast.c Python-2.7.18/Modules/zlib/example.c Python-2.7.18/Modules/python.c Python-2.7.18/Modules/nismodule.c Python-2.7.18/Modules/Setup.config.in … # 解压完成进入目录 [root@ecs-52bc opt]# cd Python-2.7.18 # 检查文件配置安装路径 [root@ecs-52bc Python-2.7.18]# ./configure --prefix=/usr/local/python2 … checking for build directories... checking for --with-computed-gotos... no value specified checking whether gcc -pthread supports computed gotos... yes done checking for ensurepip... no configure: creating ./config.status config.status: creating Makefile.pre config.status: creating Modules/Setup.config config.status: creating Misc/python.pc config.status: creating Modules/ld_so_aix config.status: creating pyconfig.h creating Modules/Setup creating Modules/Setup.local creating Makefile # 编译python [root@ecs-52bc Python-2.7.18]# make # 安装python [root@ecs-52bc Python-2.7.18]# make install
- 安装完成检查,检查python安装结果。
# 检查python版本 [root@ecs-52bc Python-2.7.18]# python --version Python 2.7.5 # 检查pip版本 [root@ecs-52bc Python-2.7.18]# pip --version pip 7.1.2 from /usr/lib/python2.7/site-packages/pip-7.1.2-py2.7.egg (python 2.7) [root@ecs-52bc Python-2.7.18]#
- 执行yum install python2安装python2。
- 准备执行脚本
- vi migrateConfig.yaml增加配置文件。
es_cluster_new: # 集群名称 clustername: es_cluster_new # 源端ES集群地址,加上http://, src_ip: http://x.x.x.x:9200 # 没有用户名密码设置为"" src_username: "" src_password: "" # 目的端ES集群地址,加上http:// dest_ip: http://x.x.x.x:9200 # 没有用户名密码设置为"" dest_username: "" dest_password: "" # 可有不定义,默认为false, migrateMapping.py使用 # 是否只处理这个文件中mapping地址的索引 # 如果设置成true,则只会将下面的mapping中的索引获取到并在目的端创建 # 如果设置成false,则会取源端集群的所有索引,除去(.kibana, .*) # 并且将索引名称与下面的mapping匹配,如果匹配到使用mapping的value作为目的端的索引名称 # 如果匹配不到,则使用源端原始的索引名称 only_mapping: false # 要迁移的索引,key为源端的索引名字,value为目的端的索引名字 mapping: test_index_1: test_index_1 # 可以不定义,默认false, checkIndices.py使用 # 设置为false会比较所有的索引和文档数量,设置为true只比较索引数量, only_compare_index: false
配置文件说明:
表2 配置文件说明 配置
说明
clustername
集群名称,集群标识。
src_ip
源集群访问的ip地址,只需要集群一个地址,端口默认是9200,如果集群的访问的端口不是9200,以实际的配置端口为准。
src_username
源集群访问的用户名,如果不需要设置为""。
src_password
源集群访问的密码,如果不需要设置为""。
dest_ip
目标集群访问的ip地址, 只需要集群一个地址,端口默认是9200,如果集群的访问的端口不是9200,以实际的配置端口为准。
dest_username
目标集群访问的用户名,如果不需要设置为""。
dest_password
目标集群访问的密码,如果不需要设置为""。
- 执行vi migrateTemplate.py命令,复制以下脚本到文件生成索引结构迁移脚本。
# -*- coding:UTF-8 -*- import sys import yaml import requests import json import os def printDividingLine(): print("<=============================================================>") def loadConfig(argv): if argv is None or len(argv) != 2: config_yaml = "migrateConfig.yaml" else: config_yaml = argv[1] config_file = open(config_yaml) # config = yaml.load(config_file, Loader=yaml.FullLoader) return yaml.load(config_file) def put_template_to_target(url, template, cluster, template_name, dest_auth=None): headers = {'Content-Type': 'application/json'} create_resp = requests.put(url, headers=headers, data=json.dumps(template), auth=dest_auth, verify=False) if not os.path.exists("templateLogs"): os.makedirs("templateLogs") if create_resp.status_code != 200: print( "create template " + url + " failed with response: " + str( create_resp) + ", source template is " + template_name) print(create_resp.text) filename = "templateLogs/" + str(cluster) + "#" + template_name with open(filename + ".json", "w") as f: json.dump(template, f) return False else: return True def main(argv): requests.packages.urllib3.disable_warnings() print("begin to migration template!") config = loadConfig(argv) src_clusters = config.keys() print("process cluster name:") for name in src_clusters: print(name) print("cluster total number:" + str(src_clusters.__len__())) for name, value in config.items(): printDividingLine() source_user = value["src_username"] source_passwd = value["src_password"] source_auth = None if source_user != "": source_auth = (source_user, source_passwd) dest_user = value["dest_username"] dest_passwd = value["dest_password"] dest_auth = None if dest_user != "": dest_auth = (dest_user, dest_passwd) print("start to process cluster name:" + name) source_url = value["src_ip"] + "/_template" response = requests.get(source_url, auth=source_auth, verify=False) if response.status_code != 200: print("*** get all template failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) continue all_template = response.json() migrate_itemplate = [] for template in all_template.keys(): if template.startswith(".") or template == "logstash": continue if "index_patterns" in all_template[template]: for t in all_template[template]["index_patterns"]: # if "kibana" in template: if t.startswith("."): continue migrate_itemplate.append(template) for template in migrate_itemplate: dest_index_url = value["dest_ip"] + "/_template/" + template result = put_template_to_target(dest_index_url, all_template[template], name, template, dest_auth) if result is True: print('[success] delete success, cluster: %-10s, template %-10s ' % (str(name), str(template))) else: print('[failure] delete failure, cluster: %-10s, template %-10s ' % (str(name), str(template))) if __name__ == '__main__': main(sys.argv)
- 执行vi migrateMapping.py命令,复制以下脚本到文件生成索引结构迁移脚本。
# -*- coding:UTF-8 -*- import sys import yaml import requests import re import json import os def printDividingLine(): print("<=============================================================>") def loadConfig(argv): if argv is None or len(argv) != 2: config_yaml = "migrateConfig.yaml" else: config_yaml = argv[1] config_file = open(config_yaml) # config = yaml.load(config_file, Loader=yaml.FullLoader) return yaml.load(config_file) def get_cluster_version(url, auth=None): response = requests.get(url, auth=auth) if response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) return False cluster = response.json() version = cluster["version"]["number"] return True def process_mapping(index_mapping, dest_index): # remove unnecessary keys del index_mapping["settings"]["index"]["provided_name"] del index_mapping["settings"]["index"]["uuid"] del index_mapping["settings"]["index"]["creation_date"] del index_mapping["settings"]["index"]["version"] if "lifecycle" in index_mapping["settings"]["index"]: del index_mapping["settings"]["index"]["lifecycle"] # check alias aliases = index_mapping["aliases"] for alias in list(aliases.keys()): if alias == dest_index: print( "source index " + dest_index + " alias " + alias + " is the same as dest_index name, will remove this alias.") del index_mapping["aliases"][alias] # if index_mapping["settings"]["index"].has_key("lifecycle"): if "lifecycle" in index_mapping["settings"]["index"]: lifecycle = index_mapping["settings"]["index"]["lifecycle"] opendistro = {"opendistro": {"index_state_management": {"policy_id": lifecycle["name"], "rollover_alias": lifecycle["rollover_alias"]}}} index_mapping["settings"].update(opendistro) # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"] del index_mapping["settings"]["index"]["lifecycle"] # replace synonyms_path if "analysis" in index_mapping["settings"]["index"]: analysis = index_mapping["settings"]["index"]["analysis"] if "filter" in analysis: filter = analysis["filter"] if "my_synonym_filter" in filter: my_synonym_filter = filter["my_synonym_filter"] if "synonyms_path" in my_synonym_filter: index_mapping["settings"]["index"]["analysis"]["filter"]["my_synonym_filter"][ "synonyms_path"] = "/rds/datastore/elasticsearch/v7.10.2/package/elasticsearch-7.10.2/plugins/analysis-dynamic-synonym/config/synonyms.txt" return index_mapping def getAlias(source, source_auth): # get all indices response = requests.get(source + "/_alias", auth=source_auth) if response.status_code != 200: print("*** get all index failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) exit() all_index = response.json() system_index = [] create_index = [] for index in list(all_index.keys()): if (index.startswith(".")): system_index.append(index) else: create_index.append(index) return system_index, create_index def put_mapping_to_target(url, mapping, cluster, source_index, dest_auth=None): headers = {'Content-Type': 'application/json'} create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth, verify=False) if not os.path.exists("mappingLogs"): os.makedirs("mappingLogs") if create_resp.status_code != 200: print( "create index " + url + " failed with response: " + str(create_resp) + ", source index is " + str(source_index)) print(create_resp.text) filename = "mappingLogs/" + str(cluster) + "#" + str(source_index) with open(filename + ".json", "w") as f: json.dump(mapping, f) return False else: return True def main(argv): requests.packages.urllib3.disable_warnings() print("begin to migrate index mapping!") config = loadConfig(argv) src_clusters = config.keys() print("begin to process cluster name :") for name in src_clusters: print(name) print("cluster count:" + str(src_clusters.__len__())) for name, value in config.items(): printDividingLine() source = value["src_ip"] source_user = value["src_username"] source_passwd = value["src_password"] source_auth = None if source_user != "": source_auth = (source_user, source_passwd) dest = value["dest_ip"] dest_user = value["dest_username"] dest_passwd = value["dest_password"] dest_auth = None if dest_user != "": dest_auth = (dest_user, dest_passwd) print("start to process cluster: " + name) # only deal with mapping list if 'only_mapping' in value and value["only_mapping"]: for source_index, dest_index in value["mapping"].iteritems(): print("start to process source index" + source_index + ", target index: " + dest_index) source_url = source + "/" + source_index response = requests.get(source_url, auth=source_auth) if response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) continue mapping = response.json() index_mapping = process_mapping(mapping[source_index], dest_index) dest_url = dest + "/" + dest_index result = put_mapping_to_target(dest_url, index_mapping, name, source_index, dest_auth) if result is False: print("cluster name:" + name + ", " + source_index + ":failure") continue print("cluster name:" + name + ", " + source_index + ":success") else: # get all indices system_index, create_index = getAlias(source, source_auth) success_index = 0 for index in create_index: source_url = source + "/" + index index_response = requests.get(source_url, auth=source_auth) if index_response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( index_response.status_code) + " response is " + index_response.text) continue mapping = index_response.json() dest_index = index if 'mapping' in value: if index in value["mapping"].keys(): dest_index = value["mapping"][index] index_mapping = process_mapping(mapping[index], dest_index) dest_url = dest + "/" + dest_index result = put_mapping_to_target(dest_url, index_mapping, name, index, dest_auth) if result is False: print("[failure]: migrate mapping cluster name: " + name + ", " + index) continue print("[success]: migrate mapping cluster name: " + name + ", " + index) success_index = success_index + 1 print("create index mapping success total: " + str(success_index)) if __name__ == '__main__': main(sys.argv)
- 执行vi checkIndices.py命令,复制以下脚本到文件生成索引数据对比脚本。
# -*- coding:UTF-8 -*- import sys import yaml import requests import re import json import os def printDividingLine(): print("<=============================================================>") def get_cluster_version(url, auth=None): response = requests.get(url, auth=auth) if response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) return False cluster = response.json() version = cluster["version"]["number"] return True # get all indices def get_indices(url, source_auth): response = requests.get(url + "/_alias", auth=source_auth) if response.status_code != 200: print("*** get all index failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) exit() all_index = response.json() system_index = [] create_index = [] for index in list(all_index.keys()): if (index.startswith(".")): system_index.append(index) else: create_index.append(index) return create_index def get_mapping(url, _auth, index): source_url = url + "/" + index index_response = requests.get(source_url, auth=_auth) if index_response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( index_response.status_code) + " response is " + index_response.text) return "[failure] --- index is not exist in destination es. ---" mapping = index_response.json() return mapping def get_index_total(url, index, es_auth): stats_url = url + "/" + index + "/_stats" index_response = requests.get(stats_url, auth=es_auth, verify=False) if index_response.status_code != 200: print("*** get ElasticSearch stats message failed. resp statusCode:" + str( index_response.status_code) + " response is " + index_response.text) return 0 return index_response.json() def get_indices_stats(url, es_auth): endpoint = url + "/_cat/indices" indicesResult = requests.get(endpoint, es_auth) indicesList = indicesResult.split("\n") indexList = [] for indices in indicesList: indexList.append(indices.split()[2]) return indexList def loadConfig(argv): if argv is None or len(argv) != 2: config_yaml = "migrateConfig.yaml" else: config_yaml = argv[1] config_file = open(config_yaml) # python3 # return yaml.load(config_file, Loader=yaml.FullLoader) return yaml.load(config_file) def main(argv): requests.packages.urllib3.disable_warnings() print("begin to migrate index mapping!") config = loadConfig(argv) src_clusters = config.keys() print("begin to process cluster name :") for name in src_clusters: print(name) print("cluster count:" + str(src_clusters.__len__())) for name, value in config.items(): printDividingLine() source = value["src_ip"] source_user = value["src_username"] source_passwd = value["src_password"] source_auth = None if source_user != "": source_auth = (source_user, source_passwd) dest = value["dest_ip"] dest_user = value["dest_username"] dest_passwd = value["dest_password"] dest_auth = None if dest_user != "": dest_auth = (dest_user, dest_passwd) cluster_name = name if "clustername" in value: cluster_name = value["clustername"] print("start to process cluster :" + cluster_name) # get all indices all_source_index = get_indices(source, source_auth) all_dest_index = get_indices(dest, dest_auth) if not os.path.exists("mappingLogs"): os.makedirs("mappingLogs") filename = "mappingLogs/" + str(cluster_name) + "#indices_stats" with open(filename + ".json", "w") as f: json.dump("cluster name: " + cluster_name, f) f.write("\n") json.dump("source indices: ", f) f.write("\n") json.dump(all_source_index, f, indent=4) f.write("\n") json.dump("destination indices : ", f) f.write("\n") json.dump(all_dest_index, f, indent=4) f.write("\n") print("source indices total : " + str(all_source_index.__len__())) print("destination index total : " + str(all_dest_index.__len__())) filename_src = "mappingLogs/" + str(cluster_name) + "#indices_source_mapping" filename_dest = "mappingLogs/" + str(cluster_name) + "#indices_dest_mapping" with open(filename_src + ".json", "a") as f_src: json.dump("cluster name: " + cluster_name, f_src) f_src.write("\n") with open(filename_dest + ".json", "a") as f_dest: json.dump("cluster name: " + cluster_name, f_dest) f_dest.write("\n") for index in all_source_index: mapping = get_mapping(source, source_auth, index) with open(filename + ".json", "a") as f_src: json.dump("========================", f_src) f_src.write("\n") json.dump(mapping, f_src, indent=4) f_src.write("\n") with open(filename_src + ".json", "a") as f_src: json.dump("========================", f_src) f_src.write("\n") json.dump(mapping, f_src, indent=4) f_src.write("\n") mapping = get_mapping(dest, dest_auth, index) with open(filename + ".json", "a") as f_dest: json.dump("========================", f_dest) f_dest.write("\n") json.dump(mapping, f_dest, indent=4) f_dest.write("\n") with open(filename_dest + ".json", "a") as f_src: json.dump("========================", f_src) f_src.write("\n") json.dump(mapping, f_src, indent=4) f_src.write("\n") print("source indices write file success, file: " + filename_src) print("destination indices write file success, file: " + filename_dest) if "only_compare_index" in value and value["only_compare_index"]: print("[success] only compare mapping, not compare index count.") continue for index in all_source_index: index_total = get_index_total(value["src_ip"], index, source_auth) src_total = index_total["_all"]["primaries"]["docs"]["count"] src_size = int(index_total["_all"]["primaries"]["store"]["size_in_bytes"]) / 1024 / 1024 dest_index = get_index_total(value["dest_ip"], index, dest_auth) if dest_index is 0: print('[failure] not found, index: %-20s, source total: %-10s size %6sM' % (str(index), str(src_total), src_size)) continue dest_total = dest_index["_all"]["primaries"]["docs"]["count"] if src_total != dest_total: print('[failure] not consistent, ' 'index: %-20s, source total: %-10s size %6sM destination total: %-10s ' % (str(index), str(src_total), src_size, str(dest_total))) continue print('[success] compare index total equal : index : %-20s, total: %-20s ' % (str(index), str(dest_total))) if __name__ == '__main__': main(sys.argv)
- vi migrateConfig.yaml增加配置文件。
步骤一:创建Logstash集群
- 迁移数据使用Logstash,创建logstash服务需要费用,默认是按需收费,用户迁移完毕数据及时释放Logstash节省费用。
- 可以基于集群的索引不同创建多个Logstash集群分别配置不同的迁移任务。
- 登录云搜索服务管理控制台。
- 在“总览”或者“集群管理”页面,选择“Logstash”,进入Logsash类型集群管理页面。
- 单击“创建集群”,进入“创建集群”页面。
- 选择“当前区域”和“可用区”。
- 指定集群基本信息,选择“集群类型”和“集群版本”,并输入“集群名称”。
表3 基本参数说明 参数
说明
集群类型
选择“Logstash”。
集群版本
当前支持5.6.16、7.10.0。
对应ES集群是5.x, 6.x 选择logstash版本5.6.16, 对应ES版本是7.X 选择logstash版本7.10.0。
集群名称
自定义的集群名称,可输入的字符范围为4~32个字符,只能包含数字、字母、中划线和下划线,且必须以字母开头。
图2 基本信息配置
- 指定集群的主机规格相关参数。“节点数量”设置为“1”。“节点规格”选择“8U16G”,其余参数保持默认值。
图3 设置主机规格
- 设置集群的企业项目,保持默认值即可。
- 单击“下一步,网络配置”,设置集群的网络配置。
表4 参数说明 参数
说明
虚拟私有云
VPC即虚拟私有云,是通过逻辑方式进行网络隔离,提供安全、隔离的网络环境。
选择创建集群需要的VPC,单击“查看虚拟私有云”进入VPC服务查看已创建的VPC名称和ID。如果没有VPC,需要创建一个新的VPC。
说明:此处您选择的VPC必须包含网段(CIDR),否则集群将无法创建成功。新建的VPC默认包含网段(CIDR)。
子网
通过子网提供与其他网络隔离的、可以独享的网络资源,以提高网络安全。
选择创建集群需要的子网,可进入VPC服务查看VPC下已创建的子网名称和ID。
安全组
安全组是一个逻辑上的分组,为同一个VPC内具有相同安全保护需求并相互信任的弹性云服务器提供访问策略。单击“查看安全组”可了解安全组详情。
说明:请确保安全组的“端口范围/ICMP类型”为“Any”或者包含端口9200的端口范围。
图4 设置网络规格
- 单击“下一步:高级配置”,高级配置可选择默认配置和自定义。此样例保持默认配置即可。
- 单击“下一步:确认配置”,确认完成后单击“立即创建”开始创建集群。
- 单击“返回集群列表”,系统将跳转到“集群管理”页面。您创建的集群将展现在集群列表中,且集群状态为“创建中”,创建成功后集群状态会变为“可用”。
步骤二:验证集群连通性
验证Logstash和源集群、目标集群的连通性。
- 在Logstash集群管理页面,单击步骤一:创建Logstash集群中创建的集群名称,进入集群的基本信息页面。选择“配置中心”,进入配置中心页面;或者直接单击目标集群操作列的“配置中心”,进入配置中心页面。
- 在配置中心页面,选择“连通性测试”。
- 输入源集群和目的集群的IP地址或域名和端口号,单击“测试”。
图5 连通性测试
步骤三:配置Logstash全量迁移任务
- 在Logstash集群管理页面,单击步骤一:创建Logstash集群中创建的集群名称,进入集群的基本信息页面。选择“配置中心”,或者直接单击目标集群操作列的“配置中心”,进入配置中心页面。
- 单击右上角“创建”,进入创建配置文件页面,选择集群模板,修改ES集群迁移配置文件。
当前案例选用的两个ES集群均未开启https。
- 选择集群模板:此样例是从Elasticsearch类型集群导入数据到Elasticsearch类型,集群模板选择“elasticsearch”,单击操作列的“应用”。基于不同的集群配置增加配置信息。
- 修改配置文件:“名称”填写配置名称,例如es-es-all;“配置文件内容”填写ES集群迁移配置文件,配置文件示例如下。
input{ elasticsearch{ # 源集群访问地址信息 hosts => ["xx.xx.xx.xx:9200", "xx.xx.xx.xx:9200"] # 源集群访问的用户名和密码,没有user和password不填 # user => "css_logstash" # password => "*****" # 待迁移的索引信息 index => "*_202102" docinfo => true slices => 3 size => 3000 } } # 移除一些logstash增加的字段 filter { mutate { remove_field => ["@metadata", "@version"] } } output{ elasticsearch{ # 目标集群访问地址信息 hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"] # 目标集群访问的用户名和密码,没有user和password不填 # user => "css_logstash" # password => "*****" index => "%{[@metadata][_index]}" document_type => "%{[@metadata][_type]}" document_id => "%{[@metadata][_id]}" } }
需要修改的配置如下:
表5 集群配置修改说明 配置
说明
input
hosts
源集群的访问地址,如果集群有多个访问节点请分别填写,使用逗号隔开。
user
集群访问的用户名,如果没有用户名此项使用#注释掉。
password
集群访问的密码,如果没有用户名密码此项使用#注释掉。
index
需要全量迁移的源端索引信息,使用逗号隔开,可以使用通配符设置,index*。
docinfo
是否重新索引文档,必须为true。
slices
配置该参数可以使用切片滚动同时对查询的不同切片,提高整体吞吐量。建议在2-8内。
size
每次查询返回的最大命中数。
output
hosts
华为云CSS集群访问地址,如果集群有多个节点,请分别填写,使用逗号隔开。
user
集群访问的用户名,如果没有用户名此项使用 # 注释掉。
password
集群访问的密码,如果没有密码此项使用 # 注释掉。
index
迁移到目的端的索引名称,支持扩展修改,如:logstash-%{+yyyy.MM.dd}。
document_type
使目标端文档类型与源端保持一致。
document_id
索引中的文档ID,建议与源端保持一致,如需要自动生成,使用 #注释掉即可。
步骤四:全量迁移
- 使用putty登录准备工作中创建的Linux虚拟机。
- 执行python migrateTemplate.py 迁移索引模板。
- 执行 python migrateMapping.py迁移索引。
- 在Logstash集群管理页面,单击步骤一:创建Logstash集群中创建的集群名称,进入集群的基本信息页面。选择“配置中心”,或者直接单击目标集群操作列的“配置中心”,进入配置中心页面。
- 选择步骤三:配置Logstash全量迁移任务中所创建的配置文件,单击左上角的“启动”。
- 根据界面提示,选择是否启动Logstash服务会立刻开始迁移数据。
- 如果选择“是”,则可以在管道下面看到启动的配置文件。
- 数据迁移完毕检查数据一致性,使用putty登录linux虚拟机,执行python checkIndices.py 对比数据结果。