In-place Table Conversion
Hudi provides a convenient in-place conversion (bootstrap) capability: it generates Hudi's own .hoodie management files for an existing historical table, turning that table into a Hudi table without modifying the historical data itself.
Example: convert a Hive table whose HDFS directory is "hdfs://hacluster/user/hive/warehouse/pq1" into a Hudi table stored at "hdfs://hacluster/tmp/hudi_bootstrap_test".
- In-place conversion does not support concurrent writes. It can only be used to create a new Hudi table and must not be run against an existing Hudi table.
- In-place conversion supports writing COW (copy-on-write) tables only.
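The conversion example below assumes that Parquet data for the source Hive table already exists under srcPath and contains the columns the Hudi configuration refers to (col1, partition, timestamp). If no such table is available, a minimal test dataset can be generated in spark-shell as in the following sketch; the column values and the single p0 partition are assumptions chosen only to match the example, not part of the official procedure.

// Hypothetical test-data setup: write a small partitioned Parquet
// dataset to the source path so the conversion example below has
// something to bootstrap from.
import org.apache.spark.sql.functions.lit
val srcPath = "hdfs://hacluster/user/hive/warehouse/pq1"
spark.range(0, 100).
  selectExpr("cast(id as string) as col1", "current_timestamp() as timestamp").
  withColumn("partition", lit("p0")).  // a single test partition
  write.
  mode("overwrite").
  partitionBy("partition").
  parquet(srcPath)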
spark-shell

import collection.JavaConverters._
import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import java.time._
import java.util.Collections

val timestamp = Instant.now.toEpochMilli
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val numRecords: Int = 100
// Source Hive table data and target Hudi table location
val srcPath = "hdfs://hacluster/user/hive/warehouse/pq1"
val basePath = "hdfs://hacluster/tmp/hudi_bootstrap_test"

// Hudi write configuration: parallelism, record key, partition path,
// precombine field, and table name
val commonOpts: Map[String, String] = Map(
  HoodieWriteConfig.INSERT_PARALLELISM -> "4",
  HoodieWriteConfig.UPSERT_PARALLELISM -> "4",
  HoodieWriteConfig.DELETE_PARALLELISM -> "4",
  HoodieWriteConfig.BULKINSERT_PARALLELISM -> "4",
  HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM -> "4",
  HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM -> "4",
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "col1",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
  HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
)

// In-place conversion: the bootstrap operation reads its input from
// BOOTSTRAP_BASE_PATH_PROP, so an empty DataFrame is written
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write.
  format("hudi").
  options(commonOpts).
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL).
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
  option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath).
  option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, classOf[SimpleKeyGenerator].getName).
  mode(SaveMode.Overwrite).
  save(basePath)

// Query the table after conversion
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
hoodieROViewDF1.show
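After the conversion, the .hoodie directory under the target path holds the new table's metadata, and the table can be queried like any other Hudi table. As a quick sanity check, a sketch like the following (using the FSUtils and HoodieDataSourceHelpers classes already imported above) can confirm that the bootstrap commit landed on the timeline; treat it as illustrative rather than part of the official procedure.

// Optional sanity check: confirm that a completed commit exists on
// the timeline of the newly bootstrapped table.
val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
val latestCommit = HoodieDataSourceHelpers.latestCommit(fs, basePath)
println(s"Latest commit on the bootstrapped table: $latestCommit")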