使用SDK提交Spark作业
本节操作介绍使用DLI SDK V2提交Spark作业的操作方法。

2024年5月起,新用户可以直接使用DLI SDK V2,无需开通白名单。
对于2024年5月之前开通并使用DLI服务的用户,如需使用“DLI SDK V2”功能,必须提交工单申请加入白名单。
前提条件
- 已参考Java SDK概述配置Java SDK环境。
- 已参考初始化DLI客户端完成客户端DLIClient的初始化。
操作前准备
获取AK/SK,项目ID及对应的Region信息。
- 管理控制台。
- 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。
- 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。
- 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。
- 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。
样例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
private static final Logger logger = LoggerFactory.getLogger(SparkJobExample.class); public static void main(String[] args) { String yourAccessKey = System.getenv("HUAWEICLOUD_SDK_AK"); String yourSecretKey = System.getenv("HUAWEICLOUD_SDK_SK"); DliClient dliClient = DliClient.newBuilder() .withRegion(DliRegion.valueOf("RegionName")) .withCredential(new BasicCredentials() .withAk(yourAccessKey) .withSk(yourSecretKey) .withProjectId("YouProjectId")) .build(); try { // 步骤一:提交Spark作业到DLI执行。 String jobId = runSparkJob(dliClient, "YourQueueName"); // 步骤二:如果您想在当前线程等待作业执行结束,可循环检查状态,直到作业结束。 checkRunning(dliClient, jobId); // 步骤三:如果您想根据条件查询一个或多个特定作业,可执行以下方法。 // 此处仅作样例,除了jobId,您还可指定其它筛选条件。详见 https://console.huaweicloud.com/apiexplorer/#/openapi/DLI/doc?api=ListSparkJobs 表2 listSparkJob(dliClient, jobId); /* * 其它场景: * 1. 作业运行期间,如果您想取消作业,可调用接口取消批处理作业。具体操作请参考取消批处理作业。 * 关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#cancelSparkJob(CancelSparkJobRequest), * 注:作业状态为“已成功”或者“已失败”的批处理作业无法取消。 * 2. 如果您想根据jobId查询某个特定作业的详情,可执行以下方法。 * 关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#showSparkJob(ShowSparkJobRequest), * 详见ShowSparkJob */ } catch (DLIException e) { // 请根据业务实际情况处理异常信息,此处仅作样例。 } } |
创建Spark作业
- 功能介绍:
用于执行Spark作业。
- 相关链接:
关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#createSparkJob(CreateSparkJobRequest)
- 示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
private static String runSparkJob(DliClient client, String queueName) { // 请根据业务实际情况设置相应的参数,此处仅作样例。 Map<String, Object> confMap = new HashMap<>(); confMap.put("SparkConfKey", "SparkConfValue"); CreateSparkJobResponse resp = client.createSparkJob(new CreateSparkJobRequest() .withBody(new CreateSparkJobRequestBody() .withQueue(queueName) .withSparkVersion("2.4.5") .withName("demo_spark_app") .withFile("obs://your_bucket/your_spark_app.jar") // 必选 .withClassName("YourClassFullName") // 必选 .withArgs(Arrays.asList("YourAppArg1", "YourAppAgr2", "...")) .withConf(confMap) .withJars(Arrays.asList("YourDepJar1", "YourDepJar2", "...")) .withDriverCores(2) .withDriverMemory("8GB") .withNumExecutors(3) .withExecutorCores(4) .withExecutorMemory("16GB"))); return resp.getId(); }
查询批处理作业状态
- 功能介绍:
用于执行Spark作业。
- 相关链接:
关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#showSparkJobStatus(ShowSparkJobStatusRequest)
- 示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
private static void checkRunning(DliClient client, String jobId) throws DLIException { while (true) { ShowSparkJobStatusResponse resp; try { resp = client.showSparkJobStatus(new ShowSparkJobStatusRequest().withBatchId(jobId)); } catch (Exception e) { throw new DLIException("Failed to get job status by id: " + jobId, e); } String status = resp.getState(); logger.info(String.format("SparkJob id %s status: %s", jobId, status)); if ("success".equals(status)) { return; } if ("dead".equals(status)) { throw new DLIException("Run job failed"); } try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new DLIException("Check job running interrupted."); } } }
查询批处理作业列表
- 功能介绍:
查询批处理作业列表。
- 相关链接:
关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#listSparkJobs(ListSparkJobsRequest)
- 示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
private static void listSparkJob(DliClient client, String jobId) throws DLIException { ListSparkJobsResponse resp; try { resp = client.listSparkJobs(new ListSparkJobsRequest() // 您还可用.withXxx()方法指定其它条件来返回满足条件的SparkJobs,此处仅做样例。 // 详见 https://console.huaweicloud.com/apiexplorer/#/openapi/DLI/doc?api=ListSparkJobs 表2 .withJobId(jobId) .withQueueName("YourQueueName") .withStart(1234567L) // 可以指定作业开始时间 .withEnd(2345678L)); // 可以指定作业结束时间 } catch (Exception e) { throw new DLIException("Failed to list Spark jobs: ", e); } logger.info(String.format("List SparkJobs : %s", resp.toString())); // resp中的响应参数详见:https://console.huaweicloud.com/apiexplorer/#/openapi/DLI/doc?api=ListSparkJobs 表3和表4 }