更新时间:2024-07-24 GMT+08:00

使用Spark小文件合并工具说明

工具介绍

在Hadoop大规模生产集群中,由于HDFS的元数据都保存在NameNode的内存中,集群规模受制于NameNode单点的内存限制。如果HDFS中有大量的小文件,会消耗NameNode大量内存,还会大幅降低读写性能,延长作业运行时间。因此,小文件问题是制约Hadoop集群规模扩展的关键问题。

本工具主要有如下两个功能:

  1. 扫描表中有多少低于用户设定阈值的小文件,返回该表目录中所有数据文件的平均大小。
  2. 对表文件提供合并功能,用户可设置合并后的平均文件大小。

支持的表类型

Spark:Parquet、ORC、CSV、Text、Json。

Hive:Parquet、ORC、CSV、Text、RCFile、Sequence、Bucket。

  1. 数据有压缩的表在执行合并后会采用Spark默认的压缩格式-Snappy。可以通过在客户端设置“spark.sql.parquet.compression.codec”(可选:uncompressed, gzip, snappy)和“spark.sql.orc.compression.codec”(可选:uncompressed, zlib, lzo, snappy)来选择Parquet和Orc表的压缩格式;由于Hive和Spark表在可选的压缩格式上有区别,除以上列出的压缩格式外,其他的压缩格式不支持。
  2. 合并桶表数据,需要先在Spark2x客户端的hive-site.xml里加上配置:
    <property>
    <name>hive.enforce.bucketing</name>
    <value>false</value>
    </property>
    <property>
    <name>hive.enforce.sorting</name>
    <value>false</value>
    </property>
  3. Spark暂不支持Hive的加密列特性。

工具使用

下载安装客户端,例如安装目录为“/opt/client”。进入“/opt/client/Spark2x/spark/bin”,执行mergetool.sh脚本。

加载环境变量

source /opt/client/bigdata_env

source /opt/client/Spark2x/component_env

扫描功能

命令形式: sh mergetool.sh scan <db.table> <filesize>

db.table的形式是“数据库名.表名”,filesize为用户自定义的小文件阈值(单位MB),返回结果为小于该阈值的文件个数,及整个表目录数据文件的平均大小。

例如:sh mergetool.sh scan default.table1 128

合并功能

命令形式: sh mergetool.sh merge <db.table> <filesize> <shuffle>

db.table的形式是“数据库名.表名”,filesize为用户自定义的合并后平均文件大小(单位MB),shuffle是一个boolean值,取值true/false,作用是设置合并过程中是否允许数据进行shuffle。

例如:sh mergetool.sh merge default.table1 128 false

提示如下,则操作成功:

SUCCESS: Merge succeeded
  1. 请确保当前用户对合并的表具有owner权限。
  2. 合并前请确保HDFS上有足够的存储空间,至少需要被合并表大小的一倍以上。
  3. 合并表数据的操作需要单独进行,在此过程中读表,可能临时出现找不到文件的问题,合并完成后会恢复正常;另外在合并过程中请注意不要对相应的表进行写操作,否则可能会产生数据一致性问题。
  4. 如果合并完成后,在一直处于连接状态的spark-beeline/spark-sql session中查询分区表的数据,出现文件不存在的问题,根据提示可以执行"refresh table 表名"后再重新查询。
  5. 请依据实际情况合理设置filesize值,例如可以在scan得到表中平均文件大小值average后,在merge时将filesize设置一个比average更大的值;否则,执行合并后可能出现文件数变得更多的情况。
  6. 合并过程中,会将原表数据放入回收站,再填入已合并的数据。如果在此过程中发生异常,根据工具提示,可将trash目录中的数据通过hdfs的mv命令恢复。
  7. 在HDFS router联邦场景下,如果表的根路径与根路径“/user”的目标NameService不同,在二次合并时需要手动清理放入回收站的原表文件,否则会导致合并失败。
  8. 此工具应用客户端配置,需要做性能调优可修改客户端配置文件的相关配置。

shuffle设置

对于合并功能,可粗略估计合并前后分区数的变化:

一般来说,旧分区数>新分区数,可设置shuffle为false;但如果旧分区远大于新分区数,例如高于100倍以上,可以考虑设置shuffle为true,增加并行度,提高合并的速度。

  • 设置shuffle为true(repartition),会有性能上的提升;但是由于Parquet和Orc存储方式的特殊性,repartition会使压缩率变小,直接表现是hdfs上表的总大小会增大到1.3倍。
  • 设置shuffle为false(coalesce),合并后的大小不会非常平均,可能会分布在设置的filesize左右。

日志存放位置

默认日志存放位置为/tmp/SmallFilesLog.log4j,如需自定义日志存放位置,可在/opt/client/Spark2x/spark/tool/log4j.properties中配置log4j.appender.logfile.File。