更新时间:2024-08-05 GMT+08:00

如何采用Java命令提交Spark应用

问题

除了spark-submit命令提交应用外,如何采用Java命令提交Spark应用?

回答

您可以通过org.apache.spark.launcher.SparkLauncher类采用java命令方式提交Spark应用。详细步骤如下:

  1. 定义org.apache.spark.launcher.SparkLauncher类。默认提供了SparkLauncherJavaExample和SparkLauncherScalaExample示例,您需要根据实际业务应用程序修改示例代码中的传入参数。

    • 如果您使用Java语言开发程序,您可以参考如下示例,编写SparkLauncher类。
          public static void main(String[] args) throws Exception {
              System.out.println("com.huawei.bigdata.spark.examples.SparkLauncherExample <mode> <jarParh> <app_main_class> <appArgs>");
              SparkLauncher launcher = new SparkLauncher();
              launcher.setMaster(args[0])
                  .setAppResource(args[1]) // Specify user app jar path
                  .setMainClass(args[2]);
              if (args.length > 3) {
                  String[] list = new String[args.length - 3];
                  for (int i = 3; i < args.length; i++) {
                      list[i-3] = args[i];
                  }
                  // Set app args
                  launcher.addAppArgs(list);
              }
      
              // Launch the app
              Process process = launcher.launch();
              // Get Spark driver log
              new Thread(new ISRRunnable(process.getErrorStream())).start();
              int exitCode = process.waitFor();
              System.out.println("Finished! Exit code is "  + exitCode);
          }
    • 如果您使用Scala语言开发程序,您可以参考如下示例,编写SparkLauncher类。
        def main(args: Array[String]) {
          println(s"com.huawei.bigdata.spark.examples.SparkLauncherExample <mode> <jarParh>  <app_main_class> <appArgs>")
          val launcher = new SparkLauncher()
          launcher.setMaster(args(0))
            .setAppResource(args(1)) // Specify user app jar path
            .setMainClass(args(2))
            if (args.drop(3).length > 0) {
              // Set app args
              launcher.addAppArgs(args.drop(3): _*)
            }
      
      
          // Launch the app
          val process = launcher.launch()
          // Get Spark driver log
          new Thread(new ISRRunnable(process.getErrorStream)).start()
          val exitCode = process.waitFor()
          println(s"Finished! Exit code is $exitCode")
        }

  2. 根据业务逻辑,开发对应的Spark应用程序。并设置用户编写的Spark应用程序的主类等常数。不同场景的示例请参考开发Spark应用

    • 如果您使用的安全模式,建议按照安全要求,准备安全认证代码、业务应用代码及其相关配置。

      yarn-cluster模式中不支持在Spark工程中添加安全认证。因为需要在应用启动前已完成安全认证。所以用户需要在Spark应用之外添加安全认证代码或使用命令行进行认证。由于提供的示例代码默认提供安全认证代码,请在yarn-cluster模式下时,修改对应安全代码后再运行应用。

    • 如果您使用的是普通模式,准备业务应用代码及其相关配置即可。

  3. 调用org.apache.spark.launcher.SparkLauncher.launch()方法,将用户的应用程序提交。

    1. 将SparkLauncher程序和用户应用程序分别生成Jar包,并上传至运行此应用的Spark节点中。生成Jar包的操作步骤请参见在Linux环境中编包并运行Spark程序章节。
      • SparkLauncher程序的编译依赖包为spark-launcher_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar,请从软件发布包中Software文件夹下“FusionInsight_Spark2x_8.1.0.1.tar.gz”压缩包中的“jars”目录中获取。
      • 用户应用程序的编译依赖包根据代码不同而不同,需用户根据自己编写的代码进行加载。
    2. 将运行程序的依赖Jar包上传至需要运行此应用的节点中,例如“$SPARK_HOME/jars”路径。

      用户需要将SparkLauncher类的运行依赖包和应用程序运行依赖包上传至客户端的jars路径。文档中提供的示例代码,其运行依赖包在客户端jars中已存在。

      Spark Launcher的方式依赖Spark客户端,即运行程序的节点必须已安装Spark客户端,且客户端可用。运行过程中依赖客户端已配置好的环境变量、运行依赖包和配置文件,

    3. 在Spark应用程序运行节点,执行如下命令使用Spark Launcher方式提交。之后,可通过Spark WebUI查看运行情况,或通过获取指定文件查看运行结果,可参见在Linux环境中查看Spark程序调测结果

      java -cp $SPARK_HOME/conf:$SPARK_HOME/jars/*:SparkLauncherExample.jar com.huawei.bigdata.spark.examples.SparkLauncherExample yarn-client /opt/female/FemaleInfoCollection.jar com.huawei.bigdata.spark.examples.FemaleInfoCollection <inputPath>