使用Spark Shell创建Hudi表
操作场景
本章节主要介绍了如何通过spark-shell使用Hudi功能。
使用Spark数据源,通过代码段展示如何插入和更新Hudi的默认存储类型数据集COW表,以及每次写操作之后如何读取快照和增量数据。
前提条件
如果集群已开启Kerberos认证,需在Manager界面创建1个人机用户并关联到hadoop和hive用户组,主组为hadoop。
约束与限制
本章节仅适用于MRS 3.3.1-LTS及之前版本。
操作步骤
- 下载并安装Hudi客户端,具体请参考安装MRS客户端章节。
目前Hudi集成在Spark/Spark2x服务中,用户从Manager页面下载Spark/Spark2x客户端即可,例如客户端安装目录为:“/opt/client”。
- 使用客户端安装用户登录客户端节点,执行如下命令:
进入客户端目录:
cd /opt/hadoopclient执行以下命令加载环境变量:
source bigdata_env
source Hudi/component_env
安全认证:
kinit 创建的业务用户
- 新创建的用户首次认证需要修改密码。
- 普通模式(未开启kerberos认证)集群无需执行kinit命令。
- 执行spark-shell --master yarn-client命令进入spark-shell,然后引入Hudi相关软件包并生成测试数据。
- 引入需要的包。
import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._
- 定义表名,存储路径,生成测试数据。
val tableName = "hudi_cow_table" val basePath = "hdfs://hacluster/tmp/hudi_cow_table" val dataGen = new DataGenerator val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
- 引入需要的包。
- 执行以下命令写入Hudi表,模式为OVERWRITE。
df.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath) - 执行以下命令注册临时表并查询。
val roViewDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*") roViewDF.createOrReplaceTempView("hudi_ro_table") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show() - 执行以下命令生成更新数据并更新Hudi表,模式为APPEND。
val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 1)) df.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) - 查询Hudi表增量数据。
- 重新加载:
spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*").createOrReplaceTempView("hudi_ro_table") - 进行增量查询:
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) val incViewDF = spark.read.format("org.apache.hudi"). option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath); incViewDF.registerTempTable("hudi_incr_table") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()
- 重新加载:
- 进行指定时间点提交的查询。
val beginTime = "000" val endTime = commits(commits.length - 2) val incViewDF = spark.read.format("org.apache.hudi"). option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(basePath); incViewDF.registerTempTable("hudi_incr_table") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show() - 删除测试数据。
- 准备删除的数据。
val df = spark.sql("select uuid, partitionpath from hudi_ro_table limit 2") val deletes = dataGen.generateDeletes(df.collectAsList()) - 执行删除操作。
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));
df.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY,"delete"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath); - 重新查询数据。
val roViewDFAfterDelete = spark.read.format("org.apache.hudi"). load(basePath + "/*/*/*/*") roViewDFAfterDelete.createOrReplaceTempView("hudi_ro_table") spark.sql("select uuid, partitionPath from hudi_ro_table").show()
- 准备删除的数据。