Updated on 2024-08-30 GMT+08:00

Hudi Table Index Design Specifications

Rules

  • Do not modify the table index type.

    The index of a Hudi table determines how its data is stored. If the index type is changed arbitrarily, existing data and newly written data in the table may be duplicated, which compromises data accuracy. Common index types are as follows:

    • Bloom index: specific to the Spark engine. It uses the Bloom filter mechanism and writes the Bloom index content to the footer of the Parquet file.
    • Bucket index: During data writing, the primary key is hashed and each record is written to the bucket that its hash maps to. This index has the fastest write speed, but the number of buckets must be configured properly. Both Flink and Spark support this index.
    • State index: specific to the Flink engine. It records the storage location of each row in the state backend. During the cold start of a job, all data storage files are traversed to generate the index information.
  • If the Flink state index is used, Spark cannot continue to write to the table after Flink has written data to it.

    When writing to a Hudi MOR table, Flink generates only log files, which are later converted into Parquet files by compaction. When Spark updates a Hudi table, it relies on the existence of Parquet files. If the latest data in the table exists only in log files, writing to the table with Spark generates duplicate data. The reverse order is acceptable: in the batch initialization phase, you can use Spark to write data to the Hudi table in batches and then use Flink with the state index to continue writing, because Flink traverses all data files to build the state index during cold start.

  • In real-time data lake ingestion scenarios, use the bucket index with the Spark engine. With the Flink engine, use either the bucket index or the state index.

    Real-time ingestion requires high performance: data must be written to the lake within minutes. Index selection affects the performance of writing to Hudi tables. The performance differences between indexes are as follows:

    • Bucket index.

      Advantages: During writes, records are distributed into buckets by hashing the primary key, which delivers high performance regardless of the data volume of the table. Both the Flink and Spark engines support this index, and the two engines can cross-write the same table.

      Disadvantage: The number of buckets cannot be adjusted dynamically. If the data volume fluctuates or the table keeps growing, a single bucket accumulates too much data and produces oversized data files. Use partitioned tables together with the bucket index to keep bucket sizes balanced.

    • Flink state index.

      Advantages: The index information of the primary key is stored in the state backend. An update only needs to look up the state backend, which is fast. In addition, the generated data files are stable in size, so neither small files nor oversized files are produced.

      Disadvantages: This index is specific to Flink. When the total number of rows in a table reaches hundreds of millions, state backend parameters need to be tuned to maintain write performance. This index does not support cross-writing by Flink and Spark.

  • For a table whose data volume keeps increasing, a bucket-indexed table must be partitioned by time, with the data creation time as the partition key.

    Because of how the Flink state index works, once a Hudi table exceeds a certain data volume, the state backend of the Flink job comes under heavy pressure, and its parameters must be tuned to maintain performance. In addition, Flink traverses the entire table during cold start, so a large data volume makes job startup slow. For tables with a large amount of data, therefore, use the bucket index to avoid complex state backend tuning.

    If combining the bucket index with a partitioned table still cannot keep individual buckets from growing too large, you can use the Flink state index and tune the corresponding configuration parameters according to the specifications.
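
As a rough illustration of the bucket index rules above, the following Python sketch shows how a primary key can be hashed into a fixed number of buckets, and why partitioning by creation time keeps each bucket bounded. The CRC32 hash, the sample keys, the `order_id` column, and the helper names are assumptions made for illustration only; Hudi applies its own hashing internally. The keys in `SPARK_BUCKET_OPTIONS` are intended to be the usual Hudi write options for the bucket index, but check them against the Hudi version you run.

```python
import zlib

# Hudi write options for a bucket-indexed table (values are illustrative).
SPARK_BUCKET_OPTIONS = {
    "hoodie.index.type": "BUCKET",
    "hoodie.bucket.index.num.buckets": "4",        # cannot be adjusted dynamically
    "hoodie.bucket.index.hash.field": "order_id",  # hypothetical primary key column
}


def bucket_id(primary_key: str, num_buckets: int) -> int:
    """Return a stable bucket number for a key.

    CRC32 is an assumption used for illustration, not Hudi's actual hash.
    """
    return zlib.crc32(primary_key.encode("utf-8")) % num_buckets


def target_file_group(create_date: str, primary_key: str, num_buckets: int) -> tuple:
    """A record lands in (partition, bucket); the partition is the creation date."""
    return (create_date, bucket_id(primary_key, num_buckets))


if __name__ == "__main__":
    n = int(SPARK_BUCKET_OPTIONS["hoodie.bucket.index.num.buckets"])
    # The same key always maps to the same bucket, so the writer can locate
    # a record by computation alone.
    print(target_file_group("2024-08-29", "order-1001", n))
    # Records created on a later date go to that date's buckets, so older
    # partitions stop growing even though the bucket count never changes.
    print(target_file_group("2024-08-30", "order-2001", n))
```

Because the bucket is derived only from the key and the bucket count, any engine that applies the same rule reaches the same file group, which is consistent with Flink and Spark being able to cross-write a bucket-indexed table.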

Suggestions

  • If a Flink-based streaming table contains more than 200 million records, use the bucket index. If it contains 200 million records or fewer, the Flink state index can be used.

    Because of how the Flink state index works, once a Hudi table exceeds a certain data volume, the state backend of the Flink job comes under heavy pressure, and its parameters must be tuned to maintain performance. In addition, Flink traverses the entire table during cold start, so a large data volume may make job startup slow. For tables with a large amount of data, therefore, use the bucket index to avoid complex state backend tuning.

    If combining the bucket index with a partitioned table still cannot keep individual buckets from growing too large, you can use the Flink state index and tune the corresponding configuration parameters according to the specifications.

  • Design bucket-indexed tables so that a single bucket holds about 2 GB of data.

    To prevent a single bucket from growing too large, keep the data volume of a single bucket at or below 2 GB. (The 2 GB refers to the size of the data content, not the number of rows or the size of the Parquet data file.) The goal is to keep the Parquet file of each bucket within about 256 MB, which balances read and write memory consumption against HDFS storage utilization. The 2 GB limit is only an empirical value, because different service data compresses to different sizes in columnar storage.

    Why is 2 GB recommended?

    • After 2 GB of data is stored as a columnar Parquet file, the data file is about 150 MB to 256 MB, depending on the service data. A single HDFS data block is usually 128 MB, so files of this size use storage space effectively.
    • The memory consumed by reading and writing data corresponds to the size of the original data (including null values). In big data computing, 2 GB is within the acceptable range for a single read or write task.

    What are the possible impacts if the data volume of a single bucket exceeds this range?

    • Read and write tasks may run out of memory (OOM). To resolve this, increase the memory allocated to a single task.
    • The read and write performance deteriorates because the amount of data processed by a single task increases, which increases the processing time.
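
The suggestions above can be turned into a quick back-of-the-envelope estimate. The sketch below assumes that you know the expected row count of a partition (or of the whole table if it is not partitioned) and an average uncompressed row size. The function names, the reading of 2 GB as 2 × 1024³ bytes, and the sample figures are all assumptions, and the result inherits the empirical nature of the 2 GB value.

```python
# Empirical targets taken from the suggestions above.
BUCKET_TARGET_BYTES = 2 * 1024 ** 3      # about 2 GB of raw data per bucket
STATE_INDEX_ROW_LIMIT = 200_000_000      # 200 million records


def estimate_num_buckets(rows: int, avg_row_bytes: int) -> int:
    """Estimate the bucket count so that each bucket holds at most ~2 GB.

    rows          -- expected rows in one partition (hypothetical input)
    avg_row_bytes -- average uncompressed row size in bytes (hypothetical input)
    """
    raw_bytes = rows * avg_row_bytes
    # Integer ceiling division; always return at least one bucket.
    return max(1, -(-raw_bytes // BUCKET_TARGET_BYTES))


def suggest_index(total_rows: int) -> str:
    """Apply the 200-million-record guideline for Flink streaming tables."""
    return "bucket" if total_rows > STATE_INDEX_ROW_LIMIT else "flink_state"


if __name__ == "__main__":
    # Example: 100 million rows of about 1 KiB each in one partition.
    print(estimate_num_buckets(100_000_000, 1024))  # 48
    print(suggest_index(350_000_000))               # bucket
    print(suggest_index(50_000_000))                # flink_state
```

Since the bucket count cannot be adjusted dynamically, it is usually safer to size it for the largest partition you expect rather than for the current one.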