批量写入
Hudi提供多种写入方式,具体见hoodie.datasource.write.operation配置项,这里主要介绍UPSERT、INSERT和BULK_INSERT。
- INSERT(插入): 该操作流程和UPSERT基本一致,但是不需要通过索引去查询具体更新的文件分区,因此它的速度比UPSERT快。当数据源不包含更新数据时建议使用该操作,若数据源中存在更新数据,则在数据湖中会出现重复数据。
- BULK_INSERT(批量插入):用于初始数据集加载, 该操作会对主键进行排序后直接以写普通parquet表的方式插入Hudi表,该操作性能是最高的,但是无法控制小文件,而UPSERT和INSERT操作使用启发式方法可以很好的控制小文件。
- UPSERT(插入更新): 默认操作类型。Hudi会根据主键进行判断,如果历史数据存在则update如果不存在则insert。因此在对于CDC之类几乎肯定包括更新的数据源,建议使用该操作。
- 由于INSERT时不会对主键进行排序,所以初始化数据集不建议使用INSERT。
- 在确定数据都为新数据时建议使用INSERT,当存在更新数据时建议使用UPSERT,当初始化数据集时建议使用BULK_INSERT。
插入示例:
df.write.format("hudi"). option(PRECOMBINE_FIELD_OPT_KEY, "col4").// 指定预合并字段,该字段需要可排序 option(RECORDKEY_FIELD_OPT_KEY, "primary_key"). // 指定hudi 主键, hudi要求主键唯一 option(PARTITIONPATH_FIELD_OPT_KEY, "col0").// 指定分区 option(OPERATION_OPT_KEY, "bulk_insert").// 指定本次操作方式为bulk_insert option("hoodie.bulkinsert.shuffle.parallelism", par.toString).//指定bulk_insert操作的并发度 option(HIVE_SYNC_ENABLED_OPT_KEY, "true").// 指定同步hudi表到hive option(HIVE_PARTITION_FIELDS_OPT_KEY, "col0").// 指定hive分区列名 option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor"). option(HIVE_DATABASE_OPT_KEY, db). option(HIVE_TABLE_OPT_KEY, tableName). option(HIVE_USE_JDBC_OPT_KEY, "false").//是否使用jdbc同步Hive,默认为true option(TABLE_NAME, tableName). // 指定表名 mode(Overwrite). // 指定写入方式 save(s"/tmp/${db}/${tableName}")// 指定hudi表存储路径
- 使用spark datasource接口更新Mor表,Upsert写入小数据量时可能触发更新数据的小文件合并,使在Mor表的读优化视图中能查到部分更新数据。
- 当update的数据对应的base文件是小文件时,insert中的数据和update中的数据会被合在一起和base文件直接做合并产生新的base文件,而不是写log。
分区设置操作
Hudi支持多种分区方式,如多级分区、无分区、单分区、时间日期分区。用户可以根据实际需求选择合适的分区方式,接下来将详细介绍Hudi如何配置各种分区类型。
- 多级分区
配置项
说明
hoodie.datasource.write.partitionpath.field
配置为多个分区字段,例如:p1,p2,p3
hoodie.datasource.hive_sync.partition_fields
配置为p1,p2,p3和hoodie.datasource.write.partitionpath.field的分区字段保持一致
hoodie.datasource.write.keygenerator.class
配置为org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.hive_sync.partition_extractor_class
配置为org.apache.hudi.hive.MultiPartKeysValueExtractor
- 无分区
配置项
说明
hoodie.datasource.write.partitionpath.field
配置为空
hoodie.datasource.hive_sync.partition_fields
配置为空
hoodie.datasource.write.keygenerator.class
配置为org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.datasource.hive_sync.partition_extractor_class
配置为org.apache.hudi.hive.NonPartitionedExtractor
- 单分区
配置项
说明
hoodie.datasource.write.partitionpath.field
配置为一个字段,例如:p
hoodie.datasource.hive_sync.partition_fields
配置为p,和
hoodie.datasource.write.partitionpath.field
分区字段保持一致
hoodie.datasource.write.keygenerator.class
默认配置为org.apache.hudi.keygen.SimpleKeyGenerator ,也可以不配置
hoodie.datasource.hive_sync.partition_extractor_class
配置为org.apache.hudi.hive.MultiPartKeysValueExtractor
- 时间日期分区
配置项
说明
hoodie.datasource.write.partitionpath.field
配置为date类型字段比如operationTime
hoodie.datasource.hive_sync.partition_fields
配置为operationTime,和上面分区字段保持一致
hoodie.datasource.write.keygenerator.class
默认配置为org.apache.hudi.keygen.SimpleKeyGenerator ,也可以不配置
hoodie.datasource.hive_sync.partition_extractor_class
配置org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
SlashEncodedDayPartitionValueExtractor存在以下约束:要求写入的日期格式为yyyy/mm/dd。
- 分区排序:
配置项
说明
hoodie.bulkinsert.user.defined.partitioner.class
指定分区排序类,可自行定义排序方法,具体参考样例代码
bulk_insert默认字符排序,仅适用于StringType的主键。