在DLI使用Hudi提交Spark Jar作业
提交Spark jar作业的场景需要手动配置由LakeFormation提供元数据服务的Hudi锁实现类,请参照 Hudi锁配置说明。
- 登录DLI管理控制台,选择“作业管理 > Spark作业”,进入到Spark作业的界面。
提交Hudi相关的Spark jar作业需要选择Spark版本为3.3.1,且使用的通用队列需要支持Hudi。
- 单击右上角的”创建作业”即可提交Spark jar的作业。
- 编写并打包Spark jar的程序包:(以Maven项目为例)
创建或使用现有的maven java项目,在 pom.xml 中引入scala 2.12,spark 3.3.1 和hudi 0.11.0 版本的依赖。由于DLI环境已提供所需依赖,因此scope可以配置为provided。
<dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.12.15</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.3.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.3.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hudi</groupId> <artifactId>hudi-spark3-bundle_2.12</artifactId> <version>0.11.0</version> <scope>provided</scope> </dependency> <!-- ... --> </dependencies>
配置 scala-maven-plugin,用于编译和打包。
<build> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.3.1</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <!-- ... --> </plugins> <!-- ... --> </build>
随后在main目录下创建scala目录,并新建一个包,随后在包目录下新建一个scala文件,在里面写入:
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import java.util.{ArrayList, List => JList} object HudiScalaDemo { def main(args: Array[String]): Unit = { // 步骤1:获取/创建SparkSession实例 val spark = SparkSession.builder .enableHiveSupport .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") .appName("spark_jar_hudi_demo") .getOrCreate // 步骤2:构造写入用的DataFrame数据 val schema = StructType(Array( StructField("id", DataTypes.IntegerType), StructField("name", DataTypes.StringType), StructField("update_time", DataTypes.StringType), StructField("create_time", DataTypes.StringType) )) val data: JList[Row] = new ArrayList[Row]() data.add(new GenericRowWithSchema(Array(1, "Alice", "2024-08-05 09:00:00", "2024-08-01"), schema)) data.add(new GenericRowWithSchema(Array(2, "Bob", "2024-08-05 09:00:00", "2024-08-02"), schema)) data.add(new GenericRowWithSchema(Array(3, "Charlie", "2024-08-05 09:00:00", "2024-08-03"), schema)) val df = spark.createDataFrame(data, schema) // 步骤3:配置写入的表名和OBS路径 val databaseName = "default" val tableName = "hudi_table" val basePath = "obs://bucket/path/hudi_table" // 步骤4:执行写入,同时会同步DLI元数据服务建表 df.write.format("hudi") .option("hoodie.table.name", tableName) .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE") .option("hoodie.datasource.write.recordkey.field", schema.fields(0).name) // 主键,必须配置 .option("hoodie.datasource.write.precombine.field", schema.fields(2).name) // 预聚合键,必须配置,如果不需要可以配置和主键相同的列 .option("hoodie.datasource.write.partitionpath.field", schema.fields(3).name) // 分区列,可以配置多个分区,使用英语逗号分隔 .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator") // 使用DLI元数据服务时,需要同步配置使用对应的Hudi锁 .option("hoodie.write.lock.provider", "com.huawei.luxor.hudi.util.DliCatalogBasedLockProvider") // 开启同步配置 .option("hoodie.datasource.hive_sync.enable", "true") .option("hoodie.datasource.hive_sync.partition_fields", schema.fields(3).name) // 根据实际分区字段情况配置,非分区表请选择 org.apache.hudi.hive.NonPartitionedExtractor .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor") .option("hoodie.datasource.hive_sync.use_jdbc", "false") .option("hoodie.datasource.hive_sync.table", tableName) .option("hoodie.datasource.hive_sync.database", databaseName) .mode(SaveMode.Overwrite) .save(basePath) // 步骤5:使用SQL的方式查询刚才写入的表 spark.sql(s"select id,name,update_time,create_time from ${databaseName}.${tableName} where create_time='2024-08-01'") .show(100) } }
随后执行maven打包命令,从target目录获取打包的jar文件并上传至OBS目录中。mvn clean install
- 提交Spark jar作业:
进入DLI界面,在左侧菜单点击”作业管理”->”Spark作业”,随后在右侧界面的右上角点击”创建作业”。
- 首先配置队列,Spark版本选择 3.3.1 及之后的版本。
- 可以选择配置作业名称,便于识别和筛选。
- 配置”应用程序”,路径指向上一步上传至OBS的Spark jar包。
- 配置委托。选择提交DLI作业所需的委托。自定义委托请参考创建DLI自定义委托 。
- 配置”主类(--class)”,为上一步中所写的,包含需要执行的main函数的类的全名。
- 在”Spark参数(--conf)“处也可以配置Hudi参数,但是需要额外添加前缀“spark.hadoop.“,例如:
spark.hadoop.hoodie.write.lock.provider=com.huawei.luxor.hudi.util.DliCatalogBasedLockProvider
- 配置”访问元数据”为”是”,推荐使用元数据服务管理Hudi表,上一步中写入配置包含了同步元数据的配置项。
最后点击右上角的”执行”按钮即可提交作业。
- 执行作业,检查日志:(注意:日志归档耗时较长,在作业执行完成后,日志可能需要等待1-5分钟才能归档。)
点击执行后会跳转到”Spark作业”界面,此处可以看到作业的执行状态。点击对应作业右侧的更多,可以在下拉菜单中跳转日志选单:
- 归档日志:跳转OBS界面,可以看到该作业的全部日志归档地址,包含提交日志,Driver日志和Executor日志,在此处可以下载日志。
- 提交日志:跳转到提交日志的聚合展示界面,可以查看任务提交中的日志信息。
- Driver日志:跳转到Driver日志的聚合展示界面,从上至下依次展示 spark.log, stderr.log以及stdout.log。
随后进入Driver日志,如果日志还未聚合,请等待几分钟后再次检查。可以在日志底部的stdout.log中查看到示例程序最后打印的select语句的结果。