使用Spark小文件合并工具说明
工具介绍
在Hadoop大规模生产集群中,由于HDFS的元数据都保存在NameNode的内存中,集群规模受制于NameNode单点的内存限制。如果HDFS中有大量的小文件,会消耗NameNode大量内存,还会大幅降低读写性能,延长作业运行时间。因此,小文件问题是制约Hadoop集群规模扩展的关键问题。
本工具主要有如下两个功能:
- 扫描表中有多少低于用户设定阈值的小文件,返回该表目录中所有数据文件的平均大小。
- 对表文件提供合并功能,用户可设置合并后的平均文件大小。
支持的表类型
Spark:Parquet、ORC、CSV、Text、Json。
Hive:Parquet、ORC、CSV、Text、RCFile、Sequence、Bucket。
- 数据有压缩的表在执行合并后会采用Spark默认的压缩格式-Snappy。可以通过在客户端设置“spark.sql.parquet.compression.codec”(可选:uncompressed, gzip, snappy)和"spark.sql.orc.compression.codec"(可选:uncompressed, zlib, lzo, snappy)来选择Parquet和Orc表的压缩格式;由于Hive和Spark表在可选的压缩格式上有区别,除以上列出的压缩格式外,其他的压缩格式不支持。
- 合并桶表数据,需要先在Spark2x客户端的hive-site.xml里加上配置:
<property> <name>hive.enforce.bucketing</name> <value>false</value> </property> <property> <name>hive.enforce.sorting</name> <value>false</value> </property>
- Spark暂不支持Hive的加密列特性。
工具使用
下载安装客户端,例如安装目录为“/opt/client”。进入“/opt/client/Spark2x/spark/bin”,执行mergetool.sh脚本。
加载环境变量
source /opt/client/bigdata_env
source /opt/client/Spark2x/component_env
扫描功能
命令形式: sh mergetool.sh scan <db.table> <filesize>
db.table的形式是“数据库名.表名”,filesize为用户自定义的小文件阈值(单位MB),返回结果为小于该阈值的文件个数,及整个表目录数据文件的平均大小。
例如:sh mergetool.sh scan default.table1 128
合并功能
命令形式: sh mergetool.sh merge <db.table> <filesize> <shuffle>
db.table的形式是“数据库名.表名”,filesize为用户自定义的合并后平均文件大小(单位MB),shuffle是一个boolean值,取值true/false,作用是设置合并过程中是否允许数据进行shuffle。
例如:sh mergetool.sh merge default.table1 128 false
提示如下,则操作成功:
SUCCESS: Merge succeeded
- 请确保当前用户对合并的表具有owner权限。
- 合并前请确保HDFS上有足够的存储空间,至少需要被合并表大小的一倍以上。
- 合并表数据的操作需要单独进行,在此过程中读表,可能临时出现找不到文件的问题,合并完成后会恢复正常;另外在合并过程中请注意不要对相应的表进行写操作,否则可能会产生数据一致性问题。
- 若合并完成后,在一直处于连接状态的spark-beeline/spark-sql session中查询分区表的数据,出现文件不存在的问题,根据提示可以执行"refresh table 表名"后再重新查询。
- 请依据实际情况合理设置filesize值,例如可以在scan得到表中平均文件大小值average后,在merge时将filesize设置一个比average更大的值;否则,执行合并后可能出现文件数变得更多的情况。
- 合并过程中,会将原表数据放入回收站,再填入已合并的数据。若在此过程中发生异常,根据工具提示,可将trash目录中的数据通过hdfs的mv命令恢复。
- 在HDFS router联邦场景下,如果表的根路径与根路径“/user”的目标NameService不同,在二次合并时需要手动清理放入回收站的原表文件,否则会导致合并失败。
- 此工具应用客户端配置,需要做性能调优可修改客户端配置文件的相关配置。
shuffle设置
对于合并功能,可粗略估计合并前后分区数的变化:
一般来说,旧分区数>新分区数,可设置shuffle为false;但如果旧分区远大于新分区数,例如高于100倍以上,可以考虑设置shuffle为true,增加并行度,提高合并的速度。
- 设置shuffle为true(repartition),会有性能上的提升;但是由于Parquet和Orc存储方式的特殊性,repartition会使压缩率变小,直接表现是hdfs上表的总大小会增大到1.3倍。
- 设置shuffle为false(coalesce),合并后的大小不会非常平均,可能会分布在设置的filesize左右。
日志存放位置
默认日志存放位置为/tmp/SmallFilesLog.log4j,如需自定义日志存放位置,可在/opt/client/Spark2x/spark/tool/log4j.properties中配置log4j.appender.logfile.File。