更新时间:2024-11-26 GMT+08:00
分享

Hudi支持Partial Update

本章节内容仅适用于MRS 3.3.1-LTS及之后版本。

该特性允许用户使用Hudi完成部分列更新。用户可以使用同一主键下的最新数据逐一更新每行数据的不同列字段,直到整条数据完整。

场景说明

当前开源社区提供了PartialUpdateAvroPayload机制实现部分列更新,但该功能在多流更新,每条流更新不同列场景下会出现数据相互覆盖的问题。

通过引入sequence组的概念,Hudi可以很好的解决该问题,实现真正的部分更新。

按建表时按需求,将表中的列切分成不同的sequence组。每一个sequence组包含的列是否更新,由该sequence组的precombine字段决定,不同sequence组相互不影响。

使用约束

  • 由于Hudi OCC特性的限制,当前不建议多流并发写Hudi表。 如果需要多流同时写请将所有流union后写入Hudi。
  • 支持添加新列并作为新分组,但是新列添加后,需要同时修改表属性的tblproperties和serdeproperties属性,将新列添加到新的组里面。命令示例如下:
    • 添加新列col5,col6,group_3:

      alter table testTable add columns (col5 int, col6 int, group_3 int);

    • 添加新的分组信息到tblproperties中:

      alter table testTable set tblproperties('fields.group_3.sequence-group' = 'col5,col6');

    • 添加新的分组信息到serdeproperties中:

      alter table testTable set serdeproperties('fields.group_3.sequence-group' = 'col5,col6');

  • sequence组包含的列不能有重叠。sequence-1组和sequence-2都包含col1这一列,这种是不支持的。
  • group列的数据类型,仅支持int、bigint、float、double、date和timestamp。
  • 对于已存在表,如果要开启部分列更新功能;需要严格执行如下步骤,否则会出现数据不符合预期的情况。
    • 停止待修改表的数据写入。
    • MOR表需要执行全量compaction:

      强制开启compaction

      set hoodie.compaction.inline.max.delta.commits=1;

      set hoodie.compact.inline=true;

      执行全量compaction

      run compaction on my_table;

      reset hoodie.compaction.inline.max.delta.commits;

    • 添加新的分组信息到tblproperties中:

      alter table testTable set tblproperties('fields.group_1.sequence-group' = 'col1,col2');

    • 添加新的分组信息到serdeproperties中:

      alter table testTable set serdeproperties('fields.group_1.sequence-group' = 'col1,col2');

示例

create table if not exists testTable(
id INT,
col1 INT,
col2 INT,
group_1 INT,
col3 INT,
col4 INT,
group_2 INT,
ts LONG
) using hudi
tblproperties (
primaryKey = 'id',
type = 'mor',
'hoodie.merge-engine' = 'partial-update',
'fields.group_1.sequence-group' = 'col1,col2',---- 划分更新组, col1,col2 列是否更新由 group_1 决定,即group_1是该组的preCombine字段 'fields.group_2.sequence-group' = 'col3,col4' ---- 划分更新组,col3,col4 列是否更新由 group_2 决定,即group_2是该组的preCombine字段
);
--- 执行insert语句
insert into testTable values (1, 1, 1, 1, 1, 1, 1, 1000);    -- 首次写入
insert into testTable values (1, 2, 2, 2, 2, 2, null, 2000); -- 第二次写入
--- 查询结果
select * from testTable; 
--- 结果为1, 2, 2, 2, 1, 1, 1, 2000
--- 结果说明:第二次写入时,group_1的值为2 大于表里面的历史值1, 因此col1,col2 发生更新; group_2的值为null,因此col3,col4 不更新
---继续执行insert语句
insert into testTable values (1, null, 3, 2, 3, 3, 3, 4000);  -- 第三次写入
select * from testTable;
--- 结果为1, null, 3, 2, 3, 3, 3, 4000
--- 结果说明:第三次写入时,group_1的值为2 >= 表历史值2, 因此col1,col2 更新; group_2的值为3 > 历史值1,因此col3,col4更新

使用场景样例

union写入模拟多表join(flink可以使用该功能完成流流join,实现数据拉宽)。

下列代码以SparkSQL示例, 通过Hudi将t1和t2 join拉宽操作转成union + insert,避免join的开销:

create table if not exists t1(
id INT,
col1 INT,
col2 INT
) using parquet;
insert into t1 values(1, 1, 1);
create table if not exists t2(
id INT,
col3 INT,
col4 INT
) using parquet;
insert into t2 values(1, 2, 2);
create table if not exists joinTable(
id INT,
col1 INT,
col2 INT,
group_1 INT,
col3 INT,
col4 INT,
group_2 INT
) using hudi
tblproperties (
primaryKey = 'id',
type = 'mor',
'hoodie.index.type' = 'BUCKET',
hoodie.bucket.index.num.buckets=1,
'hoodie.merge-engine' = 'partial-update',
'fields.group_1.sequence-group' = 'col1,col2',
'fields.group_2.sequence-group' = 'col3,col4'
);

--- union + insert 模拟join操作
insert into joinTable
select id, col1, col2, 1, null, null, null from t1        --- 取任意非空值充当col1,col2的precombine值
union all
select id, null, null, null, col3, col4, 1 from t2;       --- 取任意非空值充当col3,col4的precombine值

--- 执行查询
select id,col1,col2,col3,col4 from joinTable;
-- 结果
-- 1,1,1,2,2

相关文档