Updated on 2024-12-13 GMT+08:00

Aggregate Functions in Hudi

This section is available for MRS 3.5.0-LTS and later versions only.

Scenarios

The open-source community provides a pluggable payload mechanism to meet various aggregation requirements. To simplify payload development, MRS offers built-in typical aggregation functions. You can use these functions provided by Hudi to aggregate data with the same primary key.

Currently, the following aggregate functions and data types are supported:

  • sum: calculates the sum of values across rows. It supports the following data types: DECIMAL (decimal), SHORT (small integer), INTEGER (integer), BIGINT (large integer), FLOAT (floating point number), and DOUBLE (double-precision floating point number).
  • product: calculates the product of values across rows. It supports the following data types: DECIMAL, SHORT, INTEGER, BIGINT, FLOAT, and DOUBLE.
  • count: counts values across rows. The INTEGER and BIGINT data types are supported.
  • max: identifies and retains the maximum value. It supports the following data types: STRING, DECIMAL, SHORT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE and TIMESTAMP.
  • min: identifies and retains the minimum value. It supports the following data types: STRING, DECIMAL, SHORT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE and TIMESTAMP.
  • last_value: replaces values with the latest imported values. All data types are supported.
  • last_non_null_value: replaces the values with the latest non-null values. All data types are supported.
  • first_value: retains the first value in the dataset, even if that value is null. All data types are supported.
  • first_non_null_value: selects the first non-null value in the dataset. All data types are supported.
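As a sketch of how several of these functions map to supported column types, the following table definition assigns one function per non-primary key column, using the table properties described under Enabling the Aggregation Engine. The table name order_stats and all of its columns are hypothetical and chosen only to illustrate the type restrictions listed above.

```sql
-- Hypothetical table: each non-primary key column uses a function whose
-- supported types include the column's type.
create table if not exists order_stats (
  order_id INT,
  amount DOUBLE,          -- sum: DOUBLE is a supported type
  update_cnt BIGINT,      -- count: only INTEGER and BIGINT are supported
  last_update TIMESTAMP,  -- max: retains the latest timestamp
  first_seen DATE,        -- min: retains the earliest date
  status STRING,          -- last_non_null_value: a later null does not replace an earlier value
  channel STRING,         -- first_non_null_value: keeps the first non-null value written
  ts BIGINT
) using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'order_id',
  preCombineField = 'ts',
  'hoodie.merge-engine' = 'aggregate',
  'fields.amount.aggregate-function' = 'sum',
  'fields.update_cnt.aggregate-function' = 'count',
  'fields.last_update.aggregate-function' = 'max',
  'fields.first_seen.aggregate-function' = 'min',
  'fields.status.aggregate-function' = 'last_non_null_value',
  'fields.channel.aggregate-function' = 'first_non_null_value'
);
```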

Constraints

  • Due to the restrictions of Hudi optimistic concurrency control (OCC), you are not advised to write data to a Hudi table concurrently from multiple streams. If multiple streams must be written at the same time, union all of them into a single stream before writing it to Hudi.
  • These functions are intended for batch reads.
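The union approach can be sketched in SQL as follows, using the testTable defined under Enabling the Aggregation Engine. stream_a and stream_b are hypothetical source tables with the same columns as testTable; a single INSERT writes the combined input, so only one writer modifies the Hudi table.

```sql
-- stream_a and stream_b are hypothetical sources with the same columns as testTable.
-- UNION ALL combines them into one input so that a single job writes to Hudi.
insert into testTable
select id, col1, col2, col3, col4, ts from stream_a
union all
select id, col1, col2, col3, col4, ts from stream_b;
```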

Enabling the Aggregation Engine

Set hoodie.merge-engine=aggregate in the table properties when creating a Hudi table to enable its aggregation engine. With this engine, each non-primary key field is assigned an aggregate function. Use fields.<field-name>.aggregate-function to specify the aggregate function for a field. The following is an example:

create table if not exists testTable(
id INT,
col1 INT,
col2 INT,
col3 INT,
col4 INT,
ts LONG
) using hudi
tblproperties (
primaryKey = 'id',
preCombineField = 'ts',
type = 'mor',
'hoodie.merge-engine' = 'aggregate',
'fields.col2.aggregate-function' = 'count', -- col2 is aggregated with count across records that share the same primary key.
'fields.col3.aggregate-function' = 'max' -- col3 is aggregated with max across records that share the same primary key.
);
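A minimal usage sketch for testTable: two writes share the primary key id = 1, and a batch query then reads the merged record. The values are illustrative, and the statements assume the table was created with the definition above.

```sql
-- Illustrative values: both rows share the primary key id = 1.
insert into testTable values (1, 10, 100, 5, 7, 1000);
insert into testTable values (1, 20, 200, 9, 8, 1001);

-- Batch read of the merged record for id = 1.
-- Per the table properties, col2 is merged with count and col3 with max.
select id, col2, col3 from testTable where id = 1;
```

Based on the function descriptions in Scenarios, col3 is expected to retain the larger of the two values written for id = 1.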