更新时间:2024-11-29 GMT+08:00

使用HDFS2ES工具迁移HDFS数据

操作场景

该操作指导工程师在Elasticsearch正常运行时,使用HDFS2ES工具以运行Mapreduce方式将HDFS中格式化的数据导入到Elasticsearch中。

前提条件

  • 迁移数据的集群正常运行。
  • 迁移数据的集群可以正常运行Mapreduce,并与Elasticsearch集群网络互通。
  • 已准备HDFS数据。
    HDFS上的数据及目录格式要求如下:
    • HDFS目录结构为:固定Base目录/时间/数据文件(或目录)。工具默认该时间为数据时间,每次只读取新写到HDFS目录下的数据。

      例如:/user/FusionInsight_Elasticsearch/Hdfs2Es/input/testall/20190101,表示该目录下的数据,均为20190101当天的数据。

    • 导入数据格式:按行写入的数据文件,字段间以指定的自定义分隔符分隔。

      例如:

      1    张三    男    18 
      • 示例使用水平制表符作为分隔符,各字段间使用的分割符,可通过配置文件“table*.xml”中的“FieldSplit”属性指定。
      • “table*.xml”中的“FieldSplit”属性值只支持配置char类型字符,以水平制表符为例可配置为转义符“\t”,也可配置为Unicode“\u0009”。
    • 数据存储方式满足如下任一方式:
      1. 全量写入:

        每次写入时,认为获取的最新输入数据为全量数据,在写入Elasticsearch后,会将历史数据删除,只保留最新的这份数据。

      2. 增量写入,不分表:

        数据以追加的方式写入同一个索引。

      3. 增量写入,需要分表:

        按数据目录的时间,定期创建索引,将相同时间段的数据写入同一个索引。

        数据在写入时,认为需要写入Elasticsearch的HDFS数据目录时间为数据时间,根据配置中的分表类型,设定对应表名。

        表名规则为:${TABLE_NAME}_${DATE_TYPE}。${TABLE_NAME}为指定的表名,${DATE_TYPE}为分表时间(按位数不同做区分)。例如:

        • 表名为${TABLE_NAME}_2019表示按年分表(2019年)
        • 表名为${TABLE_NAME}_20191表示按季度分表(2019年第一季度)
        • 表名为${TABLE_NAME}_201901表示按月分表(2019年1月份)
        • 表名为${TABLE_NAME}_2019001表示按周分表(2019年第一周)
        • 表名为${TABLE_NAME}_20190102表示按天分表(2019年1月2日)
        • 为方便理解,上述描述中,以“表”指代Elasticsearch中的索引。
        • 该工具支持以上任一种或多种存储方式,请根据实际使用场景,合理配置tableConf目录下的xml文件。
  • 已安装集群客户端,例如安装目录为“/opt/client”。并且已执行source /opt/client/bigdata_env命令。

操作步骤

修改配置文件

  1. 替换xml配置文件。

    切换到Yarn客户端安装目录,将Yarn客户端中config目录下的“core-site.xml”“hdfs-site.xml”“mapred-site.xml”“yarn-site.xml”文件拷贝到集群客户端的Elasticsearch/tools/elasticsearch-data2es/hdfs2es/basicConf目录下。

    xml文件是访问HDFS/Mapreduce必用的工具配置文件。

  2. 在Manager界面选择“集群 > 待操作集群的名称 > 集群属性”,确认集群的认证模式是否为安全模式。

    • 是,执行3
    • 否,执行8

  3. 创建用户。

    1. 在Manager界面选择系统 > 权限 > 用户 > 添加用户
    2. 填写用户名,例如test。选择用户类型为“人机”。将test加入用户组“elasticsearch”和“supergroup”,选择“主组”为“supergroup”,并绑定角色“Manager_administrator”取得权限。基于Ranger鉴权模式下,在Ranger中为test用户添加Elasticsearch访问权限策略,具体操作参考添加Elasticsearch的Ranger访问权限策略
    3. 单击“确定”

  4. 在Manager中选择“系统 > 权限 > 用户”,勾选对应的用户名,选择“更多 > 下载认证凭据”,选择集群信息后单击“确定”,下载文件。
  5. 将解压压缩包后得到的“user.keytab”“krb5.conf”文件上传到集群客户端的“Elasticsearch/tools/elasticsearch-data2es/hdfs2es/basicConf”目录下。
  6. root用户登录客户端所在节点。
  7. 执行以下命令修改HDFS2ES工具包basicConf目录下jaas.conf的“principal”的值:

    cd /opt/client/Elasticsearch/tools/elasticsearch-data2es/hdfs2es/basicConf

    vi jaas.conf

    如下所示,test是用户名,即修改为与esParams.properties的principal配置一致;keyTab需配置为客户端中“user.keytab”文件目录。
    Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/opt/client/Elasticsearch/tools/elasticsearch-data2es/hdfs2es/basicConf/user.keytab"
    principal="test@<系统域名>"
    useTicketCache=false
    storeKey=true
    debug=true;
    };

    用户可登录Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。

  8. 执行以下命令修改hdfs2es/basicConf目录下esParams.properties参数配置文件:

    vi esParams.properties

    配置样例如下:

    #写入Elasticsearch的集群ip端口
    EsNodes=ip1:port1,ip2:port2
    #用户名
    principal=test
    #安全模式集群:true;普通模式集群:false
    securityMode=true
    #Elasticsearch安全模式:true;Elasticsearch普通模式:false(根据Manager中Elasticsearch配置“ELASTICSEARCH_SECURITY_ENABLE”参数确定,若集群为普通模式,该参数为false)
    esSecurityMode=true
    #线程池大小,即一次多少个表的数据开始写入Elasticsearch(请根据入库的数据量及索引个数决定,避免过多或过少)
    threadNum=10
    #批量写入条数,即一次bulk入多少条数据
    batch=1000
    #JOB一次处理文件的大小,单位MB;不填默认为0;当该值为0时,不做限制,一次处理输入目录下所有数据文件
    inputFileSize=

    参数“inputFileSize”用于控制一个job处理的数据量范围。例如,一个job存在三个需要处理的文件,大小分别为50MB、60MB、70MB,该参数设置为100MB,则会将该job拆分为两个依次处理,即先处理前两个文件,再处理第三个文件(第一个job输入大小为110MB,第二个job输入大小为70MB)。

  1. 根据配置模板,填写需要写入的Elasticsearch索引xml配置。

    cd /opt/client/Elasticsearch/tools/elasticsearch-data2es/hdfs2es/tableConf

    cat tableAll.xml

    将填写的配置,放置tableConf目录下,配置的文件名不做限制,为“.xml”文件即可。完成后,删除所有配置模板的xml文件。
    • 多个配置即存在多个索引,每个索引运行一个Mapreduce任务;因此,请按照当前集群状态,合理设定esParams.properties配置中的threadNum值。防止一次性提交过多的任务,导致集群异常。
    • 如需要周期性运行,请根据实际的数据量及集群运行情况确定一个合理的扫描周期。如数据量较大,建议首次为一次性导入,完成后,再改为周期,防止历史数据过多,增加集群压力。
    • table*.xml文件中的StatusConfig属性配置的参数值,记录HDFS已导入Elasticsearch的最大日期,HDFS2ES执行完成后会自动更新HDFS中StatusConfig中子目录日期,HDFS2ES只会执行大于StatusConfig中子目录日期,如StatusConfig目录中存在20190102子目录,则只会导入日期大于20190102目录的数据;之前的目录如20190101、20181230不会导入,若想重新导入20190101目录数据,需将StatusConfig目录下的20190102子目录删除。
    • 写入失败的数据,会在对应的失败目录(配置中的hdfs目录)下,按时间写入,数据内容为可直接通过bulk命令批量写入的json格式。写入失败的数据,目前不会自动清理,如不需要永久保存,请及时清理。
    • 写入的统计结果,会在日志中打印,如需要查看,可在logs/hdfs2es.log中查看每份数据的写入情况。

  2. 在“hdfs2es/tableConf”目录执行以下命令,根据配置模板,填写建立索引的语句。

    vi createIndex.properties

    删除配置中的样例语句,填写根据实际场景确定的建立索引的语句,并保证xml中配置的索引,在该配置中也配置了相应的建立索引语句。

  3. 运行工具,扫描HDFS数据,写入Elasticsearch。

    • 方式一:周期性扫描,即工具按指定周期(单位:秒),不断扫描输入目录下,是否有新的数据需要写入。

      cd /opt/client/Elasticsearch/tools/elasticsearch-data2es/hdfs2es

      sh run.sh basicConf/ tableConf/ true 2000

    • 方式二:一次性扫描,即工具运行一次后结束。(若待扫描的目录日期小于StatusConfig目录日期,任务不会执行)

      cd /opt/client/Elasticsearch/tools/elasticsearch-data2es/hdfs2es

      sh run.sh basicConf/ tableConf/ false

  4. 通过curl命令查看索引确认数据是否导入,例如:

    • 安全模式:

      curl -XGET --tlsv1.2 --negotiate -k -u : "https://IP:PORT/my_store1/_search"

    • 普通模式:

      curl -XGET "http://IP:PORT/my_store1/_search"

    curl命令的使用请参考Linux下curl命令的使用

  5. 数据导入完成后,若有数据写入失败,可以在配置中指定的HDFS目录下查看对应数据。如需将失败数据尝试再次导入Elasticsearch集群,可通过如下命令进行二次导入:

    cd /opt/client/Elasticsearch/tools/elasticsearch-data2es/hdfs2es

    java -cp ./basicConf/:./../lib/* com.*.fusioninsight.es.tool.hdfs2es.reload.ReloadJob ${HDFS_DIRECTORY} ${BATCH_SIZE}

    • ${HDFS_DIRECTORY} 表示需要二次导入的数据目录,请保证该目录的下一级,均为需要处理的文件,而非目录。
    • ${BATCH_SIZE} 表示一次bulk的数据条数。
    • 集群客户端的Elasticsearch/tools/elasticsearch-data2es/hdfs2es/logs目录,可以查看导入过程的日志信息。
    • 如需使用二次导入功能,请保证basicConf目录下的配置,已按上述操作步骤执行。