更新时间:2024-03-06 GMT+08:00
Spark作业相关
前提条件
- 已参考Java SDK概述配置Java SDK环境。
- 已参考初始化DLI客户端完成客户端DLIClient的初始化,参考队列相关完成队列创建等操作。
提交批处理作业
DLI提供执行批处理作业的接口。您可以使用该接口执行批处理作业。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
private static void runBatchJob(Cluster cluster) throws DLIException { SparkJobInfo jobInfo = new SparkJobInfo(); jobInfo.setClassName("your.class.name"); jobInfo.setFile("xxx.jar"); jobInfo.setCluster_name("queueName"); // 调用BatchJob对象的asyncSubmit接口提交批处理作业 BatchJob job = new BatchJob(cluster, jobInfo); job.asyncSubmit(); while (true) { SparkJobStatus jobStatus = job.getStatus(); if (SparkJobStatus.SUCCESS.equals(jobStatus)) { System.out.println("Job finished"); return; } if (SparkJobStatus.DEAD.equals(jobStatus)) { throw new DLIException("The batch has already exited"); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } |
- Cluster为用户自建的队列。
- 传参不能为JSON格式。
- 对应批处理作业提交提供两个接口:
- 异步 asyncSubmit,提交后直接返回,不等待
- 同步 submit,提交后会一直等待作业执行结束
删除批处理作业
DLI提供删除批处理作业的接口。您可以使用该接口删除批处理作业。示例代码如下:
1 2 3 4 5 6 7 |
private static void deleteBatchJob(DLIClient client) throws DLIException { //提交Spark批处理运行作业的Id String batchId = "0aae0dc5-f009-4b9b-a8c3-28fbee399fa6"; // 调用BatchJob对象的delBatch接口取消批处理作业 MessageInfo messageInfo = client.delBatchJob(batchId); System.out.println(messageInfo.getMsg()); } |
查询所有批处理作业
DLI提供查询批处理作业的接口。您可以使用该接口查询当前工程下的所有批处理作业信息。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
private static void listAllBatchJobs(DLIClient client) throws DLIException { System.out.println("list all batch jobs..."); // 通过调用DLIClient对象的listAllBatchJobs方法查询批处理作业 String queueName = "queueName"; int from = 0; int size = 1000; // 分页,起始页,每页大小 List<SparkJobResultInfo> jobResults = client.listAllBatchJobs(queueName, from, size); for (SparkJobResultInfo jobResult : jobResults) { // job id System.out.println(jobResult.getId()); // job app id System.out.println(jobResult.getAppId()); // job状态 System.out.println(jobResult.getState()); } } |
查询批处理作业状态
DLI提供查询批处理作业状态的接口。您可以使用该接口查询批处理作业当前的状态信息。示例代码如下:
private static void getStateBatchJob(DLIClient client) throws DLIException { BatchJob batchJob = null; SparkJobInfo jobInfo = new SparkJobInfo(); jobInfo.setClusterName("queueName"); jobInfo.setFile("xxx.jar"); jobInfo.setClassName("your.class.name"); batchJob = new BatchJob(client.getCluster("queueName"), jobInfo); batchJob.asyncSubmit(); SparkJobStatus sparkJobStatus=batchJob.getStatus(); System.out.println(sparkJobStatus); }
查询批处理作业日志
DLI提供查询批处理作业日志的接口。您可以使用该接口查询批处理作业的日志信息。示例代码如下:
private static void getBatchJobLog(DLIClient client) throws DLIException { BatchJob batchJob = null; SparkJobInfo jobInfo = new SparkJobInfo(); jobInfo.setClusterName("queueName"); jobInfo.setFile("xxx.jar"); jobInfo.setClassName("your.class.name"); batchJob = new BatchJob(client.getCluster("queueName"), jobInfo); batchJob.submit(); // 调用BatchJob对象的getLog接口查询批处理作业日志 int from = 0; int size = 1000; List<String> jobLogs = batchJob.getLog(from,size); System.out.println(jobLogs); }
父主题: Java SDK