更新时间:2024-12-25 GMT+08:00
分享

在DLI使用Hudi提交Spark Jar作业

提交Spark jar作业的场景需要手动配置由LakeFormation提供元数据服务的Hudi锁实现类,请参照 Hudi锁配置说明

  1. 登录DLI管理控制台,选择“作业管理 > Spark作业”,进入到Spark作业的界面。

    提交Hudi相关的Spark jar作业需要选择Spark版本为3.3.1,且使用的通用队列需要支持Hudi。

  2. 单击右上角的”创建作业”即可提交Spark jar的作业。
  3. 编写并打包Spark jar的程序包:(以Maven项目为例)

    创建或使用现有的maven java项目,在 pom.xml 中引入scala 2.12,spark 3.3.1 和hudi 0.11.0 版本的依赖。由于DLI环境已提供所需依赖,因此scope可以配置为provided。

    <dependencies>
      <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.12.15</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.3.1</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.3.1</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-spark3-bundle_2.12</artifactId>
        <version>0.11.0</version>
        <scope>provided</scope>
      </dependency>
      <!-- ... -->
    </dependencies>

    配置 scala-maven-plugin,用于编译和打包。

    <build>
      <plugins>
        <plugin>
          <groupId>net.alchim31.maven</groupId>
          <artifactId>scala-maven-plugin</artifactId>
          <version>3.3.1</version>
          <executions>
            <execution>
              <goals>
                <goal>compile</goal>
                <goal>testCompile</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
        <!-- ... -->
      </plugins>
      <!-- ... -->
    </build>

    随后在main目录下创建scala目录,并新建一个包,随后在包目录下新建一个scala文件,在里面写入:

    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
    
    import java.util.{ArrayList, List => JList}
    
    object HudiScalaDemo {
      def main(args: Array[String]): Unit = {
        // 步骤1:获取/创建SparkSession实例
        val spark = SparkSession.builder
          .enableHiveSupport
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
          .appName("spark_jar_hudi_demo")
          .getOrCreate
    
        // 步骤2:构造写入用的DataFrame数据
        val schema = StructType(Array(
          StructField("id", DataTypes.IntegerType),
          StructField("name", DataTypes.StringType),
          StructField("update_time", DataTypes.StringType),
          StructField("create_time", DataTypes.StringType)
        ))
        val data: JList[Row] = new ArrayList[Row]()
        data.add(new GenericRowWithSchema(Array(1, "Alice", "2024-08-05 09:00:00", "2024-08-01"), schema))
        data.add(new GenericRowWithSchema(Array(2, "Bob", "2024-08-05 09:00:00", "2024-08-02"), schema))
        data.add(new GenericRowWithSchema(Array(3, "Charlie", "2024-08-05 09:00:00", "2024-08-03"), schema))
        val df = spark.createDataFrame(data, schema)
    
        // 步骤3:配置写入的表名和OBS路径
        val databaseName = "default"
        val tableName = "hudi_table"
        val basePath = "obs://bucket/path/hudi_table"
    
        // 步骤4:执行写入,同时会同步DLI元数据服务建表
        df.write.format("hudi")
          .option("hoodie.table.name", tableName)
          .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
          .option("hoodie.datasource.write.recordkey.field", schema.fields(0).name)  // 主键,必须配置
          .option("hoodie.datasource.write.precombine.field", schema.fields(2).name)  // 预聚合键,必须配置,如果不需要可以配置和主键相同的列
          .option("hoodie.datasource.write.partitionpath.field", schema.fields(3).name)  // 分区列,可以配置多个分区,使用英语逗号分隔
          .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
          // 使用DLI元数据服务时,需要同步配置使用对应的Hudi锁
          .option("hoodie.write.lock.provider", "com.huawei.luxor.hudi.util.DliCatalogBasedLockProvider")
          // 开启同步配置
          .option("hoodie.datasource.hive_sync.enable", "true")
          .option("hoodie.datasource.hive_sync.partition_fields", schema.fields(3).name)
          // 根据实际分区字段情况配置,非分区表请选择 org.apache.hudi.hive.NonPartitionedExtractor
          .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor")
          .option("hoodie.datasource.hive_sync.use_jdbc", "false")
          .option("hoodie.datasource.hive_sync.table", tableName)
          .option("hoodie.datasource.hive_sync.database", databaseName)
          .mode(SaveMode.Overwrite)
          .save(basePath)
    
        // 步骤5:使用SQL的方式查询刚才写入的表
        spark.sql(s"select id,name,update_time,create_time from ${databaseName}.${tableName} where create_time='2024-08-01'")
          .show(100)
      }
    }
    随后执行maven打包命令,从target目录获取打包的jar文件并上传至OBS目录中。
    mvn clean install

  4. 提交Spark jar作业:

    进入DLI界面,在左侧菜单点击”作业管理”->”Spark作业”,随后在右侧界面的右上角点击”创建作业”。

    • 首先配置队列,Spark版本选择 3.3.1 及之后的版本。
    • 可以选择配置作业名称,便于识别和筛选。
    • 配置”应用程序”,路径指向上一步上传至OBS的Spark jar包。
    • 配置委托。选择提交DLI作业所需的委托。自定义委托请参考创建DLI自定义委托
    • 配置”主类(--class)”,为上一步中所写的,包含需要执行的main函数的类的全名。
    • 在”Spark参数(--conf)“处也可以配置Hudi参数,但是需要额外添加前缀“spark.hadoop.“,例如:
      spark.hadoop.hoodie.write.lock.provider=com.huawei.luxor.hudi.util.DliCatalogBasedLockProvider
    • 配置”访问元数据”为”是”,推荐使用元数据服务管理Hudi表,上一步中写入配置包含了同步元数据的配置项。

    最后点击右上角的”执行”按钮即可提交作业。

  5. 执行作业,检查日志:(注意:日志归档耗时较长,在作业执行完成后,日志可能需要等待1-5分钟才能归档。)

    点击执行后会跳转到”Spark作业”界面,此处可以看到作业的执行状态。点击对应作业右侧的更多,可以在下拉菜单中跳转日志选单:

    • 归档日志:跳转OBS界面,可以看到该作业的全部日志归档地址,包含提交日志,Driver日志和Executor日志,在此处可以下载日志。
    • 提交日志:跳转到提交日志的聚合展示界面,可以查看任务提交中的日志信息。
    • Driver日志:跳转到Driver日志的聚合展示界面,从上至下依次展示 spark.log, stderr.log以及stdout.log。

    随后进入Driver日志,如果日志还未聚合,请等待几分钟后再次检查。可以在日志底部的stdout.log中查看到示例程序最后打印的select语句的结果。

相关文档