消减Spark Insert Overwrite自读自写风险
场景说明
对于目的表,需要使用动态分区插入(使用历史分区更新),且目的表和数据源表都是同一张表。
由于直接在源表上执行insert overwrite可能会导致数据丢失或数据不一致的风险,建议将作业拆解为2个子作业进行处理,从而保证每次作业重复执行结果都一致。
- 作业1:新建临时表,将结果数据写入临时表。
- 作业2:将临时表数据写入源表和删除临时表。
操作步骤
假设存在如下一张表:
user_data(user_group int, user_name string, update_time timestamp);
其中user_group是分区列,现在需要根据已有数据,按更新时间进行排序,刷新用户组信息。
任务1:新建临时表,将结果数据写临时表。
- 开启Hive动态分区参数。
set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict;
- 创建一个临时表存储去重后的数据。
CREATE TABLE temp_user_data AS SELECT * FROM ( SELECT *, ROW_NUMBER() OVER(PARTITION BY user_group ORDER BY update_time DESC) as rank FROM user_data ) tmp WHERE rank = 1;
任务2:将临时表数据写入源表和删除临时表。
- 使用临时数据作为数据源,插入目的表。
INSERT OVERWRITE TABLE user_data SELECT user_group, user_name, update_time FROM temp_user_data;
- 清理临时表。
DROP TABLE IF EXISTS temp_user_data;