更新时间:2024-05-28 GMT+08:00
分享

Spark读写Hudi开发规范

Spark写Hudi各种写入模式参数规范说明

类型

说明

开启参数

场景选择

特点

upsert

update + insert

Hudi默认写入类型,写入具有更新能力。

默认,无需参数开启。

  • SparkSQL:
    set hoodie.datasource.write.operation=upsert;
  • DataSource Api:
    df.write
    .format("hudi")
    .options(xxx)
    .option("hoodie.datasource.write.operation", "upsert")
    .mode("append")
    .save("/tmp/tablePath")

默认选择。

优点:

  • 支持小文件合并。
  • 支持更新。

缺点:

  • 写入速度中规中矩。

append

数据无更新直接写入

  • Spark:Spark侧没有纯append模式可使用bulk insert模式替代。
  • SparkSQL:
    set hoodie.datasource.write.operation = bulk_insert;
    set hoodie.datasource.write.row.writer.enable = true;
  • DataSource Api:
    df.write
    .format("hudi")
    .options(xxx)
    .option("hoodie.datasource.write.operation", "bulk_insert")
    .option("hoodie.datasource.write.row.writer.enable", "true")
    .mode("append")
    .save("/tmp/tablePath")

追求高吞吐,无数据更新场景。

优点:

  • 写入速度最快。

缺点:

  • 无小文件合并能力。
  • 无更新能力。
  • 需要clustering合并小文件。

delete

删除操作

无需参数,直接使用delete语法即可:

delete from tableName where primaryKey='id1';

SQL删除数据数据场景。

和upsert类型一样。

Insert overwrite

覆写分区

无需参数,直接使用insert overwrite语法即可:

insert overwrite table tableName partition(dt ='2021-01-04') 
select * from srcTable;

分区级别重新。

覆写分区。

Insert overwrite table

覆写全表

无需参数,直接使用insert overwrite语法即可:

insert overwrite table tableName 
select * from srcTable;

全部重写。

覆写全表。

Bulk_insert

批量导入

  • SparkSQL:
    set hoodie.datasource.write.operation = bulk_insert;
    set hoodie.datasource.write.row.writer.enable = true;
  • DataSource Api:
    df.write
    .format("hudi")
    .options(xxx)
    .option("hoodie.datasource.write.operation", "bulk_insert")
    .option("hoodie.datasource.write.row.writer.enable", "true")
    .mode("append")
    .save("/tmp/tablePath")

建议表初始化搬迁的时候使用。

和append模式一样。

Spark增量读取Hudi参数规范

类型

说明

开启参数

场景选择

特点

snapshot

实时数据读取。

默认,无需参数开启

SparkSQL:

set hoodie.datasource.query.type=snapshot;

DataSource Api:

val df = spark.read
.format("hudi")
.option("hoodie.datasource.query.type","snapshot")
.load("tablePath")

默认选择

每次读的数据都是最新的,数据写入即可见。

incremental

增量查询,只查询两次commit之间的数据

  • SparkSQL:
    set hoodie.tableName.consume.mode=INCREMENTAL;// 必须设置当前表读取为增量读取模式
    set hoodie.tableName.consume.start.timestamp=20201227153030;// 指定初始增量拉取commit
    set hoodie.tableName.consume.end.timestamp=20210308212318;  // 指定增量拉取结束commit,如果不指定的话采用最新的commit
    select * from tableName where `_hoodie_commit_time`>'20201227153030' and `_hoodie_commit_time`<='20210308212318'; // 结果必须根据start.timestamp和end.timestamp进行过滤,如果没有指定end.timestamp,则只需要根据start.timestamp进行过滤。
    set hoodie.tableName.consume.mode=SNAPSHOT;  // 使用完增量模式,必须把查询模式重新设置回来
  • DataSource Api:
    val df = spark.read
    .format("hudi")
    .option("hoodie.tableName.consume.mode","INCREMENTAL")
    .option("hoodie.tableName.consume.start.timestamp","20201227153030")
    .option("hoodie.tableName.consume.end.timestamp","20210308212318")
    .load("tablePath")
    .where("`_hoodie_commit_time`>'20201227153030' and `_hoodie_commit_time`<='20210308212318'")

流式加工场景, 每次只拉取增量而非全量数据计算。

只读两次commit之间的数据。不是全表扫描,比通过where条件取两次commit之前的数据效率要高很多。

read_optimized

读优化视图。

只读取表里面parquet文件中的数据, 对于mor表来说,新增数据会写到log里面,故该模式读取的数据不是最新的。

  • SparkSQL:

    mor表同步hive会产生三张表,分别是主表,ro表和rt表。 ro表即读优化表,直接读ro表即可

    select * from tableName_ro;
  • DataSource Api:
    val df = spark.read
    .format("hudi")
    .option("hoodie.datasource.query.type","read_optimized")
    .load("tablePath")

对查询性能有要求,但是可以接受一定时间的数据时延。

对于mor表来说,这种读方式性能比读实时表快很多。该读取方式不会读log数据,这些log中新增数据compaction之后才能读到,因此使用该模式读取数据有一定的数据时延。

分享:

    相关文档

    相关产品