文档首页> 云搜索服务 CSS> 最佳实践> 迁移集群> 源端为Elasticsearch> 使用云服务Logstash增量迁移集群数据
更新时间:2024-04-19 GMT+08:00

使用云服务Logstash增量迁移集群数据

Logstash支持全量迁移和增量迁移,首次迁移使用全量迁移,后续增加数据选择增量迁移。本章节介绍如何使用CSS的Logstash增量迁移集群数据,增量迁移需要索引有时间戳标志,用于识别增量数据。

首先请根据约束和限制准备工作提前完成迁移前的准备工作。具体步骤如下所示:

约束和限制

  • Logstash版本约束:

    CSS 支持5.5.1,6.3.2,6.5.4,7.1.1,7.6.2,7.10.2多个版本,迁移集群尽量保持大版本一致。

    对应ES集群是5.x, 6.x 选择logstash版本5.6.16, 对应ES版本是7.X 选择logstash版本7.10.0。

  • 集群迁移过程禁止修改索引,修改索引会导致原数据和目标数据内容不一致。
  • 索引大小小于100G可以使用迁移任务不用单独分析索引,简化分析工作。

准备工作

  • 创建迁移虚拟机。
    1. 创建迁移虚拟机,用于迁移源集群的元数据。
      1. 创建ECS虚拟机,虚拟机需要创建linux系统,规格选择2U4G。
      2. 测试虚拟机和源集群和目标集群保持连通性,执行命令curl http://{ip}:{port}可以测试结果。

        IP是源集群和目的集群访问地址,端口默认是9200,如果不是9200使用集群实际端口。

        如下示例仅适用于非安全集群。

        curl http://10.234.73.128:9200
        {
          "name" : "voc_es_cluster_new-ess-esn-1-1",
          "cluster_name" : "voc_es_cluster_new",
          "cluster_uuid" : "1VbP7-39QNOx_R-llXKKtA",
          "version" : {
            "number" : "6.5.4",
            "build_flavor" : "default",
            "build_type" : "tar",
            "build_hash" : "d2ef93d",
            "build_date" : "2018-12-17T21:17:40.758843Z",
            "build_snapshot" : false,
            "lucene_version" : "7.5.0",
            "minimum_wire_compatibility_version" : "5.6.0",
            "minimum_index_compatibility_version" : "5.0.0"
          },
          "Tagline" : "You Know, for Search"
        }
  • 准备工具和软件

    在线安装和离线安装以虚拟机是否可以联网判断。如果可以联网直接使用yum和pip安装即可。离线安装需要下载安装包到虚拟机上执行安装命令。

    在线安装步骤如下:

    1. 执行yum install python2安装python2。
      [root@ecs opt]# yum install python2
    2. 执行yum install python-pip安装pip。
      [root@ecs opt]# yum install python-pip
    3. 执行pip install pyyaml安装yaml依赖。
    4. 执行pip install requests安装requests依赖。

    离线安装步骤如下:

    1. 下载python2安装包,下载地址https://www.python.org/downloads/release/python-2718/。选择源码下载安装。
      图1 下载python2安装包
    2. 使用winscp工具上传python安装包到opt目录下,安装python。
      # 解压python压缩包
      [root@ecs-52bc opt]# tar -xvf Python-2.7.18.tgz
      Python-2.7.18/Modules/zlib/crc32.c
      Python-2.7.18/Modules/zlib/gzlib.c
      Python-2.7.18/Modules/zlib/inffast.c
      Python-2.7.18/Modules/zlib/example.c
      Python-2.7.18/Modules/python.c
      Python-2.7.18/Modules/nismodule.c
      Python-2.7.18/Modules/Setup.config.in
      …
      # 解压完成进入目录
      [root@ecs-52bc opt]# cd Python-2.7.18
      # 检查文件配置安装路径
      [root@ecs-52bc Python-2.7.18]# ./configure --prefix=/usr/local/python2
      …
      checking for build directories... checking for --with-computed-gotos... no value specified
      checking whether gcc -pthread supports computed gotos... yes
      done
      checking for ensurepip... no
      configure: creating ./config.status
      config.status: creating Makefile.pre
      config.status: creating Modules/Setup.config
      config.status: creating Misc/python.pc
      config.status: creating Modules/ld_so_aix
      config.status: creating pyconfig.h
      creating Modules/Setup
      creating Modules/Setup.local
      creating Makefile
      # 编译python
      [root@ecs-52bc Python-2.7.18]# make
      # 安装python
      [root@ecs-52bc Python-2.7.18]# make install
    3. 安装完成检查,检查python安装结果。
      # 检查python版本
      [root@ecs-52bc Python-2.7.18]# python --version
      Python 2.7.5
      # 检查pip版本
      [root@ecs-52bc Python-2.7.18]# pip --version
      pip 7.1.2 from /usr/lib/python2.7/site-packages/pip-7.1.2-py2.7.egg (python 2.7)
      [root@ecs-52bc Python-2.7.18]#
  • 准备执行脚本
    1. vi migrateConfig.yaml增加配置文件。
      es_cluster_new:
      # 集群名称
        clustername: es_cluster_new
        # 源端ES集群地址,加上http://,
        src_ip: http://x.x.x.x:9200
        # 没有用户名密码设置为""
        src_username: ""
        src_password: ""
        # 目的端ES集群地址,加上http://
        dest_ip: http://x.x.x.x:9200
        # 没有用户名密码设置为""
        dest_username: ""
        dest_password: ""
        # 可有不定义,默认为false, migrateMapping.py使用
        # 是否只处理这个文件中mapping地址的索引
        # 如果设置成true,则只会将下面的mapping中的索引获取到并在目的端创建
        # 如果设置成false,则会取源端集群的所有索引,除去(.kibana, .*)
        # 并且将索引名称与下面的mapping匹配,如果匹配到使用mapping的value作为目的端的索引名称
        # 如果匹配不到,则使用源端原始的索引名称
        only_mapping: false
        # 要迁移的索引,key为源端的索引名字,value为目的端的索引名字
        mapping:
            test_index_1: test_index_1
      
        # 可以不定义,默认false, checkIndices.py使用
        # 设置为false会比较所有的索引和文档数量,设置为true只比较索引数量,
        only_compare_index: false

      配置文件说明:

      配置

      说明

      clustername

      集群名称,集群标识。

      src_ip

      源集群访问的ip地址,只需要集群一个地址,端口默认是9200,如果集群的访问的端口不是9200,以实际的配置端口为准。

      src_username

      源集群访问的用户名,如果不需要设置为""。

      src_password

      源集群访问的密码,如果不需要设置为""。

      dest_ip

      目标集群访问的ip地址, 只需要集群一个地址,端口默认是9200,如果集群的访问的端口不是9200,以实际的配置端口为准。

      dest_username

      目标集群访问的用户名,如果不需要设置为""。

      dest_password

      目标集群访问的密码,如果不需要设置为""。

    2. 执行vi checkIndices.py命令,复制以下脚本到文件生成索引数据对比脚本。
      # -*- coding:UTF-8 -*-
      import sys
      import yaml
      import requests
      import re
      import json
      import os
      
      
      def printDividingLine():
          print("<=============================================================>")
      
      
      def get_cluster_version(url, auth=None):
          response = requests.get(url, auth=auth)
          if response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              return False
          cluster = response.json()
          version = cluster["version"]["number"]
          return True
      
      
      # get all indices
      def get_indices(url, source_auth):
          response = requests.get(url + "/_alias", auth=source_auth)
          if response.status_code != 200:
              print("*** get all index failed. resp statusCode:" + str(
                  response.status_code) + " response is " + response.text)
              exit()
          all_index = response.json()
          system_index = []
          create_index = []
          for index in list(all_index.keys()):
              if (index.startswith(".")):
                  system_index.append(index)
              else:
                  create_index.append(index)
          return create_index
      
      
      def get_mapping(url, _auth, index):
          source_url = url + "/" + index
          index_response = requests.get(source_url, auth=_auth)
          if index_response.status_code != 200:
              print("*** get ElasticSearch message failed. resp statusCode:" + str(
                  index_response.status_code) + " response is " + index_response.text)
              return "[failure] --- index is not exist in destination es. ---"
          mapping = index_response.json()
          return mapping
      
      
      def get_index_total(url, index, es_auth):
          stats_url = url + "/" + index + "/_stats"
          index_response = requests.get(stats_url, auth=es_auth, verify=False)
          if index_response.status_code != 200:
              print("*** get ElasticSearch stats message failed. resp statusCode:" + str(
                  index_response.status_code) + " response is " + index_response.text)
              return 0
          return index_response.json()
      
      
      def get_indices_stats(url, es_auth):
          endpoint = url + "/_cat/indices"
          indicesResult = requests.get(endpoint, es_auth)
          indicesList = indicesResult.split("\n")
          indexList = []
          for indices in indicesList:
              indexList.append(indices.split()[2])
          return indexList
      
      
      def loadConfig(argv):
          if argv is None or len(argv) != 2:
              config_yaml = "migrateConfig.yaml"
          else:
              config_yaml = argv[1]
          config_file = open(config_yaml)
          # python3
          # return yaml.load(config_file, Loader=yaml.FullLoader)
          return yaml.load(config_file)
      
      
      def main(argv):
          requests.packages.urllib3.disable_warnings()
          print("begin to migrate index mapping!")
          config = loadConfig(argv)
          src_clusters = config.keys()
      
          print("begin to process cluster name :")
          for name in src_clusters:
              print(name)
          print("cluster count:" + str(src_clusters.__len__()))
      
          for name, value in config.items():
              printDividingLine()
              source = value["src_ip"]
              source_user = value["src_username"]
              source_passwd = value["src_password"]
              source_auth = None
              if source_user != "":
                  source_auth = (source_user, source_passwd)
              dest = value["dest_ip"]
              dest_user = value["dest_username"]
              dest_passwd = value["dest_password"]
              dest_auth = None
              if dest_user != "":
                  dest_auth = (dest_user, dest_passwd)
              cluster_name = name
              if "clustername" in value:
                  cluster_name = value["clustername"]
      
              print("start to process cluster :" + cluster_name)
              # get all indices
              all_source_index = get_indices(source, source_auth)
              all_dest_index = get_indices(dest, dest_auth)
      
              if not os.path.exists("mappingLogs"):
                  os.makedirs("mappingLogs")
              filename = "mappingLogs/" + str(cluster_name) + "#indices_stats"
              with open(filename + ".json", "w") as f:
                  json.dump("cluster name: " + cluster_name, f)
                  f.write("\n")
                  json.dump("source indices: ", f)
                  f.write("\n")
                  json.dump(all_source_index, f, indent=4)
                  f.write("\n")
                  json.dump("destination indices : ", f)
                  f.write("\n")
                  json.dump(all_dest_index, f, indent=4)
                  f.write("\n")
      
              print("source indices total     : " + str(all_source_index.__len__()))
              print("destination index total  : " + str(all_dest_index.__len__()))
      
              filename_src = "mappingLogs/" + str(cluster_name) + "#indices_source_mapping"
              filename_dest = "mappingLogs/" + str(cluster_name) + "#indices_dest_mapping"
              with open(filename_src + ".json", "a") as f_src:
                  json.dump("cluster name: " + cluster_name, f_src)
                  f_src.write("\n")
              with open(filename_dest + ".json", "a") as f_dest:
                  json.dump("cluster name: " + cluster_name, f_dest)
                  f_dest.write("\n")
              for index in all_source_index:
                  mapping = get_mapping(source, source_auth, index)
                  with open(filename + ".json", "a") as f_src:
                      json.dump("========================", f_src)
                      f_src.write("\n")
                      json.dump(mapping, f_src, indent=4)
                      f_src.write("\n")
                  with open(filename_src + ".json", "a") as f_src:
                      json.dump("========================", f_src)
                      f_src.write("\n")
                      json.dump(mapping, f_src, indent=4)
                      f_src.write("\n")
      
                  mapping = get_mapping(dest, dest_auth, index)
                  with open(filename + ".json", "a") as f_dest:
                      json.dump("========================", f_dest)
                      f_dest.write("\n")
                      json.dump(mapping, f_dest, indent=4)
                      f_dest.write("\n")
                  with open(filename_dest + ".json", "a") as f_src:
                      json.dump("========================", f_src)
                      f_src.write("\n")
                      json.dump(mapping, f_src, indent=4)
                      f_src.write("\n")
      
              print("source indices write file success,      file: " + filename_src)
              print("destination indices write file success, file: " + filename_dest)
      
              if "only_compare_index" in value and value["only_compare_index"]:
                  print("[success] only compare mapping, not compare index count.")
                  continue
      
              for index in all_source_index:
                  index_total = get_index_total(value["src_ip"], index, source_auth)
                  src_total = index_total["_all"]["primaries"]["docs"]["count"]
                  src_size = int(index_total["_all"]["primaries"]["store"]["size_in_bytes"]) / 1024 / 1024
                  dest_index = get_index_total(value["dest_ip"], index, dest_auth)
                  if dest_index is 0:
                      print('[failure] not found, index: %-20s, source total: %-10s size %6sM'
                            % (str(index), str(src_total), src_size))
                      continue
                  dest_total = dest_index["_all"]["primaries"]["docs"]["count"]
                  if src_total != dest_total:
                      print('[failure] not consistent, '
                            'index: %-20s, source total: %-10s size %6sM destination total: %-10s '
                            % (str(index), str(src_total), src_size, str(dest_total)))
                      continue
                  print('[success] compare index total equal : index : %-20s,  total: %-20s '
                        % (str(index), str(dest_total)))
      
      
      if __name__ == '__main__':
          main(sys.argv)

步骤一:创建Logstash集群

  • 迁移数据使用Logstash,创建logstash服务需要费用,默认是按需收费,用户迁移完毕数据及时释放Logstash节省费用。
  • 可以基于集群的索引不同创建多个Logstash集群分别配置不同的迁移任务。
  1. 登录云搜索服务管理控制台
  2. “总览”或者“集群管理”页面,选择“Logstash”,进入Logsash类型集群管理页面。
  3. 单击创建集群,进入“创建集群”页面。
  4. 选择“当前区域“可用区”
  5. 指定集群基本信息,选择“集群类型”“集群版本”,并输入“集群名称”
    表1 基本参数说明

    参数

    说明

    集群类型

    选择“Logstash”

    集群版本

    当前支持5.6.16、7.10.0。

    对应ES集群是5.x, 6.x 选择logstash版本5.6.16, 对应ES版本是7.X 选择logstash版本7.10.0。

    集群名称

    自定义的集群名称,可输入的字符范围为4~32个字符,只能包含数字、字母、中划线和下划线,且必须以字母开头。

    图2 基本信息配置
  6. 指定集群的主机规格相关参数。“节点数量”设置为“1”“节点规格”选择“8U16G”,其余参数保持默认值。
    图3 设置主机规格
  7. 设置集群的企业项目,保持默认值即可。
  8. 单击“下一步,网络配置”,设置集群的网络配置。
    表2 参数说明

    参数

    说明

    虚拟私有云

    VPC即虚拟私有云,是通过逻辑方式进行网络隔离,提供安全、隔离的网络环境。

    选择创建集群需要的VPC,单击“查看虚拟私有云”进入VPC服务查看已创建的VPC名称和ID。如果没有VPC,需要创建一个新的VPC。

    说明:

    此处您选择的VPC必须包含网段(CIDR),否则集群将无法创建成功。新建的VPC默认包含网段(CIDR)。

    子网

    通过子网提供与其他网络隔离的、可以独享的网络资源,以提高网络安全。

    选择创建集群需要的子网,可进入VPC服务查看VPC下已创建的子网名称和ID。

    安全组

    安全组是一个逻辑上的分组,为同一个VPC内具有相同安全保护需求并相互信任的弹性云服务器提供访问策略。单击“查看安全组”可了解安全组详情。

    说明:

    请确保安全组的“端口范围/ICMP类型”“Any”或者包含端口9200的端口范围。

    图4 设置网络规格
  9. 单击“下一步:高级配置”,高级配置可选择默认配置和自定义。此样例保持默认配置即可。
  10. 单击“下一步:确认配置”,确认完成后单击“立即创建”开始创建集群。
  11. 单击“返回集群列表”,系统将跳转到“集群管理”页面。您创建的集群将展现在集群列表中,且集群状态为“创建中”,创建成功后集群状态会变为可用

步骤二:验证集群连通性

验证Logstash和源集群、目标集群的连通性。

  1. 在Logstash集群管理页面,单击步骤一:创建Logstash集群中创建的集群名称,进入集群的基本信息页面。选择“配置中心”,进入配置中心页面;或者直接单击目标集群操作列的“配置中心”,进入配置中心页面。
  2. 在配置中心页面,选择“连通性测试”
  3. 输入源集群和目的集群的IP地址或域名和端口号,单击“测试”
    图5 连通性测试

步骤三:配置Logstash增量迁移任务

  1. 在Logstash集群管理页面,单击步骤一:创建Logstash集群中创建的集群名称,进入集群的基本信息页面。选择“配置中心”,或者直接单击目标集群操作列的“配置中心”,进入配置中心页面。
  2. 单击右上角“创建”,进入创建配置文件页面,选择集群模板,修改ES集群迁移配置文件。
    当前案例选用的两个ES集群均未开启https。
    • 选择集群模板:此样例是从Elasticsearch类型集群导入数据到Elasticsearch类型,集群模板选择“elasticsearch”,单击操作列的“应用”。基于不同的集群配置增加配置信息。
    • 修改配置文件:“名称”填写配置名称,例如es-es-inc;“配置文件内容”填写ES集群迁移配置文件,配置文件示例如下。
      input{
           elasticsearch{
               hosts =>  ["xx.xx.xx.xx:9200"]
               user => "css_logstash"
               password => "******"
               index => "*_202102"
               query => '{"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}'
               docinfo => true
               size => 1000
               #scroll => "5m"
      
           }
       }
      
       filter {
         mutate {
           remove_field => ["@timestamp", "@version"]
         }
       }
      
       output{
           elasticsearch{
             hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"]
               user => "admin"
               password => "******"
               index => "%{[@metadata][_index]}"
               document_type => "%{[@metadata][_type]}"
               document_id => "%{[@metadata][_id]}"
           }
      
           #stdout { codec => rubydebug { metadata => true }}
       }

      需要修改的配置如下:

      表3 集群配置修改说明

      配置

      说明

      hosts

      分别填写源集群和目标集群的访问地址,如果集群有多个访问节点请分别填写。

      user

      集群访问的用户名,如果没有用户名此项使用#注释掉。

      password

      集群访问的密码,如果没有用户名密码此项使用#注释掉。

      index

      需要增加迁移的索引信息,一个配置文件只支持一个索引的增量迁移。

      query

      增量数据的识别标识,一般是es的dls语句,需要提前分析。其中postsDate为业务中时间字段。

      {"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}

      此处命令是迁移2021-05-25之后新增加的数据,在多次增量迁移过程中需要修改此处的日志值,如果日期是时间戳方式请转换为时间戳。此处命令需要提前验证有效性。

      scroll

      当源端数据量过大,为了防止logstash内存溢出,可以使用scroll分批次获取数据。默认为"1m"。间隔时间不要太长,否则可能会丢失数据。

      不同的索引的增量迁移配置不同,必须基于索引分析给出增量配置文件迁移命令。

步骤四:增量迁移

  1. 使用putty登录准备工作中创建的Linux虚拟机。
  2. 在Logstash集群管理页面,单击步骤一:创建Logstash集群中创建的集群名称,进入集群的基本信息页面。选择“配置中心”,或者直接单击目标集群操作列的“配置中心”,进入配置中心页面。
  3. 选择步骤三:配置Logstash增量迁移任务中所创建的配置文件,单击左上角的“启动”
  4. 根据界面提示,选择“是”启动Logstash服务会立刻开始迁移数据。
  5. 此时可以在管道下面看到启动的配置文件。
  6. 数据迁移完毕检查数据一致性,使用putty登录linux虚拟机,执行python checkIndices.py 对比数据结果。

步骤五:释放Logstash集群

集群迁移完毕,释放Logstash集群。

  1. 登录云搜索服务管理控制台。
  2. 单击“集群管理”>“Logstash”,进入Logstash类型的集群列表界面,在所创建的迁移集群“操作”列中单击更多>删除
  3. 在弹出的确认提示框中,再次输入需要删除的集群名称,单击“确定”完成集群删除。