更新时间:2024-12-25 GMT+08:00
分享

Hudi查询类型

快照查询

快照查询(Snapshot Queries)可以读到最新的commit/compaction产生的快照。对于MOR表,还会在查询中合并最新的delta log文件的内容,使读取的数据近实时。

增量查询

增量查询(Incremental Queries)只会查询到给定的commit/compaction之后新增的数据。

读优化查询

读优化查询(Read Optimized Queries)是针对MOR表进行的优化,只会读取最新的commit/compaction产生的快照(不包含delta log文件)。
表1 实时查询和读优化查询的trade-off

Trade-off

实时查询

读优化查询

Data Latency(数据时延)

Query Latency(查询时延)

只对于MOR表,高(合并parquet + delta log)

低 (读取parquet文件性能)

COW表查询

  • 实时视图读取(SparkSQL为例):直接读取元数据服务里面存储的Hudi表即可,${table_name}表示表名称。
    select (字段 or 聚合函数) from ${table_name};
  • 实时视图读取(Spark jar作业为例):

    Spark jar作业可以通过两种方式来读取Hudi表:Spark datasource API 或者通过 SparkSession 提交 SQL。

    配置项 hoodie.datasource.query.type 需要配置为 snapshot(snapshot同时也是默认值,因此可以缺省)。

    object HudiDemoScala {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .enableHiveSupport()
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
          .appName("HudiIncrementalReadDemo")
          .getOrCreate();
    
        // 1. 通过Spark datasource API 读取
        val dataFrame = spark.read.format("hudi")
          .option("hoodie.datasource.query.type", "snapshot") // snapshot同时也作为默认值,因此可以缺省本配置项
          .load("obs://bucket/to_your_table"); // 指定读取的hudi表路径,DLI仅支持使用OBS路径
        dataFrame.show(100);
    
        // 2. 通过SparkSession 提交 SQL,需要对接元数据服务。
        spark.sql("select * from ${table_name}").show(100);
      }
    }
  • 增量视图读取(以SparkSQL为例):

    首先配置

    hoodie.${table_name}.consume.mode=INCREMENTAL
    hoodie.${table_name}.consume.start.timestamp=开始Commit时间
    hoodie.${table_name}.consume.end.timestamp=结束Commit时间
    随后执行SQL
    select (字段 or 聚合函数) from ${table_name} where `_hoodie_commit_time`>'开始Commit时间' and `_hoodie_commit_time`<='结束Commit时间' //这个过滤条件必须带。
  • 增量视图读取(Spark jar 作业为例):

    配置项 hoodie.datasource.query.type 需要配置为 incremental。

    object HudiDemoScala {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .enableHiveSupport()
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
          .appName("HudiIncrementalReadDemo")
          .getOrCreate();
    
        val startTime = "20240531000000";
        val endTime = "20240531000000";
        spark.read.format("hudi")
          .option("hoodie.datasource.query.type", "incremental") // 指定查询类型为增量查询
          .option("hoodie.datasource.read.begin.instanttime", startTime)  // 指定初始增量拉取commit
          .option("hoodie.datasource.read.end.instanttime", endTime)  // 指定增量拉取结束commit
          .load("obs://bucket/to_your_table")  // 指定读取的hudi表路径
          .createTempView("hudi_incremental_temp_view");  // 注册为spark临时表
        // 结果必须根据startTime和endTime进行过滤,如果没有指定endTime,则只需要根据startTime进行过滤
        spark.sql("select * from hudi_incremental_temp_view where `_hoodie_commit_time`>'20240531000000' and `_hoodie_commit_time`<='20240531321456'")
          .show(100, false);
      }
    }
  • 读优化查询:COW表读优化查询等同于快照查询。

MOR表查询

在Spark SQL作业中使用元数据服务,或者配置了HMS同步参数,在创建MOR表后,会额外同步创建:“表名_rt”和“表名_ro”两张表。查询后缀为rt的表等同于实时查询,查询后缀为ro的表代表读优化查询。例如:通过Spark SQL创建hudi表名为${table_name}, 同步元数据服务后,数据库中多出两张表分别为${table_name}_rt和${table_name}_ro。

  • 实时视图读取(SparkSQL为例):直接读取相同数据库中后缀为_rt的hudi表即可。
    select count(*) from ${table_name}_rt;
  • 实时视图读取(Spark jar作业为例):与COW表操作一致,请参考COW表相关操作。
  • 增量视图读取(Spark SQL作业为例):与COW表操作一致,请参考COW表相关操作。
  • 增量视图读取(Spark jar作业为例):与COW表操作一致,请参考COW表相关操作。
  • 读优化视图读取(Spark jar作业为例):

    配置项 hoodie.datasource.query.type 需要配置为 read_optimized。

    object HudiDemoScala {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder
          .enableHiveSupport
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
          .appName("HudiIncrementalReadDemo")
          .getOrCreate
        spark.read.format("hudi")
          .option("hoodie.datasource.query.type", "read_optimized") // 指定查询类型为读优化视图
          .load("obs://bucket/to_your_table") // 指定读取的hudi表路径
          .createTempView("hudi_read_optimized_temp_view")
        spark.sql("select * from hudi_read_optimized_temp_view").show(100)
      }
    }

相关文档