Updated on 2025-04-15 GMT+08:00

Hudi Table Index Design Specifications

Rules

  • Changing the index type of a table is prohibited.

    The index of a Hudi table determines how data is stored. Changing the index type of an existing table can cause duplicate data and inconsistencies between the existing data and newly written data. Common index types include:

    • Bloom index: An index specific to the Spark engine. It uses the Bloom filter mechanism and writes the Bloom index content to the footer of Parquet files.
    • Bucket index: During writes, records are assigned to buckets by hashing the primary key. This index has the fastest write speed but requires the number of buckets to be configured properly. Both Flink and Spark support this index.
    • State index: An index specific to the Flink engine. It records the storage location of each row in the state backend. During a job cold start, it traverses all data files to build the index information.
  • When the Flink state index is used, Spark cannot continue writing to the table after Flink has written to it.

    When writing to a Hudi MOR table, Flink generates only log files and later converts them to Parquet files through compaction. Spark relies heavily on the existence of Parquet files when updating Hudi tables, so writing with Spark while the table's latest data is still in log files results in duplicate data. During the batch initialization phase, use Spark to batch-write to the Hudi table first, and then write with Flink using the Flink state index. This order works because Flink traverses all data files to build the state index during a cold start.

  • In real-time data lake scenarios, the Spark engine can use the bucket index, and the Flink engine can use either the bucket index or the state index.

    Real-time data ingestion into the lake requires latency within seconds or minutes. Index selection affects the performance of writing data to the Hudi table. The performance differences between the indexes are as follows:

    • Bucket index

      Advantages: During the write process, data is hashed and written into buckets based on the primary key, resulting in high performance. It is not limited by the amount of data in the table. Both Flink and Spark engines support this index, allowing cross-writing on the same table.

      Disadvantages: The number of buckets cannot be adjusted dynamically. Fluctuations in data volume and continuous growth of the overall data volume can produce excessively large data files in individual buckets. Use partitioned tables to mitigate this.

    • Flink state index

      Advantages: The primary key's index information exists in the state backend. Data updates only require querying the state backend, which is quite fast. The size of the generated data files is stable and will not cause issues with small or oversized files.

      Disadvantages: This index is specific to Flink. When the total number of rows in the table reaches billions, the state backend parameters must be optimized to maintain write performance. Cross-writing between Flink and Spark is not supported with this index.
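
    The hash-based placement used by the bucket index can be sketched as follows. This is only an illustration: `bucket_id`, `group_by_bucket`, and `moved_keys` are hypothetical helper names, and CRC32 is a stand-in for whatever hash function Hudi actually applies to the primary key. The sketch also suggests why the bucket count cannot simply be changed later: the same key generally lands in a different bucket once the count changes.

```python
import zlib
from collections import defaultdict


def bucket_id(primary_key: str, num_buckets: int) -> int:
    """Map a primary key to a bucket number in [0, num_buckets)."""
    if num_buckets <= 0:
        raise ValueError("num_buckets must be positive")
    # Assumption: CRC32 is used here only because it is deterministic;
    # it is not Hudi's own hash function.
    return zlib.crc32(primary_key.encode("utf-8")) % num_buckets


def group_by_bucket(keys, num_buckets):
    """Group primary keys by the bucket they would be written to."""
    buckets = defaultdict(list)
    for key in keys:
        buckets[bucket_id(key, num_buckets)].append(key)
    return dict(buckets)


def moved_keys(keys, old_num_buckets, new_num_buckets):
    """Return the keys whose bucket changes when the bucket count changes."""
    return [
        key
        for key in keys
        if bucket_id(key, old_num_buckets) != bucket_id(key, new_num_buckets)
    ]
```

    Because the lookup is a pure function of the key and the bucket count, a writer needs no external index state to find the target bucket, which is consistent with the high write performance described above.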

  • For tables whose data volume grows continuously, time-based partitioning must be used together with the bucket index, with the partition key based on the data creation time.

    With the Flink state index, once the data volume of the Hudi table grows large, the pressure on the Flink job's state backend becomes significant, and the state backend parameters must be optimized to maintain performance. In addition, because Flink traverses all table data during a cold start, a large data volume makes the Flink job start slowly. Therefore, to keep usage simple, use the bucket index for tables with large amounts of data and avoid complex tuning of the state backend.

Recommendations

  • For tables written by Flink streaming jobs that hold more than 200 million records, use the bucket index. For tables with 200 million records or fewer, the Flink state index can be used.

    With the Flink state index, once the data volume of the Hudi table grows large, the pressure on the Flink job's state backend becomes significant, and the state backend parameters must be optimized to maintain performance. In addition, because Flink traverses all table data during a cold start, a large data volume makes the Flink job start slowly. Therefore, to keep usage simple, use the bucket index for tables with large amounts of data and avoid complex tuning of the state backend.
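
    The 200 million record threshold can be expressed as a small decision helper. This is a minimal sketch: the function names are hypothetical, and the option keys `index.type` and `hoodie.bucket.index.num.buckets` (with the values `BUCKET` and `FLINK_STATE`) are the Hudi Flink options as commonly documented. Verify them against the Hudi version in use before relying on them.

```python
# Threshold taken from the recommendation: above 200 million records,
# prefer the bucket index; at or below it, the Flink state index is acceptable.
FLINK_STATE_MAX_RECORDS = 200_000_000


def choose_flink_index(expected_records: int) -> str:
    """Pick an index type for a Flink streaming write based on table size."""
    if expected_records < 0:
        raise ValueError("expected_records must not be negative")
    if expected_records > FLINK_STATE_MAX_RECORDS:
        return "BUCKET"
    return "FLINK_STATE"


def flink_index_options(expected_records: int, num_buckets: int) -> dict:
    """Build the index-related table options as a plain dictionary.

    Assumption: the keys follow the Hudi Flink option names; they are not
    validated against any Hudi release here.
    """
    index_type = choose_flink_index(expected_records)
    options = {"index.type": index_type}
    if index_type == "BUCKET":
        # The bucket count only applies to the bucket index.
        options["hoodie.bucket.index.num.buckets"] = str(num_buckets)
    return options
```

    Remember that the choice is made once at table design time, because changing the index type of an existing table is prohibited.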

  • When designing tables that use the bucket index, size each bucket to hold about 2 GB of data.

    To keep a single bucket from growing too large, the data volume of a single bucket should not exceed 2 GB. Here, 2 GB refers to the size of the data content, not the number of rows or the size of the Parquet data files. The goal is to keep the Parquet files of each bucket within 256 MB, which balances memory consumption against efficient use of HDFS storage. The 2 GB limit is only an empirical value, because the size of business data after columnar compression varies.
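
    As a worked example of this sizing rule, the number of buckets for a table or partition can be estimated from its uncompressed data size. This is a sketch under assumptions: the helper names are hypothetical, "2 GB" is interpreted as 2 GiB, and the average row size must be measured on real data.

```python
GIB = 1024 ** 3
# Empirical target from the recommendation: about 2 GB of raw data per bucket.
TARGET_BUCKET_BYTES = 2 * GIB


def estimate_raw_bytes(row_count: int, avg_row_bytes: int) -> int:
    """Estimate the uncompressed data size from row count and average row size."""
    return row_count * avg_row_bytes


def buckets_needed(raw_bytes: int, target_bytes: int = TARGET_BUCKET_BYTES) -> int:
    """Return the smallest bucket count that keeps each bucket within the target."""
    if raw_bytes < 0 or target_bytes <= 0:
        raise ValueError("raw_bytes must be >= 0 and target_bytes must be > 0")
    # Integer ceiling division; every table gets at least one bucket.
    return max(1, (raw_bytes + target_bytes - 1) // target_bytes)
```

    For example, a partition expected to hold 30 GiB of raw data gives `buckets_needed(30 * GIB) == 15`. For a partitioned table, apply the estimate to the expected size of a single partition rather than the whole table.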

    Why is 2 GB recommended?

    • After 2 GB of data is stored as columnar Parquet files, the data file size is approximately 150 MB to 256 MB, although this varies with the service data. Because a single HDFS data block is generally 128 MB, files of this size use storage space efficiently.
    • Reading or writing data occupies memory equal to the original data size (including null values), and 2 GB is within the acceptable range for a single task in big data computation.
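
    The figures above imply a rough compression ratio and HDFS block count, which can be checked with simple arithmetic. This is a sketch for illustration: the helper names are hypothetical, sizes are treated as binary units (MiB), and the 128 MB block size is the general value mentioned above rather than a property of any specific cluster.

```python
MIB = 1024 ** 2
HDFS_BLOCK_BYTES = 128 * MIB  # general block size quoted in the text


def hdfs_blocks(file_bytes: int, block_bytes: int = HDFS_BLOCK_BYTES) -> int:
    """Return how many HDFS blocks a file of the given size occupies."""
    if file_bytes < 0 or block_bytes <= 0:
        raise ValueError("file_bytes must be >= 0 and block_bytes must be > 0")
    # Integer ceiling division; a non-empty file uses at least one block.
    return max(1, (file_bytes + block_bytes - 1) // block_bytes)


def implied_compression_ratio(raw_bytes: int, parquet_bytes: int) -> float:
    """Return the ratio of raw data size to Parquet file size."""
    if parquet_bytes <= 0:
        raise ValueError("parquet_bytes must be positive")
    return raw_bytes / parquet_bytes
```

    With 2 GB of raw data, a 256 MB Parquet file corresponds to a ratio of 8 and a 150 MB file to a ratio of roughly 13.65. Both file sizes occupy two 128 MB blocks.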

    What could be the impact if the data volume of a single bucket exceeds this range?

    • Read/write tasks might encounter OOM issues, which can be resolved by increasing the memory allocation per task.
    • Read/write performance may decrease because the data volume processed by a single task becomes larger, leading to longer processing times.