更新时间:2025-06-11 GMT+08:00
分享

使用SDK提交Spark作业

本节操作介绍使用DLI SDK V2提交Spark作业的操作方法。

2024年5月起,新用户可以直接使用DLI SDK V2,无需开通白名单。

对于2024年5月之前开通并使用DLI服务的用户,如需使用“DLI SDK V2”功能,必须提交工单申请加入白名单。

前提条件

操作前准备

获取AK/SK,项目ID及对应的Region信息。

  1. 管理控制台。
  2. 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。
  3. 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。
  4. 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。
  5. 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。

样例代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
   private static final Logger logger = LoggerFactory.getLogger(SparkJobExample.class);

    public static void main(String[] args) {
        String yourAccessKey = System.getenv("HUAWEICLOUD_SDK_AK");
        String yourSecretKey = System.getenv("HUAWEICLOUD_SDK_SK");
        DliClient dliClient = DliClient.newBuilder()
            .withRegion(DliRegion.valueOf("RegionName"))
            .withCredential(new BasicCredentials()
                .withAk(yourAccessKey)
                .withSk(yourSecretKey)
                .withProjectId("YouProjectId"))
            .build();

        try {
            // 步骤一:提交Spark作业到DLI执行
            String jobId = runSparkJob(dliClient, "YourQueueName");
            // 步骤二:如果您想在当前线程等待作业执行结束,可循环检查状态,直到作业结束。
            checkRunning(dliClient, jobId);
            // 步骤三:如果您想根据条件查询一个或多个特定作业,可执行以下方法。
            // 此处仅作样例,除了jobId,您还可指定其它筛选条件。详见 https://console.huaweicloud.com/apiexplorer/#/openapi/DLI/doc?api=ListSparkJobs 2
            listSparkJob(dliClient, jobId);

            /*
             * 其它场景:
             * 1. 作业运行期间,如果您想取消作业,可调用接口取消批处理作业。具体操作请参考取消批处理作业。
             * 关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#cancelSparkJob(CancelSparkJobRequest),
             * 注:作业状态为“已成功”或者“已失败”的批处理作业无法取消。
             * 2. 如果您想根据jobId查询某个特定作业的详情,可执行以下方法。
             * 关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#showSparkJob(ShowSparkJobRequest),
             * 详见ShowSparkJob
             */
        } catch (DLIException e) {
            // 请根据业务实际情况处理异常信息,此处仅作样例。
        }
    }

创建Spark作业

  • 功能介绍:

    用于执行Spark作业。

  • 相关链接:

    关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#createSparkJob(CreateSparkJobRequest)

    创建批处理作业

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
        private static String runSparkJob(DliClient client, String queueName) {
            // 请根据业务实际情况设置相应的参数,此处仅作样例。
            Map<String, Object> confMap = new HashMap<>();
            confMap.put("SparkConfKey", "SparkConfValue");
            CreateSparkJobResponse resp = client.createSparkJob(new CreateSparkJobRequest()
                .withBody(new CreateSparkJobRequestBody()
                    .withQueue(queueName)
                    .withSparkVersion("2.4.5")
                    .withName("demo_spark_app")
                    .withFile("obs://your_bucket/your_spark_app.jar") // 必选
                    .withClassName("YourClassFullName") // 必选
                    .withArgs(Arrays.asList("YourAppArg1", "YourAppAgr2", "..."))
                    .withConf(confMap)
                    .withJars(Arrays.asList("YourDepJar1", "YourDepJar2", "..."))
                    .withDriverCores(2)
                    .withDriverMemory("8GB")
                    .withNumExecutors(3)
                    .withExecutorCores(4)
                    .withExecutorMemory("16GB")));
            return resp.getId();
        }
    

查询批处理作业状态

  • 功能介绍:

    用于执行Spark作业。

  • 相关链接:

    关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#showSparkJobStatus(ShowSparkJobStatusRequest)

    查询批处理作业状态

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
        private static void checkRunning(DliClient client, String jobId) throws DLIException {
            while (true) {
                ShowSparkJobStatusResponse resp;
                try {
                    resp = client.showSparkJobStatus(new ShowSparkJobStatusRequest().withBatchId(jobId));
                } catch (Exception e) {
                    throw new DLIException("Failed to get job status by id: " + jobId, e);
                }
                String status = resp.getState();
                logger.info(String.format("SparkJob id %s status: %s", jobId, status));
    
                if ("success".equals(status)) {
                    return;
                }
                if ("dead".equals(status)) {
                    throw new DLIException("Run job failed");
                }
    
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    throw new DLIException("Check job running interrupted.");
                }
            }
        }
    

查询批处理作业列表

  • 功能介绍:

    查询批处理作业列表。

  • 相关链接:

    关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#listSparkJobs(ListSparkJobsRequest)

    查询批处理作业列表

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
        private static void listSparkJob(DliClient client, String jobId) throws DLIException {
            ListSparkJobsResponse resp;
            try {
                resp = client.listSparkJobs(new ListSparkJobsRequest()
                    // 您还可用.withXxx()方法指定其它条件来返回满足条件的SparkJobs,此处仅做样例。
                    // 详见 https://console.huaweicloud.com/apiexplorer/#/openapi/DLI/doc?api=ListSparkJobs 2
                    .withJobId(jobId)
                    .withQueueName("YourQueueName")
                    .withStart(1234567L) // 可以指定作业开始时间
                    .withEnd(2345678L)); // 可以指定作业结束时间
            } catch (Exception e) {
                throw new DLIException("Failed to list Spark jobs: ", e);
            }
            logger.info(String.format("List SparkJobs : %s", resp.toString()));
            // resp中的响应参数详见https://console.huaweicloud.com/apiexplorer/#/openapi/DLI/doc?api=ListSparkJobs 3和表4
        }
    

相关文档