dli
作业相关
更新时间:2020/09/29 GMT+08:00
导入数据
DLI提供导入数据的接口。您可以使用该接口将存储在OBS中的数据导入到已创建的DLI表中。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | //实例化importJob对象,构造函数的入参包括队列、数据库名、表名(通过实例化Table对象获取)和数据路径
private static void importData(Queue queue, Table DLITable) throws DLIException {
String dataPath = "OBS Path";
queue = client.getQueue("queueName");
CsvFormatInfo formatInfo = new CsvFormatInfo();
formatInfo.setWithColumnHeader(true);
formatInfo.setDelimiter(",");
formatInfo.setQuoteChar("\"");
formatInfo.setEscapeChar("\\");
formatInfo.setDateFormat("yyyy/MM/dd");
formatInfo.setTimestampFormat("yyyy-MM-dd HH:mm:ss");
String dbName = DLITable.getDb().getDatabaseName();
String tableName = DLITable.getTableName();
ImportJob importJob = new ImportJob(queue, dbName, tableName, dataPath);
importJob.setStorageType(StorageType.CSV);
importJob.setCsvFormatInfo(formatInfo);
System.out.println("start submit import table: " + DLITable.getTableName());
//调用ImportJob对象的submit接口提交导入作业
importJob.submit(); //调用ImportJob对象的getStatus接口查询导入作业状态
JobStatus status = importJob.getStatus();
System.out.println("Job id: " + importJob.getJobId() + ", Status : " + status.getName());
}
|

- 在提交导入作业前,可选择设置导入数据的格式,如样例所示,调用ImportJob对象的setStorageType接口设置数据存储类型为csv,数据的具体格式通过调用ImportJob对象的setCsvFormatInfo接口进行设置。
- 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。
导出数据
DLI提供导出数据的接口。您可以使用该接口将DLI表中的数据导出到OBS中。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | //实例化ExportJob对象,传入导出数据所需的队列、数据库名、表名(通过实例化Table对象获取)和导出数据的存储路径,仅支持Table类型为MANAGED
private static void exportData(Queue queue, Table DLITable) throws DLIException {
String dataPath = "OBS Path";
queue = client.getQueue("queueName");
String dbName = DLITable.getDb().getDatabaseName();
String tableName = DLITable.getTableName();
ExportJob exportJob = new ExportJob(queue, dbName, tableName, dataPath);
exportJob.setStorageType(StorageType.CSV);
exportJob.setCompressType(CompressType.GZIP);
exportJob.setExportMode(ExportMode.ERRORIFEXISTS);
System.out.println("start export DLI Table data...");
//调用ExportJob对象的submit接口提交导出作业
exportJob.submit();
//调用ExportJob对象的getStatus接口查询导出作业状态
JobStatus status = exportJob.getStatus();
System.out.println("Job id: " + exportJob.getJobId() + ", Status : " + status.getName());
}
|

- 在提交导出作业前,可选设置数据格式,压缩类型,导出模式等,如样例所示,分别调用ExportJob对象的setStorageType、setCompressType、setExportMode接口设置,其中setStorageType仅支持csv格式。
- 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。
提交作业
DLI提供查询作业的接口。您可以使用该接口执行查询并获取查询结果。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | //实例化SQLJob对象,传入执行SQL所需的queue,数据库名,SQL语句
private static void runSqlJob(Queue queue, Table obsTable) throws DLIException {
String sql = "select * from " + obsTable.getTableName();
String queryResultPath = "OBS Path";
SQLJob sqlJob = new SQLJob(queue, obsTable.getDb().getDatabaseName(), sql);
System.out.println("start submit SQL job...");
//调用SQLJob对象的submit接口提交查询作业
sqlJob.submit();
//调用SQLJob对象的getStatus接口查询作业状态
JobStatus status = sqlJob.getStatus();
System.out.println(status);
System.out.println("start export Result...");
//调用SQLJob对象的exportResult接口导出查询结果,其中queryResultPath为导出数据的路径
sqlJob.exportResult(queryResultPath, StorageType.CSV,
CompressType.GZIP, ExportMode.ERRORIFEXISTS, null);
System.out.println("Job id: " + sqlJob.getJobId() + ", Status : " + status.getName());
}
|
取消作业
DLI提供取消作业的接口。您可以使用该接口取消所有Launching或Running状态的Job,以取消Launching状态的Job为例,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 | private static void cancelSqlJob(DLIClient client) throws DLIException {
List<JobResultInfo> jobResultInfos = client.listAllJobs(JobType.QUERY);
for (JobResultInfo jobResultInfo : jobResultInfos) {
//如果Job为“LAUNCHING”状态,则取消
if (JobStatus.LAUNCHING.equals(jobResultInfo.getJobStatus())) {
//通过JobId参数取消Job
client.cancelJob(jobResultInfo.getJobId());
}
}
}
|
查询所有作业
DLI提供查询作业的接口。您可以使用该接口查询当前工程下的所有作业信息。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | private static void listAllSqlJobs(DLIClient client) throws DLIException {
//返回JobResultInfo List集合
List < JobResultInfo > jobResultInfos = client.listAllJobs();
//遍历List集合查看Job信息
for (JobResultInfo jobResultInfo: jobResultInfos) {
//job id
System.out.println(jobResultInfo.getJobId());
//job 描述信息
System.out.println(jobResultInfo.getDetail());
//job 状态
System.out.println(jobResultInfo.getJobStatus());
//job 类型
System.out.println(jobResultInfo.getJobType());
}
//通过JobType过滤
List < JobResultInfo > jobResultInfos1 = client.listAllJobs(JobType.DDL);
//通过起始时间和JobType过滤,起始时间的格式为unix时间戳
List < JobResultInfo > jobResultInfos2 = client.listAllJobs(1502349803729L, 1502349821460L, JobType.DDL);
//通过分页过滤
List < JobResultInfo > jobResultInfos3 = client.listAllJobs(100, 1, JobType.DDL);
//分页,起始时间,Job类型
List < JobResultInfo > jobResultInfos4 = client.listAllJobs(100, 1, 1502349803729L, 1502349821460L, JobType.DDL);
}
|

重载方法的参数,可以设置为“null”,表示不设置过滤条件。同时也要注意参数的合法性,例如分页参数设置为“-1”,会导致查询失败。
查询作业结果
DLI提供查询作业结果的接口。您可以使用该接口通过JobId查询该作业信息。示例代码如下:
1 2 3 4 5 6 7 8 9 | private static void getJobResultInfo(DLIClient client) throws DLIException {
String jobId = "4c4f7168-5bc4-45bd-8c8a-43dfc85055d0";
JobResultInfo jobResultInfo = client.queryJobResultInfo(jobId);
//查询job信息
System.out.println(jobResultInfo.getJobId());
System.out.println(jobResultInfo.getDetail());
System.out.println(jobResultInfo.getJobStatus());
System.out.println(jobResultInfo.getJobType());
}
|
查询SQL类型作业
DLI提供查询SQL类型作业的接口。您可以使用该接口查询当前工程下,在编辑框中提交的最近执行的作业的信息(即可用SQL语句提交的Job)。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | private static void getJobResultInfos(DLIClient client) throws DLIException {
//返回JobResultInfo List集合
List<JobResultInfo> jobResultInfos = client.listSQLJobs();
//遍历集合查询job信息
for (JobResultInfo jobResultInfo : jobResultInfos) {
//job id
System.out.println(jobResultInfo.getJobId());
//job 描述信息
System.out.println(jobResultInfo.getDetail());
//job 状态
System.out.println(jobResultInfo.getJobStatus());
//job 类型
System.out.println(jobResultInfo.getJobType());
}
}
|
导出查询结果
DLI提供导出查询结果的接口。您可以使用该接口导出当前工程下,在编辑框中提交的查询作业的结果。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | //实例化SQLJob对象,传入执行SQL所需的queue,数据库名,SQL语句
private static void exportSqlResult(Queue queue, Table obsTable) throws DLIException {
String sql = "select * from " + obsTable.getTableName();
String queryResultPath = "OBS Path";
SQLJob sqlJob = new SQLJob(queue, obsTable.getDb().getDatabaseName(), sql);
System.out.println("start submit SQL job...");
//调用SQLJob对象的submit接口提交查询作业
sqlJob.submit();
//调用SQLJob对象的getStatus接口查询作业状态
JobStatus status = sqlJob.getStatus();
System.out.println(status);
System.out.println("start export Result...");
//调用SQLJob对象的exportResult接口导出查询结果,其中exportPath为导出数据的路径,JSON为导出格式,queueName为执行导出作业的队列,limitNum为导出作业结果条数,0表示全部导出
sqlJob.exportResult(exportPath + "result", StorageType.JSON, CompressType.NONE,
ExportMode.ERRORIFEXISTS, queueName, true, 5);
}
|
父主题: SQL作业相关
