使用云服务Logstash增量迁移集群数据
Logstash支持全量迁移和增量迁移,首次迁移使用全量迁移,后续增加数据选择增量迁移。本章节介绍如何使用CSS的Logstash增量迁移集群数据,增量迁移需要索引有时间戳标志,用于识别增量数据。
首先请根据约束和限制和准备工作提前完成迁移前的准备工作。具体步骤如下所示:
准备工作
- 创建迁移虚拟机。
- 创建迁移虚拟机,用于迁移源集群的元数据。
- 创建ECS虚拟机,虚拟机需要创建linux系统,规格选择2U4G。
- 测试虚拟机和源集群和目标集群保持连通性,执行命令curl http://{ip}:{port}可以测试结果。
IP是源集群和目的集群访问地址,端口默认是9200,如果不是9200使用集群实际端口。
如下示例仅适用于非安全集群。
curl http://10.234.73.128:9200 { "name" : "voc_es_cluster_new-ess-esn-1-1", "cluster_name" : "voc_es_cluster_new", "cluster_uuid" : "1VbP7-39QNOx_R-llXKKtA", "version" : { "number" : "6.5.4", "build_flavor" : "default", "build_type" : "tar", "build_hash" : "d2ef93d", "build_date" : "2018-12-17T21:17:40.758843Z", "build_snapshot" : false, "lucene_version" : "7.5.0", "minimum_wire_compatibility_version" : "5.6.0", "minimum_index_compatibility_version" : "5.0.0" }, "Tagline" : "You Know, for Search" }
- 创建迁移虚拟机,用于迁移源集群的元数据。
- 准备工具和软件
在线安装和离线安装以虚拟机是否可以联网判断。如果可以联网直接使用yum和pip安装即可。离线安装需要下载安装包到虚拟机上执行安装命令。
在线安装步骤如下:
- 执行yum install python2安装python2。
[root@ecs opt]# yum install python2
- 执行yum install python-pip安装pip。
[root@ecs opt]# yum install python-pip
- 执行pip install pyyaml安装yaml依赖。
- 执行pip install requests安装requests依赖。
离线安装步骤如下:
- 下载python2安装包,下载地址https://www.python.org/downloads/release/python-2718/。选择源码下载安装。
图1 下载python2安装包
- 使用winscp工具上传python安装包到opt目录下,安装python。
# 解压python压缩包 [root@ecs-52bc opt]# tar -xvf Python-2.7.18.tgz Python-2.7.18/Modules/zlib/crc32.c Python-2.7.18/Modules/zlib/gzlib.c Python-2.7.18/Modules/zlib/inffast.c Python-2.7.18/Modules/zlib/example.c Python-2.7.18/Modules/python.c Python-2.7.18/Modules/nismodule.c Python-2.7.18/Modules/Setup.config.in … # 解压完成进入目录 [root@ecs-52bc opt]# cd Python-2.7.18 # 检查文件配置安装路径 [root@ecs-52bc Python-2.7.18]# ./configure --prefix=/usr/local/python2 … checking for build directories... checking for --with-computed-gotos... no value specified checking whether gcc -pthread supports computed gotos... yes done checking for ensurepip... no configure: creating ./config.status config.status: creating Makefile.pre config.status: creating Modules/Setup.config config.status: creating Misc/python.pc config.status: creating Modules/ld_so_aix config.status: creating pyconfig.h creating Modules/Setup creating Modules/Setup.local creating Makefile # 编译python [root@ecs-52bc Python-2.7.18]# make # 安装python [root@ecs-52bc Python-2.7.18]# make install
- 安装完成检查,检查python安装结果。
# 检查python版本 [root@ecs-52bc Python-2.7.18]# python --version Python 2.7.5 # 检查pip版本 [root@ecs-52bc Python-2.7.18]# pip --version pip 7.1.2 from /usr/lib/python2.7/site-packages/pip-7.1.2-py2.7.egg (python 2.7) [root@ecs-52bc Python-2.7.18]#
- 执行yum install python2安装python2。
- 准备执行脚本
- vi migrateConfig.yaml增加配置文件。
es_cluster_new: # 集群名称 clustername: es_cluster_new # 源端ES集群地址,加上http://, src_ip: http://x.x.x.x:9200 # 没有用户名密码设置为"" src_username: "" src_password: "" # 目的端ES集群地址,加上http:// dest_ip: http://x.x.x.x:9200 # 没有用户名密码设置为"" dest_username: "" dest_password: "" # 可有不定义,默认为false, migrateMapping.py使用 # 是否只处理这个文件中mapping地址的索引 # 如果设置成true,则只会将下面的mapping中的索引获取到并在目的端创建 # 如果设置成false,则会取源端集群的所有索引,除去(.kibana, .*) # 并且将索引名称与下面的mapping匹配,如果匹配到使用mapping的value作为目的端的索引名称 # 如果匹配不到,则使用源端原始的索引名称 only_mapping: false # 要迁移的索引,key为源端的索引名字,value为目的端的索引名字 mapping: test_index_1: test_index_1 # 可以不定义,默认false, checkIndices.py使用 # 设置为false会比较所有的索引和文档数量,设置为true只比较索引数量, only_compare_index: false
配置文件说明:
配置
说明
clustername
集群名称,集群标识。
src_ip
源集群访问的ip地址,只需要集群一个地址,端口默认是9200,如果集群的访问的端口不是9200,以实际的配置端口为准。
src_username
源集群访问的用户名,如果不需要设置为""。
src_password
源集群访问的密码,如果不需要设置为""。
dest_ip
目标集群访问的ip地址, 只需要集群一个地址,端口默认是9200,如果集群的访问的端口不是9200,以实际的配置端口为准。
dest_username
目标集群访问的用户名,如果不需要设置为""。
dest_password
目标集群访问的密码,如果不需要设置为""。
- 执行vi checkIndices.py命令,复制以下脚本到文件生成索引数据对比脚本。
# -*- coding:UTF-8 -*- import sys import yaml import requests import re import json import os def printDividingLine(): print("<=============================================================>") def get_cluster_version(url, auth=None): response = requests.get(url, auth=auth) if response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) return False cluster = response.json() version = cluster["version"]["number"] return True # get all indices def get_indices(url, source_auth): response = requests.get(url + "/_alias", auth=source_auth) if response.status_code != 200: print("*** get all index failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) exit() all_index = response.json() system_index = [] create_index = [] for index in list(all_index.keys()): if (index.startswith(".")): system_index.append(index) else: create_index.append(index) return create_index def get_mapping(url, _auth, index): source_url = url + "/" + index index_response = requests.get(source_url, auth=_auth) if index_response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( index_response.status_code) + " response is " + index_response.text) return "[failure] --- index is not exist in destination es. ---" mapping = index_response.json() return mapping def get_index_total(url, index, es_auth): stats_url = url + "/" + index + "/_stats" index_response = requests.get(stats_url, auth=es_auth, verify=False) if index_response.status_code != 200: print("*** get ElasticSearch stats message failed. resp statusCode:" + str( index_response.status_code) + " response is " + index_response.text) return 0 return index_response.json() def get_indices_stats(url, es_auth): endpoint = url + "/_cat/indices" indicesResult = requests.get(endpoint, es_auth) indicesList = indicesResult.split("\n") indexList = [] for indices in indicesList: indexList.append(indices.split()[2]) return indexList def loadConfig(argv): if argv is None or len(argv) != 2: config_yaml = "migrateConfig.yaml" else: config_yaml = argv[1] config_file = open(config_yaml) # python3 # return yaml.load(config_file, Loader=yaml.FullLoader) return yaml.load(config_file) def main(argv): requests.packages.urllib3.disable_warnings() print("begin to migrate index mapping!") config = loadConfig(argv) src_clusters = config.keys() print("begin to process cluster name :") for name in src_clusters: print(name) print("cluster count:" + str(src_clusters.__len__())) for name, value in config.items(): printDividingLine() source = value["src_ip"] source_user = value["src_username"] source_passwd = value["src_password"] source_auth = None if source_user != "": source_auth = (source_user, source_passwd) dest = value["dest_ip"] dest_user = value["dest_username"] dest_passwd = value["dest_password"] dest_auth = None if dest_user != "": dest_auth = (dest_user, dest_passwd) cluster_name = name if "clustername" in value: cluster_name = value["clustername"] print("start to process cluster :" + cluster_name) # get all indices all_source_index = get_indices(source, source_auth) all_dest_index = get_indices(dest, dest_auth) if not os.path.exists("mappingLogs"): os.makedirs("mappingLogs") filename = "mappingLogs/" + str(cluster_name) + "#indices_stats" with open(filename + ".json", "w") as f: json.dump("cluster name: " + cluster_name, f) f.write("\n") json.dump("source indices: ", f) f.write("\n") json.dump(all_source_index, f, indent=4) f.write("\n") json.dump("destination indices : ", f) f.write("\n") json.dump(all_dest_index, f, indent=4) f.write("\n") print("source indices total : " + str(all_source_index.__len__())) print("destination index total : " + str(all_dest_index.__len__())) filename_src = "mappingLogs/" + str(cluster_name) + "#indices_source_mapping" filename_dest = "mappingLogs/" + str(cluster_name) + "#indices_dest_mapping" with open(filename_src + ".json", "a") as f_src: json.dump("cluster name: " + cluster_name, f_src) f_src.write("\n") with open(filename_dest + ".json", "a") as f_dest: json.dump("cluster name: " + cluster_name, f_dest) f_dest.write("\n") for index in all_source_index: mapping = get_mapping(source, source_auth, index) with open(filename + ".json", "a") as f_src: json.dump("========================", f_src) f_src.write("\n") json.dump(mapping, f_src, indent=4) f_src.write("\n") with open(filename_src + ".json", "a") as f_src: json.dump("========================", f_src) f_src.write("\n") json.dump(mapping, f_src, indent=4) f_src.write("\n") mapping = get_mapping(dest, dest_auth, index) with open(filename + ".json", "a") as f_dest: json.dump("========================", f_dest) f_dest.write("\n") json.dump(mapping, f_dest, indent=4) f_dest.write("\n") with open(filename_dest + ".json", "a") as f_src: json.dump("========================", f_src) f_src.write("\n") json.dump(mapping, f_src, indent=4) f_src.write("\n") print("source indices write file success, file: " + filename_src) print("destination indices write file success, file: " + filename_dest) if "only_compare_index" in value and value["only_compare_index"]: print("[success] only compare mapping, not compare index count.") continue for index in all_source_index: index_total = get_index_total(value["src_ip"], index, source_auth) src_total = index_total["_all"]["primaries"]["docs"]["count"] src_size = int(index_total["_all"]["primaries"]["store"]["size_in_bytes"]) / 1024 / 1024 dest_index = get_index_total(value["dest_ip"], index, dest_auth) if dest_index is 0: print('[failure] not found, index: %-20s, source total: %-10s size %6sM' % (str(index), str(src_total), src_size)) continue dest_total = dest_index["_all"]["primaries"]["docs"]["count"] if src_total != dest_total: print('[failure] not consistent, ' 'index: %-20s, source total: %-10s size %6sM destination total: %-10s ' % (str(index), str(src_total), src_size, str(dest_total))) continue print('[success] compare index total equal : index : %-20s, total: %-20s ' % (str(index), str(dest_total))) if __name__ == '__main__': main(sys.argv)
- vi migrateConfig.yaml增加配置文件。
步骤一:创建Logstash集群
- 迁移数据使用Logstash,创建logstash服务需要费用,默认是按需收费,用户迁移完毕数据及时释放Logstash节省费用。
- 可以基于集群的索引不同创建多个Logstash集群分别配置不同的迁移任务。
- 登录云搜索服务管理控制台。
- 在“总览”或者“集群管理”页面,选择“Logstash”,进入Logsash类型集群管理页面。
- 单击“创建集群”,进入“创建集群”页面。
- 选择“当前区域”和“可用区”。
- 指定集群基本信息,选择“集群类型”和“集群版本”,并输入“集群名称”。
表1 基本参数说明 参数
说明
集群类型
选择“Logstash”。
集群版本
当前支持5.6.16、7.10.0。
对应ES集群是5.x, 6.x 选择logstash版本5.6.16, 对应ES版本是7.X 选择logstash版本7.10.0。
集群名称
自定义的集群名称,可输入的字符范围为4~32个字符,只能包含数字、字母、中划线和下划线,且必须以字母开头。
图2 基本信息配置
- 指定集群的主机规格相关参数。“节点数量”设置为“1”。“节点规格”选择“8U16G”,其余参数保持默认值。
图3 设置主机规格
- 设置集群的企业项目,保持默认值即可。
- 单击“下一步,网络配置”,设置集群的网络配置。
表2 参数说明 参数
说明
虚拟私有云
VPC即虚拟私有云,是通过逻辑方式进行网络隔离,提供安全、隔离的网络环境。
选择创建集群需要的VPC,单击“查看虚拟私有云”进入VPC服务查看已创建的VPC名称和ID。如果没有VPC,需要创建一个新的VPC。
说明:此处您选择的VPC必须包含网段(CIDR),否则集群将无法创建成功。新建的VPC默认包含网段(CIDR)。
子网
通过子网提供与其他网络隔离的、可以独享的网络资源,以提高网络安全。
选择创建集群需要的子网,可进入VPC服务查看VPC下已创建的子网名称和ID。
安全组
安全组是一个逻辑上的分组,为同一个VPC内具有相同安全保护需求并相互信任的弹性云服务器提供访问策略。单击“查看安全组”可了解安全组详情。
说明:请确保安全组的“端口范围/ICMP类型”为“Any”或者包含端口9200的端口范围。
图4 设置网络规格
- 单击“下一步:高级配置”,高级配置可选择默认配置和自定义。此样例保持默认配置即可。
- 单击“下一步:确认配置”,确认完成后单击“立即创建”开始创建集群。
- 单击“返回集群列表”,系统将跳转到“集群管理”页面。您创建的集群将展现在集群列表中,且集群状态为“创建中”,创建成功后集群状态会变为“可用”。
步骤二:验证集群连通性
验证Logstash和源集群、目标集群的连通性。
- 在Logstash集群管理页面,单击步骤一:创建Logstash集群中创建的集群名称,进入集群的基本信息页面。选择“配置中心”,进入配置中心页面;或者直接单击目标集群操作列的“配置中心”,进入配置中心页面。
- 在配置中心页面,选择“连通性测试”。
- 输入源集群和目的集群的IP地址或域名和端口号,单击“测试”。
图5 连通性测试
步骤三:配置Logstash增量迁移任务
- 在Logstash集群管理页面,单击步骤一:创建Logstash集群中创建的集群名称,进入集群的基本信息页面。选择“配置中心”,或者直接单击目标集群操作列的“配置中心”,进入配置中心页面。
- 单击右上角“创建”,进入创建配置文件页面,选择集群模板,修改ES集群迁移配置文件。
当前案例选用的两个ES集群均未开启https。
- 选择集群模板:此样例是从Elasticsearch类型集群导入数据到Elasticsearch类型,集群模板选择“elasticsearch”,单击操作列的“应用”。基于不同的集群配置增加配置信息。
- 修改配置文件:“名称”填写配置名称,例如es-es-inc;“配置文件内容”填写ES集群迁移配置文件,配置文件示例如下。
input{ elasticsearch{ hosts => ["xx.xx.xx.xx:9200"] user => "css_logstash" password => "******" index => "*_202102" query => '{"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}' docinfo => true size => 1000 #scroll => "5m" } } filter { mutate { remove_field => ["@timestamp", "@version"] } } output{ elasticsearch{ hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"] user => "admin" password => "******" index => "%{[@metadata][_index]}" document_type => "%{[@metadata][_type]}" document_id => "%{[@metadata][_id]}" } #stdout { codec => rubydebug { metadata => true }} }
需要修改的配置如下:
表3 集群配置修改说明 配置
说明
hosts
分别填写源集群和目标集群的访问地址,如果集群有多个访问节点请分别填写。
user
集群访问的用户名,如果没有用户名此项使用#注释掉。
password
集群访问的密码,如果没有用户名密码此项使用#注释掉。
index
需要增加迁移的索引信息,一个配置文件只支持一个索引的增量迁移。
query
增量数据的识别标识,一般是es的dls语句,需要提前分析。其中postsDate为业务中时间字段。
{"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}
此处命令是迁移2021-05-25之后新增加的数据,在多次增量迁移过程中需要修改此处的日志值,如果日期是时间戳方式请转换为时间戳。此处命令需要提前验证有效性。
scroll
当源端数据量过大,为了防止logstash内存溢出,可以使用scroll分批次获取数据。默认为"1m"。间隔时间不要太长,否则可能会丢失数据。
不同的索引的增量迁移配置不同,必须基于索引分析给出增量配置文件迁移命令。
步骤四:增量迁移
- 使用putty登录准备工作中创建的Linux虚拟机。
- 在Logstash集群管理页面,单击步骤一:创建Logstash集群中创建的集群名称,进入集群的基本信息页面。选择“配置中心”,或者直接单击目标集群操作列的“配置中心”,进入配置中心页面。
- 选择步骤三:配置Logstash增量迁移任务中所创建的配置文件,单击左上角的“启动”。
- 根据界面提示,选择“是”启动Logstash服务会立刻开始迁移数据。
- 此时可以在管道下面看到启动的配置文件。
- 数据迁移完毕检查数据一致性,使用putty登录linux虚拟机,执行python checkIndices.py 对比数据结果。