Query Type
Snapshot Queries
Snapshot queries allow you to read the latest snapshots generated by commit/compaction. For MOR tables, it also merges the content of the latest delta log files in the query, providing near real-time data retrieval.
Incremental Queries
Incremental queries only retrieve data that has been added after a given commit/compaction.
Read Optimized Queries
Trade-off |
Real-Time Queries |
Read Optimized Queries |
---|---|---|
Data latency |
Low |
High |
Query latency |
Only for MOR tables, high (combining Parquet and delta log files) |
Low (Parquet file reading performance) |
COW Table Queries
- Real-time view reading (using SparkSQL an example): Directly read the Hudi table stored in the metadata service, where ${table_name} indicates the table name.
select (fields or aggregate functions) from ${table_name};
- Real-time view reading (using a Spark Jar job as an example):
Spark Jar jobs can read Hudi tables in two ways: using the Spark datasource API or submitting SQL queries through SparkSession.
Set the configuration item hoodie.datasource.query.type to snapshot (which is also the default value).
object HudiDemoScala { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .enableHiveSupport() .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") .appName("HudiIncrementalReadDemo") .getOrCreate(); // 1. Read Hudi tables using the Spark datasource API. val dataFrame = spark.read.format("hudi") .option("hoodie.datasource.query.type", "snapshot") // snapshot is also the default value. You can retain the default value. .load("obs://bucket/to_your_table"); // Specify the path of the Hudi table to read. DLI supports only OBS paths. dataFrame.show(100); // 2. Read Hudi tables by submitting SQL queries through SparkSession, which requires interconnection with the metadata service. spark.sql("select * from ${table_name}").show(100); } }
- Incremental view reading (using Spark SQL as an example):
hoodie.${table_name}.consume.mode=INCREMENTAL hoodie.${table_name}.consume.start.timestamp=Start commit time hoodie.${table_name}.consume.end.timestamp=End commit time
Run the following SQL statement:select (fields or aggregate functions) from ${table_name} where `_hoodie_commit_time`>'Start commit time' and `_hoodie_commit_time`<='End commit time' //This filtering condition is mandatory.
- Incremental view reading (using a Spark Jar job as an example):
The hoodie.datasource.query.type configuration item must be set to incremental.
object HudiDemoScala { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .enableHiveSupport() .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") .appName("HudiIncrementalReadDemo") .getOrCreate(); val startTime = "20240531000000"; val endTime = "20240531000000"; spark.read.format("hudi") .option("hoodie.datasource.query.type", "incremental") // Specify the query type as incremental query. .option("hoodie.datasource.read.begin.instanttime", startTime) // Specify the start commit for incremental pull. .option("hoodie.datasource.read.end.instanttime", endTime) // Specify the end commit for incremental pull. .load("obs://bucket/to_your_table") // Specify the path of the hudi table to read. .createTempView("hudi_incremental_temp_view"); // Register as a temporary Spark table. // The results must be filtered based on startTime and endTime. If endTime is not specified, filtering only needs to be done based on startTime. spark.sql("select * from hudi_incremental_temp_view where `_hoodie_commit_time`>'20240531000000' and `_hoodie_commit_time`<='20240531321456'") .show(100, false); } }
- Read optimized queries: Read optimized queries for COW tables is equivalent to snapshot queries.
MOR Table Queries
When using the metadata service in Spark SQL jobs or configuring HMS synchronization parameters, creating an MOR table will also create two additional tables: ${table_name}_rt and ${table_name}_ro. The table with the rt suffix represents real-time queries, while the table with the ro suffix represents read optimized queries. For example, if you create a Hudi table named ${table_name} using Spark SQL and synchronize it with the metadata service, two additional tables will be created in the database: ${table_name}_rt and ${table_name}_ro.
- Real-time view reading (using Spark SQL as an example): Directly read the Hudi table with the _rt suffix in the same database.
select count(*) from ${table_name}_rt;
- Real-time view reading (using a Spark Jar job as an example): Same as COW table operations, refer to the relevant COW table operations.
- Incremental view reading (using a Spark SQL job as an example): Same as COW table operations, refer to the relevant COW table operations.
- Incremental view reading (using a Spark Jar job as an example): Same as COW table operations, refer to the relevant COW table operations.
- Read optimized view reading (using a Spark Jar job as an example):
The hoodie.datasource.query.type configuration item must be set to read_optimized.
object HudiDemoScala { def main(args: Array[String]): Unit = { val spark = SparkSession.builder .enableHiveSupport .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") .appName("HudiIncrementalReadDemo") .getOrCreate spark.read.format("hudi") .option("hoodie.datasource.query.type", "read_optimized") // Specify the query type as read-optimized view. .load("obs://bucket/to_your_table") // Specify the path of the hudi table to read. .createTempView("hudi_read_optimized_temp_view") spark.sql("select * from hudi_read_optimized_temp_view").show(100) } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.