更新时间:2025-12-26 GMT+08:00
分享

批量写入Hudi表

操作场景

Hudi提供多种写入方式,具体见hoodie.datasource.write.operation配置项,这里主要介绍UPSERT、INSERT和BULK_INSERT。

  • INSERT(插入): 该操作流程和UPSERT基本一致,但是不需要通过索引去查询具体更新的文件分区,因此它的速度比UPSERT快。当数据源不包含更新数据时建议使用该操作,如果数据源中存在更新数据,则在数据湖中会出现重复数据。
  • BULK_INSERT(批量插入):用于初始数据集加载, 该操作会对主键进行排序后直接以写普通parquet表的方式插入Hudi表,该操作性能是最高的,但是无法控制小文件,而UPSERT和INSERT操作使用启发式方法可以很好的控制小文件。
  • UPSERT(插入更新): 默认操作类型。Hudi会根据主键进行判断,如果历史数据存在则update如果不存在则insert。因此在对于CDC之类几乎肯定包括更新的数据源,建议使用该操作。
  • 由于INSERT时不会对主键进行排序,所以初始化数据集不建议使用INSERT。
  • 在确定数据都为新数据时建议使用INSERT,当存在更新数据时建议使用UPSERT,当初始化数据集时建议使用BULK_INSERT。

批量写入Hudi表

  1. 引入Hudi包生成测试数据,参考使用Spark Shell创建Hudi表章节的13
  2. 写入Hudi表,写入命令中加入参数:option("hoodie.datasource.write.operation", "bulk_insert"),指定写入方式为bulk_insert,指定其他写入方式请参考表1
    df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.datasource.write.recordkey.field", "uuid").
    option("hoodie.datasource.write.partitionpath.field", "").
    option("hoodie.datasource.write.operation", "bulk_insert").
    option("hoodie.table.name", tableName).
    option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
    option("hoodie.datasource.hive_sync.enable", "true").
    option("hoodie.datasource.hive_sync.partition_fields", "").
    option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor").
    option("hoodie.datasource.hive_sync.table", tableName).
    option("hoodie.datasource.hive_sync.use_jdbc", "false").
    option("hoodie.bulkinsert.shuffle.parallelism", 4).
    mode(Overwrite).
    save(basePath)
    • 示例中各参数介绍请参考表1
    • 使用spark datasource接口更新Mor表,Upsert写入小数据量时可能触发更新数据的小文件合并,使在Mor表的读优化视图中能查到部分更新数据。
    • 当update的数据对应的base文件是小文件时,insert中的数据和update中的数据会被合在一起和base文件直接做合并产生新的base文件,而不是写log。

分区设置操作

Hudi支持多种分区方式,如多级分区、无分区、单分区、时间日期分区。用户可以根据实际需求选择合适的分区方式,接下来将详细介绍Hudi如何配置各种分区类型。

  • 多级分区

    多级分区即指定多个字段为分区键,需要注意的配置项:

    配置项

    说明

    hoodie.datasource.write.partitionpath.field

    用于指定数据分区的字段。通过设置这个参数,可以将数据按照指定的字段进行分区存储,从而提高数据查询的效率和管理的便利性。

    配置为多个分区字段,例如:p1,p2,p3。

    hoodie.datasource.hive_sync.partition_fields

    用于在将Hudi表同步到Hive元数据时指定分区字段。

    和hoodie.datasource.write.partitionpath.field的分区字段保持一致。

    hoodie.datasource.write.keygenerator.class

    指定用于生成记录键和分区路径的键生成器类,键生成器负责生成Hudi表中的记录键(record key)和分区路径(partition path),这对于数据的唯一性和分区管理至关重要。

    配置为org.apache.hudi.keygen.ComplexKeyGenerator。

    hoodie.datasource.hive_sync.partition_extractor_class

    用于指定分区提取器(Partition Extractor)的类。分区提取器负责从Hudi表的分区路径中提取分区字段值,以便在将Hudi表同步到Hive时正确地创建分区。

    配置为org.apache.hudi.hive.MultiPartKeysValueExtractor。

    df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.datasource.write.recordkey.field", "uuid").
    option("hoodie.datasource.write.partitionpath.field", "p1,p2,p3").
    option("hoodie.datasource.write.operation", "bulk_insert").
    option("hoodie.table.name", tableName).
    option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
    option("hoodie.datasource.hive_sync.enable", "true").
    option("hoodie.datasource.hive_sync.partition_fields", "p1,p2,p3").
    option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").
    option("hoodie.datasource.hive_sync.table", tableName).
    option("hoodie.datasource.hive_sync.use_jdbc", "false").
    option("hoodie.bulkinsert.shuffle.parallelism", 4).
    mode(Overwrite).
    save(basePath)
  • 无分区

    hudi支持无分区表,需要注意的配置项:

    配置项

    说明

    hoodie.datasource.write.partitionpath.field

    配置为空。

    hoodie.datasource.hive_sync.partition_fields

    配置为空。

    hoodie.datasource.write.keygenerator.class

    配置为org.apache.hudi.keygen.NonpartitionedKeyGenerator。

    hoodie.datasource.hive_sync.partition_extractor_class

    配置为org.apache.hudi.hive.NonPartitionedExtractor。

    df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.datasource.write.recordkey.field", "uuid").
    option("hoodie.datasource.write.partitionpath.field", "").
    option("hoodie.datasource.write.operation", "bulk_insert").
    option("hoodie.table.name", tableName).
    option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
    option("hoodie.datasource.hive_sync.enable", "true").
    option("hoodie.datasource.hive_sync.partition_fields", "").
    option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor").
    option("hoodie.datasource.hive_sync.table", tableName).
    option("hoodie.datasource.hive_sync.use_jdbc", "false").
    option("hoodie.bulkinsert.shuffle.parallelism", 4).
    mode(Overwrite).
    save(basePath)
  • 单分区

    和多级分区类似,需要配置项:

    配置项

    说明

    hoodie.datasource.write.partitionpath.field

    配置为一个字段,例如:p

    hoodie.datasource.hive_sync.partition_fields

    和hoodie.datasource.write.partitionpath.field分区字段保持一致。

    hoodie.datasource.write.keygenerator.class

    默认可以配置为org.apache.hudi.keygen.SimpleKeyGenerator和org.apache.hudi.keygen.ComplexKeyGenerator,也可以不配置。

    hoodie.datasource.hive_sync.partition_extractor_class

    配置为org.apache.hudi.hive.MultiPartKeysValueExtractor。

    df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.datasource.write.recordkey.field", "uuid").
    option("hoodie.datasource.write.partitionpath.field", "p").
    option("hoodie.datasource.write.operation", "bulk_insert").
    option("hoodie.table.name", tableName).
    option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
    option("hoodie.datasource.hive_sync.enable", "true").
    option("hoodie.datasource.hive_sync.partition_fields", "p").
    option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").
    option("hoodie.datasource.hive_sync.table", tableName).
    option("hoodie.datasource.hive_sync.use_jdbc", "false").
    option("hoodie.bulkinsert.shuffle.parallelism", 4).
    mode(Overwrite).
    save(basePath)
  • 时间日期分区

    即指定date类型字段作为分区字段,需要注意的配置项:

    配置项

    说明

    hoodie.datasource.write.partitionpath.field

    配置为date类型字段。

    hoodie.datasource.hive_sync.partition_fields

    和hoodie.datasource.write.partitionpath.field分区字段保持一致。

    hoodie.datasource.write.keygenerator.class

    配置为org.apache.hudi.keygen.ComplexKeyGenerator。

    hoodie.datasource.hive_sync.partition_extractor_class

    配置为org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor。

    SlashEncodedDayPartitionValueExtractor存在以下约束:要求写入的日期格式为yyyy/mm/dd。

  • 分区排序

    配置项

    说明

    hoodie.bulkinsert.user.defined.partitioner.class

    用于指定分区提取器(Partition Extractor)的类。分区提取器负责从Hudi表的分区路径中提取分区字段值,以便在将Hudi表同步到Hive时正确地创建分区。

    Hudi提供了多种分区提取器类,每种类适用于不同的分区路径格式:

    1. MultiPartKeysValueExtractor:默认的分区提取器,适用于多级分区路径。
      • 类名:org.apache.hudi.hive.MultiPartKeysValueExtractor
      • 适用场景:分区路径格式为 part1=value1/part2=value2/...。
    2. NonPartitionedExtractor:适用于无分区的表。
      • 类名:org.apache.hudi.hive.NonPartitionedExtractor
      • 适用场景:表没有分区路径。
    3. HiveStylePartitionValueExtractor:适用于 Hive 风格的分区路径。
      • 类名:org.apache.hudi.hive.HiveStylePartitionValueExtractor
      • 适用场景:分区路径格式为 part1=value1/part2=value2/...,与 Hive 的分区路径格式一致。
    4. CustomPartitionValueExtractor:用户自定义的分区提取器类,适用于特殊需求。
      • 类名:用户自定义的类名
      • 适用场景:根据用户需求提取分区字段值。

    bulk_insert默认字符排序,仅适用于StringType的主键。

相关文档