更新时间:2024-07-02 GMT+08:00
分享

消减Spark Insert Overwrite自读自写风险

场景说明

对于目的表,需要使用动态分区插入(使用历史分区更新),且目的表和数据源表都是同一张表。

由于直接在原表上执行insert overwrite可能会导致数据丢失或数据不一致的风险,建议首先使用一个临时表来处理数据。

操作步骤

假设存在如下一张表:

user_data(user_group int, user_name string, update_time timestamp);

其中user_group是分区列,现在需要根据已有数据,按更新时间进行排序,刷新用户组信息。

  1. 开启Hive动态分区参数。

    set hive.exec.dynamic.partition=true;

    set hive.exec.dynamic.partition.mode=nonstrict;

  2. 创建一个临时表存储去重后的数据。

    CREATE TABLE temp_user_data AS

    SELECT * FROM (

    SELECT *,

    ROW_NUMBER() OVER(PARTITION BY user_group ORDER BY update_time DESC) as rank

    FROM user_data

    ) tmp

    WHERE rank = 1;

  3. 使用临时数据作为数据源,插入目的表。

    INSERT OVERWRITE TABLE user_data

    SELECT user_group, user_name, update_time

    FROM temp_user_data;

  4. 清理临时表。

    DROP TABLE IF EXISTS temp_user_data;

相关文档