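Updated: 2024-03-06 GMT+08:00

Spark Jobs

Prerequisites

Submitting a Batch Job

DLI provides an interface for running batch jobs. The following sample code submits a batch job asynchronously and polls its status until it finishes: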

private static void runBatchJob(Cluster cluster) throws DLIException {
    SparkJobInfo jobInfo = new SparkJobInfo();
    jobInfo.setClassName("your.class.name");
    jobInfo.setFile("xxx.jar");
    jobInfo.setClusterName("queueName");
    // Call the asyncSubmit method of the BatchJob object to submit the batch job
    BatchJob job = new BatchJob(cluster, jobInfo);
    job.asyncSubmit();
    while (true) {
        SparkJobStatus jobStatus = job.getStatus();
        if (SparkJobStatus.SUCCESS.equals(jobStatus)) {
            System.out.println("Job finished");
            return;
        }
        if (SparkJobStatus.DEAD.equals(jobStatus)) {
            throw new DLIException("The batch has already exited");
        }
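        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop polling instead of looping on
            Thread.currentThread().interrupt();
            throw new DLIException("Interrupted while waiting for the batch job");
        }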
    }
}
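  • Cluster is a queue created by the user.
  • Parameters cannot be passed in JSON format.
  • Two interfaces are provided for submitting batch jobs:
    • Asynchronous: asyncSubmit returns immediately after submission and does not wait for the job to finish.
    • Synchronous: submit blocks after submission until the job has finished running.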
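
The asynchronous path leaves it to the caller to decide how long to wait. The sketch below shows one way to bound the polling loop with a timeout. It is self-contained and does not use the DLI SDK: the class BatchJobPoller, the method waitForTerminalStatus, and PollTimeoutException are hypothetical names, the Supplier stands in for a call such as String.valueOf(job.getStatus()), and the "STARTING" and "RUNNING" values in main are made up for the simulation. Only SUCCESS and DEAD are taken from the sample above.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Hypothetical helper, not part of the DLI SDK.
class BatchJobPoller {
    /** Unchecked exception raised when the job does not reach a terminal state in time. */
    static class PollTimeoutException extends RuntimeException {
        PollTimeoutException(String message) {
            super(message);
        }
    }

    /**
     * Polls statusSupplier until it reports SUCCESS or DEAD, or until
     * timeoutMillis has elapsed. Returns the terminal status name.
     */
    static String waitForTerminalStatus(Supplier<String> statusSupplier,
                                        long intervalMillis,
                                        long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            String status = statusSupplier.get();
            if ("SUCCESS".equals(status) || "DEAD".equals(status)) {
                return status;
            }
            // Compare by difference so the check stays correct if nanoTime wraps
            if (System.nanoTime() - deadline >= 0) {
                throw new PollTimeoutException(
                        "Job still in state " + status + " after " + timeoutMillis + " ms");
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can observe the interruption
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling", e);
            }
        }
    }

    public static void main(String[] args) {
        // Simulated status sequence; these intermediate names are assumptions
        Iterator<String> simulated = Arrays.asList("STARTING", "RUNNING", "SUCCESS").iterator();
        String result = waitForTerminalStatus(simulated::next, 10, 5_000);
        System.out.println("Final status: " + result); // prints "Final status: SUCCESS"
    }
}
```

With the SDK, the supplier would presumably wrap job.getStatus(); because that call declares DLIException, it would need to be caught and rethrown as an unchecked exception inside the lambda.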

Deleting a Batch Job

DLI provides an interface for deleting batch jobs. The sample code is as follows:

private static void deleteBatchJob(DLIClient client) throws DLIException {
    // ID of the submitted Spark batch job
    String batchId = "0aae0dc5-f009-4b9b-a8c3-28fbee399fa6";
    // Call the delBatchJob method of the DLIClient object to cancel the batch job
    MessageInfo messageInfo = client.delBatchJob(batchId);
    System.out.println(messageInfo.getMsg());
}

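Querying All Batch Jobs

DLI provides an interface for querying batch jobs. You can use it to list all batch jobs in the current project. The sample code is as follows: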

private static void listAllBatchJobs(DLIClient client) throws DLIException {
  System.out.println("list all batch jobs...");
  // Call the listAllBatchJobs method of the DLIClient object to query batch jobs
  String queueName = "queueName";
  // Paging parameters: start page and page size
  int from = 0;
  int size = 1000;
  List<SparkJobResultInfo> jobResults = client.listAllBatchJobs(queueName, from, size);
  for (SparkJobResultInfo jobResult : jobResults) {
    // job id
    System.out.println(jobResult.getId());
    // job app id
    System.out.println(jobResult.getAppId());
    // job state
    System.out.println(jobResult.getState());
  }
}
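
The sample above fetches a single page of up to 1000 jobs. When a queue holds more jobs than one page, the call has to be repeated with an advancing start position. The following is a minimal, SDK-independent sketch of that loop. BatchJobPager and fetchAll are hypothetical names, and the BiFunction stands in for client.listAllBatchJobs(queueName, from, size). It assumes that from is a zero-based index of the first job to return and that a page shorter than size marks the end; if from is a page number instead, advance it by 1 per page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical helper, not part of the DLI SDK.
class BatchJobPager {
    /**
     * Repeatedly calls fetchPage(from, pageSize) and concatenates the pages.
     * Stops on an empty page or on a page shorter than pageSize.
     */
    static <T> List<T> fetchAll(BiFunction<Integer, Integer, List<T>> fetchPage, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<T> all = new ArrayList<>();
        int from = 0; // assumption: zero-based index of the first item to return
        while (true) {
            List<T> page = fetchPage.apply(from, pageSize);
            if (page == null || page.isEmpty()) {
                break;
            }
            all.addAll(page);
            if (page.size() < pageSize) {
                break; // short page: nothing left to fetch
            }
            from += page.size();
        }
        return all;
    }

    public static void main(String[] args) {
        // In-memory stand-in for the remote job list
        List<String> jobs = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            jobs.add("job-" + i);
        }
        List<String> result = fetchAll(
                (from, size) -> jobs.subList(
                        Math.min(from, jobs.size()),
                        Math.min(from + size, jobs.size())),
                10);
        System.out.println(result.size()); // prints 25
    }
}
```

Because listAllBatchJobs declares DLIException, a lambda that calls it would need to catch that exception and rethrow it as an unchecked one.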

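Querying the Status of a Batch Job

DLI provides an interface for querying the status of a batch job. You can use it to check the job's current state. The sample code is as follows: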
private static void getStateBatchJob(DLIClient client) throws DLIException {
    SparkJobInfo jobInfo = new SparkJobInfo();
    jobInfo.setClusterName("queueName");
    jobInfo.setFile("xxx.jar");
    jobInfo.setClassName("your.class.name");
    BatchJob batchJob = new BatchJob(client.getCluster("queueName"), jobInfo);
    batchJob.asyncSubmit();
    // Call the getStatus method of the BatchJob object to query the job's current status
    SparkJobStatus sparkJobStatus = batchJob.getStatus();
    System.out.println(sparkJobStatus);
}

Querying the Logs of a Batch Job

DLI provides an interface for querying the logs of a batch job. The sample code is as follows:
private static void getBatchJobLog(DLIClient client) throws DLIException {
    SparkJobInfo jobInfo = new SparkJobInfo();
    jobInfo.setClusterName("queueName");
    jobInfo.setFile("xxx.jar");
    jobInfo.setClassName("your.class.name");
    BatchJob batchJob = new BatchJob(client.getCluster("queueName"), jobInfo);
    // submit blocks until the job has finished running
    batchJob.submit();
    // Call the getLog method of the BatchJob object to query the batch job logs
    int from = 0;
    int size = 1000;
    List<String> jobLogs = batchJob.getLog(from, size);
    System.out.println(jobLogs);
}