
Partial Update

This section applies only to MRS 3.3.1-LTS and later.

Hudi allows users to partially update columns. Under the same primary key, the latest data updates only the columns it carries, so a row can be completed field by field over multiple writes.

Scenarios

Currently, the open-source community provides PartialUpdateAvroPayload to partially update columns. However, when multiple streams write to the same table and each stream updates different columns, the streams overwrite each other's data.

By introducing sequence groups, Hudi solves this problem and implements true partial update.

During table creation, the columns of a table are divided into sequence groups as needed. Whether the columns in a sequence group are updated is determined by the precombine field of that group, and sequence groups do not affect each other.
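
For illustration, the following is a minimal sketch of declaring a sequence group at table creation. The table name demoTable is illustrative only; the complete syntax and a full walkthrough are given in the Example below.

create table if not exists demoTable(
id INT,
col1 INT,
col2 INT,
group_1 INT,
ts LONG
) using hudi
tblproperties (
primaryKey = 'id',
type = 'mor',
'hoodie.merge-engine' = 'partial-update',
'fields.group_1.sequence-group' = 'col1,col2' ---- col1 and col2 are updated only when group_1 carries a newer value.
);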

Constraints

  • Due to the restrictions of Hudi OCC, you are not advised to concurrently write data to a Hudi table from multiple streams. If multiple streams need to be written at the same time, union all the streams before writing them to Hudi (see the sketch after this list).
  • New columns can be added and assigned to a new group. To do so, modify the tblproperties and serdeproperties of the table. Example statements are as follows:
    • Add new columns col5, col6, and group_3.

      alter table testTable add columns (col5 int, col6 int, group_3 int);

    • Add the information of the new group to tblproperties.

      alter table testTable set tblproperties('fields.group_3.sequence-group' = 'col5,col6');

    • Add the information of the new group to serdeproperties.

      alter table testTable set serdeproperties('fields.group_3.sequence-group' = 'col5,col6');

  • Columns cannot be shared between sequence groups. For example, the col1 column cannot belong to both sequence group 1 and sequence group 2.
  • Group columns support only the int, bigint, float, double, date, and timestamp types.
  • To enable partial update for an existing table, you must perform the following operations. Otherwise, the data may not be as expected.
    • Stop writing data to the table to be modified.
    • Full compaction must be performed for the MOR table.

      Forcibly enable compaction.

      set hoodie.compaction.inline.max.delta.commits=1;

      set hoodie.compact.inline=true;

      Perform full compaction.

      run compaction on my_table;

      reset hoodie.compaction.inline.max.delta.commits;

    • Add the information of the new group to tblproperties.

      alter table testTable set tblproperties('fields.group_1.sequence-group' = 'col1,col2');

    • Add the information of the new group to serdeproperties.

      alter table testTable set serdeproperties('fields.group_1.sequence-group' = 'col1,col2');
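
As a reference for the multi-stream constraint above, the following is a minimal union-all sketch. It assumes a hypothetical target table multiStreamTable created with the same schema and sequence groups as testTable in the Example below, and two hypothetical staging tables stream_a and stream_b that stand in for the two streams. Each stream fills only the columns of its own sequence group and leaves the other group, including its precombine field, null.

insert into multiStreamTable
select id, col1, col2, group_1, null, null, null, ts from stream_a --- stream_a updates only col1 and col2.
union all
select id, null, null, null, col3, col4, group_2, ts from stream_b; --- stream_b updates only col3 and col4.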

Example

create table if not exists testTable(
id INT,
col1 INT,
col2 INT,
group_1 INT,
col3 INT,
col4 INT,
group_2 INT,
ts LONG
) using hudi
tblproperties (
primaryKey = 'id',
type = 'mor',
'hoodie.merge-engine' = 'partial-update',
'fields.group_1.sequence-group' = 'col1,col2', ---- Divide update groups. Whether the col1 and col2 columns are updated is determined by group_1. That is, group_1 is the preCombine field of the group.
'fields.group_2.sequence-group' = 'col3,col4' ---- Divide update groups. Whether the col3 and col4 columns are updated is determined by group_2. That is, group_2 is the preCombine field of the group.
);
---Run the insert statement.
insert into testTable values (1, 1, 1, 1, 1, 1, 1, 1000); -- First write
insert into testTable values (1, 2, 2, 2, 2, 2, null, 2000); -- Second write
---Query result
select * from testTable; 
---The result is 1, 2, 2, 2, 1, 1, 1, 2000.
---Result description: When data is written for the second time, the value of group_1 is 2, which is greater than the historical value 1 in the table. This means that col1 and col2 are updated. The value of group_2 is null, which means that col3 and col4 are not updated.
---Execute the insert statement.
insert into testTable values (1, null, 3, 2, 3, 3, 3, 4000);  -- Third write
select * from testTable;
---The result is 1, null, 3, 2, 3, 3, 3, 4000.
---Result description: When data is written for the third time, the value of group_1 is 2, which is greater than or equal to the historical value 2 in the table. This means that col1 and col2 are updated. The value of group_2 is 3, which is greater than the historical value 1 in the table. This means that col3 and col4 are updated.

Application Scenario Example

Union write simulates a multi-table join. (In Flink, this feature can be used to join streams.)

The following code uses Spark SQL as an example. With Hudi, the join of t1 and t2 is converted into a union and an insert operation to reduce join overhead.

create table if not exists t1(
id INT,
col1 INT,
col2 INT
) using parquet;
insert into t1 values(1, 1, 1);
create table if not exists t2(
id INT,
col3 INT,
col4 INT
) using parquet;
insert into t2 values(1, 2, 2);
create table if not exists joinTable(
id INT,
col1 INT,
col2 INT,
group_1 INT,
col3 INT,
col4 INT,
group_2 INT
) using hudi
tblproperties (
primaryKey = 'id',
type = 'mor',
'hoodie.index.type' = 'BUCKET',
'hoodie.bucket.index.num.buckets' = '1',
'hoodie.merge-engine' = 'partial-update',
'fields.group_1.sequence-group' = 'col1,col2',
'fields.group_2.sequence-group' = 'col3,col4'
);

--- Union and insert are performed to simulate the join operation.
insert into joinTable
select id, col1, col2, 1, null, null, null from t1        --- Use any non-null value as the precombine of col1 and col2.
union all
select id, null, null, null, col3, col4, 1 from t2;       --- Use any non-null value as the precombine of col3 and col4.

---Execute the query.
select id,col1,col2,col3,col4 from joinTable;
-- Result
-- 1, 1, 1, 2, 2
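
As a hedged follow-up (not part of the original example), later data can continue updating only its own sequence group. For instance, if a newer row for the same key arrives in t1 and is written with a larger group_1 value, only col1 and col2 change; the expected result below follows from the partial-update semantics described above.

--- Write a newer version of the t1 side only: group_1 is bumped to 2, and group_2 stays null.
insert into t1 values(1, 10, 10);
insert into joinTable
select id, col1, col2, 2, null, null, null from t1 where col1 = 10;
select id, col1, col2, col3, col4 from joinTable;
-- Expected result: 1, 10, 10, 2, 2 (col3 and col4 keep their values because group_2 is null).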