使用Spark Shell创建Hudi表
操作场景
本章节主要介绍了如何通过spark-shell使用Hudi功能。
使用Spark数据源,通过代码段展示如何插入和更新Hudi的默认存储类型数据集COW表,以及每次写操作之后如何读取快照和增量数据。
前提条件
- 已下载并安装Hudi客户端,目前Hudi集成在MRS集群的Spark/Spark2x服务中,用户从Manager页面下载包含Spark/Spark2x服务的客户端即可,例如客户端安装目录为“/opt/hadoopclient”。
- 如果集群已开启Kerberos认证,已在Manager界面创建1个人机用户并关联到hadoop和hive用户组,主组为hadoop。
操作步骤
- 下载并安装Hudi客户端,具体请参考安装MRS客户端章节。
- 使用客户端安装用户登录客户端节点,执行如下命令进入客户端目录。
cd /opt/hadoopclient
- 执行以下命令加载环境变量。
source bigdata_env
source Hudi/component_env
kinit 创建的业务用户
- 新创建的用户首次认证需要修改密码。
- 普通模式(未开启kerberos认证)集群无需执行kinit命令。
- 执行spark-shell --master yarn-client命令进入spark-shell,然后引入Hudi相关软件包并生成测试数据。
- 引入需要的包。
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
- 定义表名,存储路径,生成测试数据。
val tableName = "hudi_cow_table"
val basePath = "hdfs://hacluster/tmp/hudi_cow_table"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
- 引入需要的包。
- 执行以下命令写入Hudi表,模式为OVERWRITE。
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
- 执行以下命令注册临时表并查询。
val roViewDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
roViewDF.createOrReplaceTempView("hudi_ro_table")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show()
- 执行以下命令生成更新数据并更新Hudi表,模式为APPEND。
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 1))
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
- 查询Hudi表增量数据。
- 重新加载:
spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*").createOrReplaceTempView("hudi_ro_table")
- 进行增量查询:
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2)
val incViewDF = spark.read.format("org.apache.hudi").
option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
load(basePath);
incViewDF.registerTempTable("hudi_incr_table")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()
- 重新加载:
- 进行指定时间点提交的查询。
val beginTime = "000"
val endTime = commits(commits.length - 2)
val incViewDF = spark.read.format("org.apache.hudi").
option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
option(END_INSTANTTIME_OPT_KEY, endTime).
load(basePath);
incViewDF.registerTempTable("hudi_incr_table")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()
- 删除测试数据。
- 准备删除的数据。
val df = spark.sql("select uuid, partitionpath from hudi_ro_table limit 2")
val deletes = dataGen.generateDeletes(df.collectAsList())
- 执行删除操作。
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath);
- 重新查询数据。
val roViewDFAfterDelete = spark.read.format("org.apache.hudi").
load(basePath + "/*/*/*/*")
roViewDFAfterDelete.createOrReplaceTempView("hudi_ro_table")
spark.sql("select uuid, partitionPath from hudi_ro_table").show()
- 准备删除的数据。