
In-place Table Conversion

Hudi provides a convenient in-place conversion (bootstrap) capability. It generates Hudi's own .hoodie metadata files from a historical table, converting that table into a Hudi table without modifying the original data.

Example: convert the Hive table stored on HDFS at “hdfs://hacluster/user/hive/warehouse/pq1” into a Hudi table located at “hdfs://hacluster/tmp/hudi_bootstrap_test”.

  • In-place conversion does not support concurrent writes. It is used only to create a new Hudi table and must not be executed against an existing Hudi table.
  • In-place conversion supports writing COW (copy-on-write) tables only.
Launch spark-shell and run the following:

spark-shell

import collection.JavaConverters._
import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import java.time._
import java.util.Collections

// helper values kept from the original example (not all are used below)
val timestamp = Instant.now.toEpochMilli
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val numRecords: Int = 100
// source Hive table on HDFS and target location for the new Hudi table
val srcPath = "hdfs://hacluster/user/hive/warehouse/pq1"
val basePath = "hdfs://hacluster/tmp/hudi_bootstrap_test"

// Hudi write configuration
val commonOpts: Map[String, String] = Map(
    HoodieWriteConfig.INSERT_PARALLELISM -> "4",
    HoodieWriteConfig.UPSERT_PARALLELISM -> "4",
    HoodieWriteConfig.DELETE_PARALLELISM -> "4",
    HoodieWriteConfig.BULKINSERT_PARALLELISM -> "4",
    HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM -> "4",
    HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM -> "4",
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "col1",
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
    HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
  )
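
// Version note (an assumption, not from the original example): newer Hudi
// releases renamed these config constants (e.g. HoodieWriteConfig.INSERT_PARALLELISM
// became INSERT_PARALLELISM_VALUE). The underlying string keys are stable, so an
// equivalent map can be written with plain keys if the constants above do not resolve:
val commonOptsByKey: Map[String, String] = Map(
    "hoodie.insert.shuffle.parallelism" -> "4",
    "hoodie.upsert.shuffle.parallelism" -> "4",
    "hoodie.delete.shuffle.parallelism" -> "4",
    "hoodie.bulkinsert.shuffle.parallelism" -> "4",
    "hoodie.finalize.write.parallelism" -> "4",
    "hoodie.bootstrap.parallelism" -> "4",
    "hoodie.datasource.write.recordkey.field" -> "col1",
    "hoodie.datasource.write.partitionpath.field" -> "partition",
    "hoodie.datasource.write.precombine.field" -> "timestamp",
    "hoodie.table.name" -> "hoodie_test"
  )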
// In-place conversion: the bootstrap operation reads the source table itself,
// so an empty DataFrame is passed to the writer
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write.
  format("hudi").
  options(commonOpts).
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL).
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
  option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath).
  option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, classOf[SimpleKeyGenerator].getName).
  mode(SaveMode.Overwrite).
  save(basePath)
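
// Optional sanity check (a sketch, not in the original example): confirm that
// the .hoodie metadata directory was created and a commit exists on the
// timeline. FSUtils and HoodieDataSourceHelpers are already imported above.
val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
println("Latest commit: " + HoodieDataSourceHelpers.latestCommit(fs, basePath))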

// Query the table after conversion
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
hoodieROViewDF1.show
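
After conversion, the table behaves like any other Hudi COW table. As a minimal sketch (not part of the original example, and assuming the source table contains the col1, partition, and timestamp columns referenced in commonOpts), subsequent changes can be written back with a regular upsert:

// Upsert one record read from the converted table; the precombine field is
// refreshed so the rewritten version wins during deduplication
val updatesDF = hoodieROViewDF1.limit(1).withColumn("timestamp", lit(Instant.now.toEpochMilli))
updatesDF.write.
  format("hudi").
  options(commonOpts).
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
  mode(SaveMode.Append).
  save(basePath)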