更新时间:2026-01-13 GMT+08:00
分享

如何合并小文件

什么是小文件

分布式文件系统按块(Block)存放数据。小文件是指在存储系统中,文件大小远小于存储块大小的文件。大量小文件会给大数据系统带来显著的性能和管理问题,合并小文件是优化系统性能的关键手段之一。

本文介绍如何在DLI中使用DISTRIBUTE BY来合并小文件。

小文件的主要危害

  • 元数据压力:文件系统(如HDFS)的Master节点(NameNode)需要在内存中管理每个文件的元数据。海量小文件会急剧增加其内存消耗,成为集群扩展的瓶颈。
  • 计算性能低下:计算引擎(如Spark)通常为每个文件或文件块启动一个计算任务(Task)。处理大量小文件会导致:
    • 巨大的任务调度开销:启动和调度成千上万个Task的时间可能远超过实际数据处理时间。
    • 资源浪费:大量CPU和内存资源被消耗在任务管理上,而非有效计算。
    • 查询延迟:作业整体运行时间变长,查询速度变慢。
  • 存储效率低:小文件可能导致存储空间利用率低下,无法充分利用分布式存储的块大小优势。
  • 易触达限制:可能触发计算引擎或文件系统对单个目录下文件数量或单次任务Task数量的上限,导致作业失败。

小文件合并基本原理

  • 基本原理

    在Spark或Hive等基于MapReduce的引擎中,一个任务最终会产生多少个文件,取决于最后一个阶段(通常是 Reduce 阶段)的任务数量。每个 Reduce 任务会默认产生一个文件。

    通过DISTRIBUTE BY将数据分配到指定数量的Reduce任务中,每个任务生成一个文件,从而实现文件合并。

    • 数据重分布:它会根据后面跟着的表达式(如 floor(rand()*N))的哈希值,将数据发送到不同的Reduce任务。
    • 控制 Reduce 数量:通过设置N的值,可以间接地、概率性地控制Reduce任务的数量,从而控制最终输出的文件数量。N通常就是你期望合并后每个分区下的文件数量。

非分区表合并示例

  • 使用临时表中转(推荐)
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    -- 1. 创建结构一致的临时表
    CREATE TABLE temp_tablename LIKE tablename;
    
    -- 2. 合并数据写入临时表
    INSERT OVERWRITE TABLE temp_tablename
    SELECT * FROM tablename
    DISTRIBUTE BY floor(rand()*20); -- 生成约20个文件
    
    -- 3. 数据验证
    SELECT count(*) FROM tablename;       -- 原表
    SELECT count(*) FROM temp_tablename;  -- 临时表
    
    -- 4.写回原表
    INSERT OVERWRITE TABLE tablename
    SELECT 
    *
    FROM temp_tablename;
    
    -- 5. 可选:删除旧表
    DROP TABLE table_name_old;
    
  • 自读自写
    INSERT OVERWRITE TABLE tablename
    SELECT *
    FROM tablename
    DISTRIBUTE BY floor(rand() * 20)
    • INSERT OVERWRITE TABLE tablename

      覆盖写入,将查询结果覆盖写入目标表(tablename)。

      此操作会删除原表数据,建议使用临时表替代以避免数据丢失。

    • SELECT * FROM tablename

      从原表(tablename)读取所有数据。

    • DISTRIBUTE BY floor(rand() * 20)
      • 随机分发数据:
        • rand() 生成 [0, 1) 范围的随机浮点数。
        • rand() * 20 生成 [0, 20) 范围的随机浮点数。
        • floor(...) 将结果向下取整,得到 [0, 19] 范围的整数。
      • 分发效果:
        • 数据会随机且均匀地分配到最多 20 个 Reduce 任务中。
        • 最终生成最多 20 个输出文件(每个 Reduce 任务对应一个文件)。

分区表合并

对于分区表,我们通常希望在每个分区内进行文件合并,而不是将所有分区的数据混在一起。
  • 参数说明:
    • N:你期望每个分区内合并后的文件数量。例如,如果希望每个分区最终只有5个文件,就设置为5。
    • pt:你的分区字段。
  • 为什么要在 DISTRIBUTE BY 中加上分区字段?
    • 这确保了同一个分区的数据一定会被发送到同一组 Reduce 任务中进行处理。
    • DISTRIBUTE BY pt, floor(rand() * N) 的语义是:首先按照分区字段 pt 进行分发,然后在每个 pt 内部,再按照随机数进行二次分发。

      每个分区(pt)内部的数据都会被重新打散到 N 个文件中,而不同分区的数据不会相互干扰。

  • 使用临时表(推荐)
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    -- 1. 创建结构一致的临时表
    CREATE TABLE table_tmp LIKE table_name;
    
    -- 2. 分区合并写入临时表
    INSERT OVERWRITE TABLE table_tmp PARTITION(pt)
    SELECT col1, col2, ..., pt 
    FROM tablename
    WHERE pt = *  -- 分区条件
    DISTRIBUTE BY pt, floor(rand() * N); -- N为每分区目标文件数
    
    -- 3. 数据验证
    SELECT COUNT(*) FROM tablename;   -- 原表
    SELECT COUNT(*) FROM temp_tablename;    -- 临时表
    
    -- 4. 写回原表
    INSERT OVERWRITE TABLE tablename
    SELECT 
    *
    FROM temp_tablename;
    
    -- 5. 可选:删除旧表
    DROP TABLE temp_tablename;
    
  • 自读自写
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    INSERT OVERWRITE TABLE target_table 
    PARTITION (pt)
    SELECT      
        col1,
        col2, 
        ...,    
        pt -- 分区字段必须在SELECT的最后
    FROM
        target_table
    WHERE pt = * 
    DISTRIBUTE BY pt, floor(rand() * N) 
    

示例:学生分区表合并

  • 场景说明

    假设有一个分区表student,分区字段是 facultyNo (学院编号),classNo(班级编号)。现在希望将每个分区下的小文件合并,每个分区最多保留2个文件。

    - 表名:student

    - 分区字段:facultyNo(学院编号), classNo(班级编号)

    - 目标:每个分区合并为2个文件

  • 示例代码
    -- 1. 创建临时表
    CREATE TABLE student_tmp LIKE student;
    
    -- 2. 分区合并写入
    INSERT OVERWRITE TABLE student_tmp PARTITION(facultyNo, classNo)
    SELECT 
        id,
        name,
        gender,
        age,
        birth_date,
        phone,
        email,
        address,
        enrollment_date,
        major,
        grade,
        status,
        facultyNo,
        classNo
    FROM student
    WHERE facultyNo = 1 AND classNo = 7  -- 分区条件
    DISTRIBUTE BY facultyNo, classNo, floor(rand() * 2);  -- 控制文件数量
    
    执行后:分区 facultyNo = 1, classNo = 7 分区下的数据会被合并到最多2个文件中。
    -- 3. 分区验证(示例:学院1-班级7)
    SELECT count(*) FROM student 
    WHERE facultyNo=1 AND classNo=7;   --> 原数据量
    
    SELECT count(*) FROM student_tmp 
    WHERE facultyNo=1 AND classNo=7;   --> 合并后数据量
    
    -- 4. 执行INSERT OVERWRITE 将数据写回原表
    INSERT OVERWRITE TABLE tablename
    SELECT 
    *
    FROM student_tmp;

更多操作建议

直接 OVERWRITE 原表时,如果任务在写入过程中失败,可能会导致数据丢失。

推荐使用临时表中转

  1. 创建临时表(结构与原表一致)。
  2. 将合并后的数据插入临时表
  3. 验证临时表数据无误后,再使用 ALTER TABLE ... RENAME TO 或 INSERT OVERWRITE 将临时表切换为正式表。

DISTRIBUTE BY 字段的选择:除了使用随机数,如果表有天然的高基数字段(如 user_id),也可以使用它来进行分发,这样在合并文件的同时还能让数据按该字段排序,有时能提升后续查询性能。

-- 1. 创建临时表(结构与原表table_name相同)
CREATE TABLE table_name_tmp LIKE table_name;

-- 2. 将原表数据合并后插入临时表
INSERT OVERWRITE TABLE table_name_tmp PARTITION (pt) -- 如果是分区表
SELECT ... -- 所选字段
FROM table_name
WHERE pt = *
DISTRIBUTE BY pt, floor(rand() * N); -- 分区表写法
-- 对于非分区表:DISTRIBUTE BY floor(rand() * N)

-- 3. 验证数据条数、分区等是否正确
SELECT count(*) FROM table_name;
SELECT count(*) FROM table_name_tmp;

-- 4. 确认无误后,通过重命名交换表(此操作是元数据操作,瞬间完成)
ALTER TABLE table_name RENAME TO table_name_old;
ALTER TABLE table_name_tmp RENAME TO table_name;

-- 5. (可选)删除旧的表
DROP TABLE table_name_old;

小文件合并方法总结

场景

推荐 SQL 写法

关键点

非分区表

INSERT OVERWRITE TABLE table_tmp SELECT * FROM table DISTRIBUTE BY floor(rand()*N);

使用随机数控制文件数量 N

分区表

INSERT OVERWRITE TABLE table_tmp PARTITION(pt) SELECT ..., pt FROM table WHERE pt = * DISTRIBUTE BY pt, floor(rand()*N);

必须在 DISTRIBUTE BY 中包含分区字段,以确保合并是在分区内进行

通用策略

使用临时表中转,验证数据后再通过重命名交换表,避免数据丢失风险。

避免数据丢失

相关文档