dli
Spark作业相关
更新时间:2021/02/07 GMT+08:00
提交批处理作业
DLI提供执行批处理作业的接口。您可以使用该接口执行批处理作业。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | private static void runBatchJob(Cluster cluster) throws DLIException {
SparkJobInfo jobInfo = new SparkJobInfo();
jobInfo.setClassName("your.class.name");
jobInfo.setFile("xxx.jar");
jobInfo.setCluster_name("queueName");
// 调用BatchJob对象的asyncSubmit接口提交批处理作业
BatchJob job = new BatchJob(cluster, jobInfo);
job.asyncSubmit();
while (true) {
SparkJobStatus jobStatus = job.getStatus();
if (SparkJobStatus.SUCCESS.equals(jobStatus)) {
System.out.println("Job finished");
return;
}
if (SparkJobStatus.DEAD.equals(jobStatus)) {
throw new DLIException("The batch has already exited");
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
|

- Cluster为用户自建的队列。
- 传参不能为JSON格式。
- 对应批处理作业提交提供两个接口:
- 异步 asyncSubmit,提交后直接返回,不等待
- 同步 submit,提交后会一直等待作业执行结束
删除批处理作业
DLI提供删除批处理作业的接口。您可以使用该接口删除批处理作业。示例代码如下:
1 2 3 4 5 6 7 | private static void deleteBatchJob(DLIClient client) throws DLIException {
//提交Spark批处理运行作业的Id
String batchId = "0aae0dc5-f009-4b9b-a8c3-28fbee399fa6";
// 调用BatchJob对象的delBatch接口取消批处理作业
MessageInfo messageInfo = client.delBatchJob(batchId);
System.out.println(messageInfo.getMsg());
}
|
查询所有批处理作业
DLI提供查询批处理作业的接口。您可以使用该接口查询当前工程下的所有批处理作业信息。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | private static void listAllBatchJobs(DLIClient client) throws DLIException {
System.out.println("list all batch jobs...");
// 通过调用DLIClient对象的listAllBatchJobs方法查询批处理作业
String queueName = "queueName";
int from = 0;
int size = 1000;
// 分页,起始页,每页大小
List<SparkJobResultInfo> jobResults = client.listAllBatchJobs(queueName, from, size);
for (SparkJobResultInfo jobResult : jobResults) {
// job id
System.out.println(jobResult.getId());
// job app id
System.out.println(jobResult.getAppId());
// job状态
System.out.println(jobResult.getState());
}
}
|
父主题: Java SDK
