更新时间:2025-09-04 GMT+08:00
分享

消减Spark Insert Overwrite自读自写风险

场景说明

对于目的表,需要使用动态分区插入(使用历史分区更新),且目的表和数据源表都是同一张表。

由于直接在源表上执行insert overwrite可能会导致数据丢失或数据不一致的风险,建议将作业拆解为2个子作业进行处理,从而保证每次作业重复执行结果都一致。

  • 作业1:新建临时表,将结果数据写入临时表。
  • 作业2:将临时表数据写入源表和删除临时表。

操作步骤

假设存在如下一张表:

user_data(user_group int, user_name string, update_time timestamp);

其中user_group是分区列,现在需要根据已有数据,按更新时间进行排序,刷新用户组信息。

任务1:新建临时表,将结果数据写临时表。

  1. 开启Hive动态分区参数。

    set hive.exec.dynamic.partition=true;
    set hive.exec.dynamic.partition.mode=nonstrict;

  2. 创建一个临时表存储去重后的数据。

    CREATE TABLE temp_user_data AS
    SELECT * FROM (
    SELECT *,
    ROW_NUMBER() OVER(PARTITION BY user_group ORDER BY update_time DESC) as rank
    FROM user_data
    ) tmp
    WHERE rank = 1;

任务2:将临时表数据写入源表和删除临时表。

  1. 使用临时数据作为数据源,插入目的表。

    INSERT OVERWRITE TABLE user_data
    SELECT user_group, user_name, update_time
    FROM temp_user_data;

  2. 清理临时表。

    DROP TABLE IF EXISTS temp_user_data;

相关文档