更新时间:2024-12-25 GMT+08:00
分享

API语法说明

设置写入方式

Hudi通过hoodie.datasource.write.operation参数设置写入模式。

  • insert: 该操作不需要通过索引去查询具体更新的文件分区,因此它的速度比upsert快。当不包含更新数据时建议使用该操作,如果存在更新数据使用该操作会出现重复数据。
  • bulk_insert:该操作会对主键进行排序后直接以写普通parquet表的方式插入Hudi表,该操作性能是最高的,但是无法控制小文件,而upsert和insert操作可以很好的控制小文件。
  • upsert: 默认操作类型。Hudi会根据主键进行判断即将插入的数据是否包含更新数据,如果包含则执行upsert,否则执行insert。
  • 由于insert时不会对主键进行排序,所以初始化数据集不建议使用insert,建议用bulk_insert。
  • 确定数据都为新增数据时建议使用insert,当存在更新数据时建议使用upsert。

例:bulk_insert写COW无分区表

df.write.format("org.apache.hudi").
option("hoodie.datasource.write.table.type", COW_TABLE_TYPE_OPT_VAL).
option("hoodie.datasource.write.precombine.field", "update_time").
option("hoodie.datasource.write.recordkey.field", "id").
option("hoodie.datasource.write.partitionpath.field", "").
option("hoodie.datasource.write.operation", "bulk_insert").
option("hoodie.table.name", tableName).
option("hoodie.write.lock.provider", "com.huawei.luxor.hudi.util.DliCatalogBasedLockProvider").
option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.datasource.hive_sync.enable", "true").
option("hoodie.datasource.hive_sync.partition_fields", "").
option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor").
option("hoodie.datasource.hive_sync.database", databaseName).
option("hoodie.datasource.hive_sync.table", tableName).
option("hoodie.datasource.hive_sync.use_jdbc", "false").
option("hoodie.bulkinsert.shuffle.parallelism", 4).
mode(SaveMode.Overwrite).
save(basePath)

设置分区

  • 多级分区

    配置项

    说明

    hoodie.datasource.write.partitionpath.field

    配置为多个业务字段,用逗号分隔。

    hoodie.datasource.hive_sync.partition_fields

    和hoodie.datasource.write.partitionpath.field的分区字段保持一致。

    hoodie.datasource.write.keygenerator.class

    配置为org.apache.hudi.keygen.ComplexKeyGenerator。

    hoodie.datasource.hive_sync.partition_extractor_class

    配置为org.apache.hudi.hive.MultiPartKeysValueExtractor。

    例:创建分区为p1/p2/p3的多级分区COW表

    df.write.format("org.apache.hudi").
    option("hoodie.datasource.write.table.type", COW_TABLE_TYPE_OPT_VAL).
    option("hoodie.datasource.write.precombine.field", "update_time").
    option("hoodie.datasource.write.recordkey.field", "id").
    option("hoodie.datasource.write.partitionpath.field", "year,month,day").
    option("hoodie.datasource.write.operation", "bulk_insert").
    option("hoodie.table.name", tableName).
    option("hoodie.write.lock.provider", "com.huawei.luxor.hudi.util.DliCatalogBasedLockProvider").
    option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
    option("hoodie.datasource.hive_sync.enable", "true").
    option("hoodie.datasource.hive_sync.partition_fields", "year,month,day").
    option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").
    option("hoodie.datasource.hive_sync.database", databaseName).
    option("hoodie.datasource.hive_sync.table", tableName).
    option("hoodie.datasource.hive_sync.use_jdbc", "false").
    mode(SaveMode.Overwrite).
    save(basePath)
  • 单分区

    配置项

    说明

    hoodie.datasource.write.partitionpath.field

    配置为一个业务字段。

    hoodie.datasource.hive_sync.partition_fields

    和hoodie.datasource.write.partitionpath.field分区字段保持一致。

    hoodie.datasource.write.keygenerator.class

    默认可以配置为org.apache.hudi.keygen.SimpleKeyGenerator 和org.apache.hudi.keygen.ComplexKeyGenerator

    hoodie.datasource.hive_sync.partition_extractor_class

    配置为org.apache.hudi.hive.MultiPartKeysValueExtractor。

    例:创建分区为p1的单分区的MOR表

    df.write.format("org.apache.hudi").
    option("hoodie.datasource.write.table.type", MOR_TABLE_TYPE_OPT_VAL).
    option("hoodie.datasource.write.precombine.field", "update_time").
    option("hoodie.datasource.write.recordkey.field", "id").
    option("hoodie.datasource.write.partitionpath.field", "create_time").
    option("hoodie.datasource.write.operation", "bulk_insert").
    option("hoodie.table.name", tableName).
    option("hoodie.write.lock.provider", "com.huawei.luxor.hudi.util.DliCatalogBasedLockProvider").
    option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
    option("hoodie.datasource.hive_sync.enable", "true").
    option("hoodie.datasource.hive_sync.partition_fields", "create_time").
    option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").
    option("hoodie.datasource.hive_sync.database", databaseName).
    option("hoodie.datasource.hive_sync.table", tableName).
    option("hoodie.datasource.hive_sync.use_jdbc", "false").
    mode(SaveMode.Overwrite).
    save(basePath)
  • 无分区

    配置项

    说明

    hoodie.datasource.write.partitionpath.field

    配置为空字符串。

    hoodie.datasource.hive_sync.partition_fields

    配置为空字符串。

    hoodie.datasource.write.keygenerator.class

    配置为org.apache.hudi.keygen.NonpartitionedKeyGenerator。

    hoodie.datasource.hive_sync.partition_extractor_class

    配置为org.apache.hudi.hive.NonPartitionedExtractor。

    例:创建无分区COW表

    df.write.format("org.apache.hudi").
    option("hoodie.datasource.write.table.type", COW_TABLE_TYPE_OPT_VAL).
    option("hoodie.datasource.write.precombine.field", "update_time").
    option("hoodie.datasource.write.recordkey.field", "id").
    option("hoodie.datasource.write.partitionpath.field", "").
    option("hoodie.datasource.write.operation", "bulk_insert").
    option("hoodie.table.name", tableName).
    option("hoodie.write.lock.provider", "com.huawei.luxor.hudi.util.DliCatalogBasedLockProvider").
    option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
    option("hoodie.datasource.hive_sync.enable", "true").
    option("hoodie.datasource.hive_sync.partition_fields", "").
    option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor").
    option("hoodie.datasource.hive_sync.database", databaseName).
    option("hoodie.datasource.hive_sync.table", tableName).
    option("hoodie.datasource.hive_sync.use_jdbc", "false").
    mode(SaveMode.Overwrite).
    save(basePath)
  • 时间日期分区

    配置项

    说明

    hoodie.datasource.write.partitionpath.field

    配置为date类型字段,格式为yyyy/mm/dd。

    hoodie.datasource.hive_sync.partition_fields

    和hoodie.datasource.write.partitionpath.field分区字段保持一致。

    hoodie.datasource.write.keygenerator.class

    默认配置为org.apache.hudi.keygen.SimpleKeyGenerator ,也可以配置为org.apache.hudi.keygen.ComplexKeyGenerator。

    hoodie.datasource.hive_sync.partition_extractor_class

    配置org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor。

    SlashEncodedDayPartitionValueExtractor存在以下约束:要求写入的日期格式为yyyy/mm/dd。

相关文档