如何合并小文件
什么是小文件
分布式文件系统按块(Block)存放数据。小文件是指在存储系统中,文件大小远小于存储块大小的文件。大量小文件会给大数据系统带来显著的性能和管理问题,合并小文件是优化系统性能的关键手段之一。
本文介绍如何在DLI中使用DISTRIBUTE BY来合并小文件。
小文件的主要危害
- 元数据压力:文件系统(如HDFS)的Master节点(NameNode)需要在内存中管理每个文件的元数据。海量小文件会急剧增加其内存消耗,成为集群扩展的瓶颈。
- 计算性能低下:计算引擎(如Spark)通常为每个文件或文件块启动一个计算任务(Task)。处理大量小文件会导致:
- 巨大的任务调度开销:启动和调度成千上万个Task的时间可能远超过实际数据处理时间。
- 资源浪费:大量CPU和内存资源被消耗在任务管理上,而非有效计算。
- 查询延迟:作业整体运行时间变长,查询速度变慢。
- 存储效率低:小文件可能导致存储空间利用率低下,无法充分利用分布式存储的块大小优势。
- 易触达限制:可能触发计算引擎或文件系统对单个目录下文件数量或单次任务Task数量的上限,导致作业失败。
小文件合并基本原理
- 基本原理
在Spark或Hive等基于MapReduce的引擎中,一个任务最终会产生多少个文件,取决于最后一个阶段(通常是 Reduce 阶段)的任务数量。每个 Reduce 任务会默认产生一个文件。
通过DISTRIBUTE BY将数据分配到指定数量的Reduce任务中,每个任务生成一个文件,从而实现文件合并。
- 数据重分布:它会根据后面跟着的表达式(如 floor(rand()*N))的哈希值,将数据发送到不同的Reduce任务。
- 控制 Reduce 数量:通过设置N的值,可以间接地、概率性地控制Reduce任务的数量,从而控制最终输出的文件数量。N通常就是你期望合并后每个分区下的文件数量。
非分区表合并示例
- 使用临时表中转(推荐)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
-- 1. 创建结构一致的临时表 CREATE TABLE temp_tablename LIKE tablename; -- 2. 合并数据写入临时表 INSERT OVERWRITE TABLE temp_tablename SELECT * FROM tablename DISTRIBUTE BY floor(rand()*20); -- 生成约20个文件 -- 3. 数据验证 SELECT count(*) FROM tablename; -- 原表 SELECT count(*) FROM temp_tablename; -- 临时表 -- 4.写回原表 INSERT OVERWRITE TABLE tablename SELECT * FROM temp_tablename; -- 5. 可选:删除旧表 DROP TABLE table_name_old;
- 自读自写
INSERT OVERWRITE TABLE tablename SELECT * FROM tablename DISTRIBUTE BY floor(rand() * 20)
- INSERT OVERWRITE TABLE tablename
覆盖写入,将查询结果覆盖写入目标表(tablename)。
此操作会删除原表数据,建议使用临时表替代以避免数据丢失。
- SELECT * FROM tablename
从原表(tablename)读取所有数据。
- DISTRIBUTE BY floor(rand() * 20)
- 随机分发数据:
- rand() 生成 [0, 1) 范围的随机浮点数。
- rand() * 20 生成 [0, 20) 范围的随机浮点数。
- floor(...) 将结果向下取整,得到 [0, 19] 范围的整数。
- 分发效果:
- 数据会随机且均匀地分配到最多 20 个 Reduce 任务中。
- 最终生成最多 20 个输出文件(每个 Reduce 任务对应一个文件)。
- 随机分发数据:
- INSERT OVERWRITE TABLE tablename
分区表合并
- 使用临时表(推荐)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
-- 1. 创建结构一致的临时表 CREATE TABLE table_tmp LIKE table_name; -- 2. 分区合并写入临时表 INSERT OVERWRITE TABLE table_tmp PARTITION(pt) SELECT col1, col2, ..., pt FROM tablename WHERE pt = * -- 分区条件 DISTRIBUTE BY pt, floor(rand() * N); -- N为每分区目标文件数 -- 3. 数据验证 SELECT COUNT(*) FROM tablename; -- 原表 SELECT COUNT(*) FROM temp_tablename; -- 临时表 -- 4. 写回原表 INSERT OVERWRITE TABLE tablename SELECT * FROM temp_tablename; -- 5. 可选:删除旧表 DROP TABLE temp_tablename;
- 自读自写
1 2 3 4 5 6 7 8 9 10 11
INSERT OVERWRITE TABLE target_table PARTITION (pt) SELECT col1, col2, ..., pt -- 分区字段必须在SELECT的最后 FROM target_table WHERE pt = * DISTRIBUTE BY pt, floor(rand() * N)
示例:学生分区表合并
- 场景说明
假设有一个分区表student,分区字段是 facultyNo (学院编号),classNo(班级编号)。现在希望将每个分区下的小文件合并,每个分区最多保留2个文件。
- 表名:student
- 分区字段:facultyNo(学院编号), classNo(班级编号)
- 目标:每个分区合并为2个文件
- 示例代码
-- 1. 创建临时表 CREATE TABLE student_tmp LIKE student; -- 2. 分区合并写入 INSERT OVERWRITE TABLE student_tmp PARTITION(facultyNo, classNo) SELECT id, name, gender, age, birth_date, phone, email, address, enrollment_date, major, grade, status, facultyNo, classNo FROM student WHERE facultyNo = 1 AND classNo = 7 -- 分区条件 DISTRIBUTE BY facultyNo, classNo, floor(rand() * 2); -- 控制文件数量执行后:分区 facultyNo = 1, classNo = 7 分区下的数据会被合并到最多2个文件中。-- 3. 分区验证(示例:学院1-班级7) SELECT count(*) FROM student WHERE facultyNo=1 AND classNo=7; --> 原数据量 SELECT count(*) FROM student_tmp WHERE facultyNo=1 AND classNo=7; --> 合并后数据量 -- 4. 执行INSERT OVERWRITE 将数据写回原表 INSERT OVERWRITE TABLE tablename SELECT * FROM student_tmp;
更多操作建议
直接 OVERWRITE 原表时,如果任务在写入过程中失败,可能会导致数据丢失。
推荐使用临时表中转:
- 创建临时表(结构与原表一致)。
- 将合并后的数据插入临时表。
- 验证临时表数据无误后,再使用 ALTER TABLE ... RENAME TO 或 INSERT OVERWRITE 将临时表切换为正式表。
DISTRIBUTE BY 字段的选择:除了使用随机数,如果表有天然的高基数字段(如 user_id),也可以使用它来进行分发,这样在合并文件的同时还能让数据按该字段排序,有时能提升后续查询性能。
-- 1. 创建临时表(结构与原表table_name相同) CREATE TABLE table_name_tmp LIKE table_name; -- 2. 将原表数据合并后插入临时表 INSERT OVERWRITE TABLE table_name_tmp PARTITION (pt) -- 如果是分区表 SELECT ... -- 所选字段 FROM table_name WHERE pt = * DISTRIBUTE BY pt, floor(rand() * N); -- 分区表写法 -- 对于非分区表:DISTRIBUTE BY floor(rand() * N) -- 3. 验证数据条数、分区等是否正确 SELECT count(*) FROM table_name; SELECT count(*) FROM table_name_tmp; -- 4. 确认无误后,通过重命名交换表(此操作是元数据操作,瞬间完成) ALTER TABLE table_name RENAME TO table_name_old; ALTER TABLE table_name_tmp RENAME TO table_name; -- 5. (可选)删除旧的表 DROP TABLE table_name_old;
小文件合并方法总结
|
场景 |
推荐 SQL 写法 |
关键点 |
|---|---|---|
|
非分区表 |
INSERT OVERWRITE TABLE table_tmp SELECT * FROM table DISTRIBUTE BY floor(rand()*N); |
使用随机数控制文件数量 N |
|
分区表 |
INSERT OVERWRITE TABLE table_tmp PARTITION(pt) SELECT ..., pt FROM table WHERE pt = * DISTRIBUTE BY pt, floor(rand()*N); |
必须在 DISTRIBUTE BY 中包含分区字段,以确保合并是在分区内进行 |
|
通用策略 |
使用临时表中转,验证数据后再通过重命名交换表,避免数据丢失风险。 |
避免数据丢失 |