Hudi查询类型
快照查询
快照查询(Snapshot Queries)可以读到最新的commit/compaction产生的快照。对于MOR表,还会在查询中合并最新的delta log文件的内容,使读取的数据近实时。
增量查询
增量查询(Incremental Queries)只会查询到给定的commit/compaction之后新增的数据。
读优化查询
Trade-off |
实时查询 |
读优化查询 |
---|---|---|
Data Latency(数据时延) |
低 |
高 |
Query Latency(查询时延) |
只对于MOR表,高(合并parquet + delta log) |
低 (读取parquet文件性能) |
COW表查询
- 实时视图读取(SparkSQL为例):直接读取元数据服务里面存储的Hudi表即可,${table_name}表示表名称。
select (字段 or 聚合函数) from ${table_name};
- 实时视图读取(Spark jar作业为例):
Spark jar作业可以通过两种方式来读取Hudi表:Spark datasource API 或者通过 SparkSession 提交 SQL。
配置项 hoodie.datasource.query.type 需要配置为 snapshot(snapshot同时也是默认值,因此可以缺省)。
object HudiDemoScala { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .enableHiveSupport() .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") .appName("HudiIncrementalReadDemo") .getOrCreate(); // 1. 通过Spark datasource API 读取 val dataFrame = spark.read.format("hudi") .option("hoodie.datasource.query.type", "snapshot") // snapshot同时也作为默认值,因此可以缺省本配置项 .load("obs://bucket/to_your_table"); // 指定读取的hudi表路径,DLI仅支持使用OBS路径 dataFrame.show(100); // 2. 通过SparkSession 提交 SQL,需要对接元数据服务。 spark.sql("select * from ${table_name}").show(100); } }
- 增量视图读取(以SparkSQL为例):
hoodie.${table_name}.consume.mode=INCREMENTAL hoodie.${table_name}.consume.start.timestamp=开始Commit时间 hoodie.${table_name}.consume.end.timestamp=结束Commit时间
随后执行SQLselect (字段 or 聚合函数) from ${table_name} where `_hoodie_commit_time`>'开始Commit时间' and `_hoodie_commit_time`<='结束Commit时间' //这个过滤条件必须带。
- 增量视图读取(Spark jar 作业为例):
配置项 hoodie.datasource.query.type 需要配置为 incremental。
object HudiDemoScala { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .enableHiveSupport() .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") .appName("HudiIncrementalReadDemo") .getOrCreate(); val startTime = "20240531000000"; val endTime = "20240531000000"; spark.read.format("hudi") .option("hoodie.datasource.query.type", "incremental") // 指定查询类型为增量查询 .option("hoodie.datasource.read.begin.instanttime", startTime) // 指定初始增量拉取commit .option("hoodie.datasource.read.end.instanttime", endTime) // 指定增量拉取结束commit .load("obs://bucket/to_your_table") // 指定读取的hudi表路径 .createTempView("hudi_incremental_temp_view"); // 注册为spark临时表 // 结果必须根据startTime和endTime进行过滤,如果没有指定endTime,则只需要根据startTime进行过滤 spark.sql("select * from hudi_incremental_temp_view where `_hoodie_commit_time`>'20240531000000' and `_hoodie_commit_time`<='20240531321456'") .show(100, false); } }
- 读优化查询:COW表读优化查询等同于快照查询。
MOR表查询
在Spark SQL作业中使用元数据服务,或者配置了HMS同步参数,在创建MOR表后,会额外同步创建:“表名_rt”和“表名_ro”两张表。查询后缀为rt的表等同于实时查询,查询后缀为ro的表代表读优化查询。例如:通过Spark SQL创建hudi表名为${table_name}, 同步元数据服务后,数据库中多出两张表分别为${table_name}_rt和${table_name}_ro。
- 实时视图读取(SparkSQL为例):直接读取相同数据库中后缀为_rt的hudi表即可。
select count(*) from ${table_name}_rt;
- 实时视图读取(Spark jar作业为例):与COW表操作一致,请参考COW表相关操作。
- 增量视图读取(Spark SQL作业为例):与COW表操作一致,请参考COW表相关操作。
- 增量视图读取(Spark jar作业为例):与COW表操作一致,请参考COW表相关操作。
- 读优化视图读取(Spark jar作业为例):
配置项 hoodie.datasource.query.type 需要配置为 read_optimized。
object HudiDemoScala { def main(args: Array[String]): Unit = { val spark = SparkSession.builder .enableHiveSupport .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") .appName("HudiIncrementalReadDemo") .getOrCreate spark.read.format("hudi") .option("hoodie.datasource.query.type", "read_optimized") // 指定查询类型为读优化视图 .load("obs://bucket/to_your_table") // 指定读取的hudi表路径 .createTempView("hudi_read_optimized_temp_view") spark.sql("select * from hudi_read_optimized_temp_view").show(100) } }