Submitting a Spark Job Using an SDK
This section describes how to submit a Spark job using DLI SDK V2.
 
 
Starting May 2024, new users can use DLI SDK V2 directly without having their accounts whitelisted.
Users who started using DLI before May 2024 must submit a service ticket to have their accounts whitelisted before they can use this function.
Prerequisites
- You have configured the Java SDK environment by referring to Overview.
- You have initialized the DLI client by referring to Initializing the DLI Client.
Preparations
Obtain an AK/SK, project ID, and region information.
- Log in to the management console.
- In the upper right corner, hover over the username and choose My Credentials from the drop-down list.
- In the navigation pane on the left, choose Access Keys. On the displayed page, click Create Access Key. Confirm that you want to proceed with the operation and click OK.
- On the displayed page, click Download. Open the file to obtain the AK/SK information.
- In the navigation pane on the left, choose API Credentials. In the Projects list, obtain the project ID (project_id) and the region it belongs to.
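The example code reads the AK/SK from the environment variables HUAWEICLOUD_SDK_AK and HUAWEICLOUD_SDK_SK instead of hardcoding them. If a variable is missing, System.getenv returns null. The following is a minimal sketch of a fail-fast check that you could run before building the client. The class and method names are hypothetical and are not part of the DLI SDK.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of the DLI SDK): fails fast when a credential
// environment variable is missing, instead of passing null to BasicCredentials.
final class CredentialEnv {
    private CredentialEnv() {}

    /** Returns the trimmed value of name from env, or throws if it is unset or blank. */
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value.trim();
    }

    public static void main(String[] args) {
        // In a real program, pass System.getenv(). A map is used here so the check is easy to test.
        Map<String, String> env = new HashMap<>();
        env.put("HUAWEICLOUD_SDK_AK", "example-ak");
        System.out.println(require(env, "HUAWEICLOUD_SDK_AK")); // prints "example-ak"
    }
}
```

In the example code, this check would replace the direct calls, for example CredentialEnv.require(System.getenv(), "HUAWEICLOUD_SDK_AK").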
Example Code
```java
import com.huaweicloud.sdk.core.auth.BasicCredentials;
import com.huaweicloud.sdk.dli.v1.DliClient;
import com.huaweicloud.sdk.dli.v1.model.*;
import com.huaweicloud.sdk.dli.v1.region.DliRegion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// DLIException is not imported here. It is assumed to be a checked exception defined in your
// project (or provided by your DLI dependency) with (String) and (String, Throwable) constructors.
public class SparkJobExample {
    private static final Logger logger = LoggerFactory.getLogger(SparkJobExample.class);

    public static void main(String[] args) {
        // Read the AK/SK from environment variables instead of hardcoding them in the code.
        String yourAccessKey = System.getenv("HUAWEICLOUD_SDK_AK");
        String yourSecretKey = System.getenv("HUAWEICLOUD_SDK_SK");
        DliClient dliClient = DliClient.newBuilder()
            .withRegion(DliRegion.valueOf("RegionName")) // Replace RegionName with your region ID.
            .withCredential(new BasicCredentials()
                .withAk(yourAccessKey)
                .withSk(yourSecretKey)
                .withProjectId("YourProjectId"))
            .build();

        try {
            // Step 1: Submit a Spark job to DLI for execution.
            String jobId = runSparkJob(dliClient, "YourQueueName");

            // Step 2: To wait in the current thread until the job finishes, poll its status until it completes.
            checkRunning(dliClient, jobId);

            // Step 3: Query one or more jobs that match the specified conditions.
            // This example filters by jobId only. For the other filter criteria, see Table 2 in
            // https://console.huaweicloud.com/apiexplorer/#/openapi/DLI/doc?api=ListSparkJobs.
            listSparkJob(dliClient, jobId);

            /*
             * Other scenarios:
             * 1. To cancel a batch processing job while it is running, call the cancel API.
             *    Key SDK API: com.huaweicloud.sdk.dli.v1.DliClient#cancelSparkJob(CancelSparkJobRequest)
             *    Note: Batch processing jobs in the Successful or Failed state cannot be canceled.
             * 2. To query the details of a specific job by job ID, call:
             *    Key SDK API: com.huaweicloud.sdk.dli.v1.DliClient#showSparkJob(ShowSparkJobRequest)
             */
        } catch (DLIException e) {
            // Handle the exception based on your service requirements. The following is just an example.
            logger.error("Spark job example failed", e);
        }
    }

    // runSparkJob, checkRunning, and listSparkJob (described below) are members of this class.
}
```
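The "Other scenarios" comment notes that batch processing jobs in the Successful or Failed state cannot be canceled. The following is a minimal sketch of a guard that you could check before calling cancelSparkJob. It assumes that those states are reported as the same "success" and "dead" strings that checkRunning compares against. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical guard (not part of the DLI SDK). Assumes terminal states are reported as
// "success" and "dead", the same strings checked in checkRunning.
final class SparkJobStates {
    private static final Set<String> TERMINAL =
        Collections.unmodifiableSet(new HashSet<>(Arrays.asList("success", "dead")));

    private SparkJobStates() {}

    /** True if the job has finished, successfully or not. */
    static boolean isTerminal(String state) {
        return state != null && TERMINAL.contains(state);
    }

    /** A cancel request only makes sense for a known, non-terminal state. */
    static boolean canCancel(String state) {
        return state != null && !isTerminal(state);
    }
}
```

For example, you could read the state with showSparkJobStatus and call cancelSparkJob only when SparkJobStates.canCancel(resp.getState()) returns true. The job can still finish between the two calls, so handle an error from the cancel request anyway.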
Creating a Spark Job
- Function:
    Execute Spark jobs. 
- Reference link:
    Key SDK API: com.huaweicloud.sdk.dli.v1.DliClient#createSparkJob(CreateSparkJobRequest) 
- Sample code:
    ```java
    // Part of SparkJobExample. Uses java.util.Arrays, java.util.HashMap, and java.util.Map.
    // Submits a Spark job and returns its ID (the batch ID used by the status and list APIs).
    private static String runSparkJob(DliClient client, String queueName) {
        // Set the parameters according to your actual requirements. The following is just an example.
        Map<String, Object> confMap = new HashMap<>();
        confMap.put("SparkConfKey", "SparkConfValue");
        CreateSparkJobResponse resp = client.createSparkJob(new CreateSparkJobRequest()
            .withBody(new CreateSparkJobRequestBody()
                .withQueue(queueName)
                .withSparkVersion("2.4.5")
                .withName("demo_spark_app")
                .withFile("obs://your_bucket/your_spark_app.jar") // Mandatory
                .withClassName("YourClassFullName") // Mandatory
                .withArgs(Arrays.asList("YourAppArg1", "YourAppArg2", "..."))
                .withConf(confMap)
                .withJars(Arrays.asList("YourDepJar1", "YourDepJar2", "..."))
                .withDriverCores(2)
                .withDriverMemory("8GB")
                .withNumExecutors(3)
                .withExecutorCores(4)
                .withExecutorMemory("16GB")));
        return resp.getId();
    }
    ```
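The driver and executor settings determine how much compute the job requests. With the sample values, the job requests 2 + 3 × 4 = 14 cores and 8 + 3 × 16 = 56 GB of memory. The following is a rough sketch of that arithmetic. It assumes that each executor receives the full executorCores and executorMemory, and it ignores any memory overhead that DLI or Spark may add. The class and the memory-string parser are hypothetical.

```java
import java.util.Locale;

// Hypothetical sizing helper (not part of the DLI SDK). It only adds up the values passed to
// withDriverCores/withDriverMemory/withNumExecutors/withExecutorCores/withExecutorMemory.
// It does not model memory overhead or how DLI maps requests onto queue resources.
final class SparkJobSizing {
    private SparkJobSizing() {}

    /** Parses strings such as "8GB" or "512MB" into megabytes. */
    static long toMegabytes(String memory) {
        String s = memory.trim().toUpperCase(Locale.ROOT);
        if (s.endsWith("GB")) {
            return Long.parseLong(s.substring(0, s.length() - 2).trim()) * 1024L;
        }
        if (s.endsWith("MB")) {
            return Long.parseLong(s.substring(0, s.length() - 2).trim());
        }
        throw new IllegalArgumentException("Unsupported memory unit: " + memory);
    }

    static int totalCores(int driverCores, int numExecutors, int executorCores) {
        return driverCores + numExecutors * executorCores;
    }

    static long totalMemoryMb(String driverMemory, int numExecutors, String executorMemory) {
        return toMegabytes(driverMemory) + numExecutors * toMegabytes(executorMemory);
    }

    public static void main(String[] args) {
        // Values from the runSparkJob sample.
        System.out.println(totalCores(2, 3, 4));             // prints 14
        System.out.println(totalMemoryMb("8GB", 3, "16GB")); // prints 57344 (56 GB)
    }
}
```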
Querying the Status of a Batch Processing Job
- Function:
    Query the status of a batch processing job.
- Reference link:
    Key SDK API: com.huaweicloud.sdk.dli.v1.DliClient#showSparkJobStatus(ShowSparkJobStatusRequest) 
- Sample code:
    ```java
    // Polls the job status once per second until the job succeeds ("success") or fails ("dead").
    private static void checkRunning(DliClient client, String jobId) throws DLIException {
        while (true) {
            ShowSparkJobStatusResponse resp;
            try {
                resp = client.showSparkJobStatus(new ShowSparkJobStatusRequest().withBatchId(jobId));
            } catch (Exception e) {
                throw new DLIException("Failed to get job status by id: " + jobId, e);
            }
            String status = resp.getState();
            logger.info("SparkJob id {} status: {}", jobId, status);
            if ("success".equals(status)) {
                return;
            }
            if ("dead".equals(status)) {
                throw new DLIException("Run job failed: " + jobId);
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Restore the interrupt flag for the caller.
                throw new DLIException("Check job running interrupted.", e);
            }
        }
    }
    ```
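checkRunning polls without an upper bound. If a job stays in a non-terminal state, the calling thread blocks indefinitely. The following is a minimal sketch of the same loop with an overall deadline. The status lookup is abstracted as a Supplier so that the loop can be tested without a DLI client. The class, the timeout, and the use of IllegalStateException are assumptions, not SDK APIs.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical sketch (not part of the DLI SDK): polls a status source until it reports
// "success" or "dead" (the strings checkRunning uses) or until the deadline passes.
final class StatusPoller {
    private StatusPoller() {}

    /**
     * Returns the terminal state, or throws IllegalStateException on timeout or interrupt.
     * In real code, the supplier would wrap client.showSparkJobStatus(...).getState().
     */
    static String waitForTerminalState(Supplier<String> status, Duration interval, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            String state = status.get();
            if ("success".equals(state) || "dead".equals(state)) {
                return state;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("Timed out waiting for job; last state: " + state);
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Preserve the interrupt for the caller.
                throw new IllegalStateException("Interrupted while waiting for job", e);
            }
        }
    }
}
```

Returning the terminal state instead of throwing on "dead" lets the caller decide how to treat a failed job. The one-second interval from checkRunning can be passed as Duration.ofSeconds(1). Choose the timeout to suit your workload.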
Querying the Batch Processing Job List
- Function:
    Query the batch processing job list. 
- Reference link:
    Key SDK API: com.huaweicloud.sdk.dli.v1.DliClient#listSparkJobs(ListSparkJobsRequest) 
- Sample code:
    ```java
    // Lists the Spark jobs that match the given filter criteria and logs the response.
    private static void listSparkJob(DliClient client, String jobId) throws DLIException {
        ListSparkJobsResponse resp;
        try {
            resp = client.listSparkJobs(new ListSparkJobsRequest()
                // You can also use the .withXxx() methods to specify other filter criteria. This is just an example.
                // See Table 2 in https://console.huaweicloud.com/apiexplorer/#/openapi/DLI/doc?api=ListSparkJobs.
                .withJobId(jobId)
                .withQueueName("YourQueueName")
                .withStart(1234567L) // Placeholder: optionally filter by job start time.
                .withEnd(2345678L)); // Placeholder: optionally filter by job end time.
        } catch (Exception e) {
            throw new DLIException("Failed to list Spark jobs", e);
        }
        logger.info("List SparkJobs: {}", resp);
        // For details about the response parameters in resp, see Table 3 and Table 4 in
        // https://console.huaweicloud.com/apiexplorer/#/openapi/DLI/doc?api=ListSparkJobs.
    }
    ```
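The sample passes placeholder values to withStart and withEnd. The following is a minimal sketch for building a "last N hours" window. It assumes that the API expects Unix timestamps in milliseconds and that the setters accept long values, as in the sample. Confirm the unit in Table 2 of the ListSparkJobs reference. The class and method are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper (not part of the DLI SDK). Assumes start/end are epoch milliseconds.
final class TimeWindow {
    final long startMillis;
    final long endMillis;

    private TimeWindow(long startMillis, long endMillis) {
        this.startMillis = startMillis;
        this.endMillis = endMillis;
    }

    /** Window covering the lookback period that ends at now. */
    static TimeWindow endingAt(Instant now, Duration lookback) {
        long end = now.toEpochMilli();
        return new TimeWindow(end - lookback.toMillis(), end);
    }
}
```

For example, TimeWindow w = TimeWindow.endingAt(Instant.now(), Duration.ofHours(24)) would supply .withStart(w.startMillis) and .withEnd(w.endMillis).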