Updated on 2023-07-19 GMT+08:00

SDKs Related to Spark Jobs

Prerequisites

  • You have configured the Java SDK environment by following the instructions provided in Instructions.
  • You have initialized the DLI Client by following the instructions provided in Initializing the DLI Client and created queues by following the instructions provided in Queue-Related SDKs.

Submitting Batch Jobs

DLI provides an API for submitting batch jobs. Sample code is as follows:

private static void runBatchJob(Cluster cluster) throws DLIException {
    SparkJobInfo jobInfo = new SparkJobInfo();
    jobInfo.setClassName("your.class.name");
    jobInfo.setFile("xxx.jar");
    jobInfo.setCluster_name("queueName");
    // Call the asyncSubmit method on the BatchJob object to submit the batch job.
    BatchJob job = new BatchJob(cluster, jobInfo);
    job.asyncSubmit();
    // Poll the job status until it reaches a terminal state (SUCCESS or DEAD).
    while (true) {
        SparkJobStatus jobStatus = job.getStatus();
        if (SparkJobStatus.SUCCESS.equals(jobStatus)) {
            System.out.println("Job finished");
            return;
        }
        if (SparkJobStatus.DEAD.equals(jobStatus)) {
            throw new DLIException("The batch has already exited");
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • A cluster is a queue created by a user.
  • The input parameters cannot be in JSON format.
  • DLI provides the following two APIs related to batch jobs (see the sketch after this list):
    • asyncSubmit: This API is asynchronous. It returns immediately after the job is submitted, without waiting for the job to finish.
    • submit: This API is synchronous. It returns only after job execution is complete.
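
The sample above uses asyncSubmit with a polling loop. A minimal sketch of the synchronous variant using submit, which makes the polling loop unnecessary; it reuses the classes from the sample above, and the class name, JAR file, and queue name are placeholders:

private static void runBatchJobSync(Cluster cluster) throws DLIException {
    SparkJobInfo jobInfo = new SparkJobInfo();
    jobInfo.setClassName("your.class.name");
    jobInfo.setFile("xxx.jar");
    jobInfo.setCluster_name("queueName");
    BatchJob job = new BatchJob(cluster, jobInfo);
    // submit blocks until job execution is complete, so no polling loop is needed.
    job.submit();
    System.out.println("Job finished with status: " + job.getStatus());
}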

Deleting Batch Jobs

DLI provides an API for deleting batch processing jobs. The example code is as follows:

private static void deleteBatchJob(DLIClient client) throws DLIException {
    // Specify the ID of the Spark batch processing job.
    String batchId = "0aae0dc5-f009-4b9b-a8c3-28fbee399fa6";
    // Call the delBatchJob method on the DLIClient object to cancel the batch job.
    MessageInfo messageInfo = client.delBatchJob(batchId);
    System.out.println(messageInfo.getMsg());
}
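
In practice, the batch ID usually comes from an earlier submission or from a listing rather than a hard-coded string. A minimal sketch of a hypothetical helper that cancels every job on a queue, using only the listAllBatchJobs and delBatchJob calls shown in this section; the queue name and page size are placeholders:

private static void deleteAllBatchJobsOnQueue(DLIClient client) throws DLIException {
    // List the batch jobs on the queue, then cancel each one by its ID.
    List<SparkJobResultInfo> jobResults = client.listAllBatchJobs("queueName", 0, 1000);
    for (SparkJobResultInfo jobResult : jobResults) {
        MessageInfo messageInfo = client.delBatchJob(jobResult.getId());
        System.out.println(jobResult.getId() + ": " + messageInfo.getMsg());
    }
}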

Querying All Batch Jobs

DLI provides an API for querying batch processing jobs. You can use it to query all batch jobs of the current project. The example code is as follows:

private static void listAllBatchJobs(DLIClient client) throws DLIException {
  System.out.println("list all batch jobs...");
  // Set the paging parameters: the start position and the number of results per page.
  String queueName = "queueName";
  int from = 0;
  int size = 1000;
  // Call the listAllBatchJobs method on the DLIClient object to query the batch jobs.
  List<SparkJobResultInfo> jobResults = client.listAllBatchJobs(queueName, from, size);
  for (SparkJobResultInfo jobResult : jobResults) {
    // Job ID
    System.out.println(jobResult.getId());
    // Job app ID
    System.out.println(jobResult.getAppId());
    // Job status
    System.out.println(jobResult.getState());
  }
}
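
When a project has more jobs than fit in one page, the query can be repeated with an increasing from value. A minimal paging sketch, assuming from is a start offset into the result list (verify this against your SDK version); the queue name and page size are placeholders:

private static void listBatchJobsPaged(DLIClient client) throws DLIException {
    String queueName = "queueName";
    int size = 100;
    int from = 0;
    while (true) {
        List<SparkJobResultInfo> page = client.listAllBatchJobs(queueName, from, size);
        for (SparkJobResultInfo jobResult : page) {
            System.out.println(jobResult.getId() + " " + jobResult.getState());
        }
        // A short page means there are no further results.
        if (page.size() < size) {
            break;
        }
        from += size;
    }
}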

Querying a Batch Job Status

DLI provides an API for querying the status of batch processing jobs. The following sample code calls the API to query the job status:
private static void getStateBatchJob(DLIClient client) throws DLIException {
    SparkJobInfo jobInfo = new SparkJobInfo();
    jobInfo.setClusterName("queueName");
    jobInfo.setFile("xxx.jar");
    jobInfo.setClassName("your.class.name");
    BatchJob batchJob = new BatchJob(client.getCluster("queueName"), jobInfo);
    batchJob.asyncSubmit();
    // Call the getStatus method on the BatchJob object to query the job status.
    SparkJobStatus sparkJobStatus = batchJob.getStatus();
    System.out.println(sparkJobStatus);
}
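
Because asyncSubmit returns before the job finishes, a single getStatus call may show a transient state. A minimal sketch of a hypothetical helper that polls until a terminal state with a timeout, a variant of the loop in Submitting Batch Jobs; the timeout value is arbitrary:

private static SparkJobStatus waitForBatchJob(BatchJob job, long timeoutMs)
        throws DLIException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
        SparkJobStatus jobStatus = job.getStatus();
        // SUCCESS and DEAD are the terminal states used elsewhere in this section.
        if (SparkJobStatus.SUCCESS.equals(jobStatus) || SparkJobStatus.DEAD.equals(jobStatus)) {
            return jobStatus;
        }
        Thread.sleep(1000);
    }
    throw new DLIException("Timed out waiting for the batch job to finish");
}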

Querying Batch Job Logs

DLI provides an API for querying the logs of batch processing jobs. The following sample code calls the API to query the job logs:
private static void getBatchJobLog(DLIClient client) throws DLIException {
    SparkJobInfo jobInfo = new SparkJobInfo();
    jobInfo.setClusterName("queueName");
    jobInfo.setFile("xxx.jar");
    jobInfo.setClassName("your.class.name");
    BatchJob batchJob = new BatchJob(client.getCluster("queueName"), jobInfo);
    batchJob.submit();
    // Call the getLog method on the BatchJob object to query the logs of the batch processing job.
    int from = 0;
    int size = 1000;
    List<String> jobLogs = batchJob.getLog(from, size);
    System.out.println(jobLogs);
}
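
For long-running jobs, the log can exceed a single page. A minimal sketch of a hypothetical helper that fetches the log in chunks, assuming from is a start offset into the log lines (confirm this against your SDK version); the chunk size is a placeholder:

private static void printFullBatchJobLog(BatchJob batchJob) throws DLIException {
    int size = 1000;
    int from = 0;
    while (true) {
        List<String> chunk = batchJob.getLog(from, size);
        for (String line : chunk) {
            System.out.println(line);
        }
        // A short chunk means the end of the log has been reached.
        if (chunk.size() < size) {
            break;
        }
        from += size;
    }
}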