更新时间:2024-03-06 GMT+08:00
作业相关
导入数据
DLI提供导入数据的接口。您可以使用该接口将存储在OBS中的数据导入到已创建的DLI表或者OBS表中。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
//实例化importJob对象,构造函数的入参包括队列、数据库名、表名(通过实例化Table对象获取)和数据路径 private static void importData(Queue queue, Table DLITable) throws DLIException { String dataPath = "OBS Path"; queue = client.getQueue("queueName"); CsvFormatInfo formatInfo = new CsvFormatInfo(); formatInfo.setWithColumnHeader(true); formatInfo.setDelimiter(","); formatInfo.setQuoteChar("\""); formatInfo.setEscapeChar("\\"); formatInfo.setDateFormat("yyyy/MM/dd"); formatInfo.setTimestampFormat("yyyy-MM-dd HH:mm:ss"); String dbName = DLITable.getDb().getDatabaseName(); String tableName = DLITable.getTableName(); ImportJob importJob = new ImportJob(queue, dbName, tableName, dataPath); importJob.setStorageType(StorageType.CSV); importJob.setCsvFormatInfo(formatInfo); System.out.println("start submit import table: " + DLITable.getTableName()); //调用ImportJob对象的submit接口提交导入作业 importJob.submit(); //调用ImportJob对象的getStatus接口查询导入作业状态 JobStatus status = importJob.getStatus(); System.out.println("Job id: " + importJob.getJobId() + ", Status : " + status.getName()); } |
- 在提交导入作业前,可选择设置导入数据的格式,如样例所示,调用ImportJob对象的setStorageType接口设置数据存储类型为csv,数据的具体格式通过调用ImportJob对象的setCsvFormatInfo接口进行设置。
- 在提交导入作业前,可选择设置导入数据的分区并配置是否是overwrite写入,分区信息可以调用ImportJob对象的setPartitionSpec接口设置,如:importJob.setPartitionSpec(new PartitionSpec("part1=value1,part2=value2")),也可以在创建ImportJob对象的时候直接通过参数的形式创建 。导入作业默认是追加写,如果需要覆盖写,则可以调用ImportJob对象的setOverWrite接口设置,如:importJob.setOverWrite(Boolean.TRUE)。
- 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。
导入分区数据
DLI提供导入数据的接口。您可以使用该接口将存储在OBS中的数据导入到已创建的DLI表或者OBS表指定分区中。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
//实例化importJob对象,构造函数的入参包括队列、数据库名、表名(通过实例化Table对象获取)和数据路径 private static void importData(Queue queue, Table DLITable) throws DLIException { String dataPath = "OBS Path"; queue = client.getQueue("queueName"); CsvFormatInfo formatInfo = new CsvFormatInfo(); formatInfo.setWithColumnHeader(true); formatInfo.setDelimiter(","); formatInfo.setQuoteChar("\""); formatInfo.setEscapeChar("\\"); formatInfo.setDateFormat("yyyy/MM/dd"); formatInfo.setTimestampFormat("yyyy-MM-dd HH:mm:ss"); String dbName = DLITable.getDb().getDatabaseName(); String tableName = DLITable.getTableName(); PartitionSpec partitionSpec = new PartitionSpec("part1=value1,part2=value2"); Boolean isOverWrite = true; ImportJob importJob = new ImportJob(queue, dbName, tableName, dataPath, partitionSpec, isOverWrite); importJob.setStorageType(StorageType.CSV); importJob.setCsvFormatInfo(formatInfo); System.out.println("start submit import table: " + DLITable.getTableName()); //调用ImportJob对象的submit接口提交导入作业 importJob.submit(); //调用ImportJob对象的getStatus接口查询导入作业状态 JobStatus status = importJob.getStatus(); System.out.println("Job id: " + importJob.getJobId() + ", Status : " + status.getName()); } |
- 在创建ImportJob对象的时候分区信息PartitionSpec也可以直接传入分区字符串。
- partitionSpec如果导入时指定部分列为分区列,而导入的数据只包含了指定的分区信息,则数据导入后的未指定的分区列字段会存在null值等异常值。
- 示例中isOverWrite表示是否是覆盖写,为true表示覆盖写,为false表示追加写。目前不支持overwrite覆盖写整表,只支持overwrite写指定分区。如果需要追加写指定分区,则在创建ImportJob的时候指定isOverWrite为false。
导出数据
DLI提供导出数据的接口。您可以使用该接口将DLI表中的数据导出到OBS中。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
//实例化ExportJob对象,传入导出数据所需的队列、数据库名、表名(通过实例化Table对象获取)和导出数据的存储路径,仅支持Table类型为MANAGED private static void exportData(Queue queue, Table DLITable) throws DLIException { String dataPath = "OBS Path"; queue = client.getQueue("queueName"); String dbName = DLITable.getDb().getDatabaseName(); String tableName = DLITable.getTableName(); ExportJob exportJob = new ExportJob(queue, dbName, tableName, dataPath); exportJob.setStorageType(StorageType.CSV); exportJob.setCompressType(CompressType.GZIP); exportJob.setExportMode(ExportMode.ERRORIFEXISTS); System.out.println("start export DLI Table data..."); //调用ExportJob对象的submit接口提交导出作业 exportJob.submit(); //调用ExportJob对象的getStatus接口查询导出作业状态 JobStatus status = exportJob.getStatus(); System.out.println("Job id: " + exportJob.getJobId() + ", Status : " + status.getName()); } |
- 在提交导出作业前,可选设置数据格式,压缩类型,导出模式等,如样例所示,分别调用ExportJob对象的setStorageType、setCompressType、setExportMode接口设置,其中setStorageType仅支持csv格式。
- 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。
提交作业
DLI提供提交作业和查询作业的接口。您可以通过提交接口提交作业,如果需要查询结果可以调用查询接口查询该作业的结果。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
//实例化SQLJob对象,传入执行SQL所需的queue,数据库名,SQL语句 private static void runSqlJob(Queue queue, Table obsTable) throws DLIException { String sql = "select * from " + obsTable.getTableName(); String queryResultPath = "OBS Path"; SQLJob sqlJob = new SQLJob(queue, obsTable.getDb().getDatabaseName(), sql); System.out.println("start submit SQL job..."); //调用SQLJob对象的submit接口提交查询作业 sqlJob.submit(); //调用SQLJob对象的getStatus接口查询作业状态 JobStatus status = sqlJob.getStatus(); System.out.println(status); System.out.println("start export Result..."); //调用SQLJob对象的exportResult接口导出查询结果,其中queryResultPath为导出数据的路径 sqlJob.exportResult(queryResultPath, StorageType.CSV, CompressType.GZIP, ExportMode.ERRORIFEXISTS, null); System.out.println("Job id: " + sqlJob.getJobId() + ", Status : " + status.getName()); } |
取消作业
DLI提供取消作业的接口。您可以使用该接口取消所有Launching或Running状态的Job,以取消Launching状态的Job为例,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 |
private static void cancelSqlJob(DLIClient client) throws DLIException { List<JobResultInfo> jobResultInfos = client.listAllJobs(JobType.QUERY); for (JobResultInfo jobResultInfo : jobResultInfos) { //如果Job为“LAUNCHING”状态,则取消 if (JobStatus.LAUNCHING.equals(jobResultInfo.getJobStatus())) { //通过JobId参数取消Job client.cancelJob(jobResultInfo.getJobId()); } } } |
查询所有作业
DLI提供查询作业的接口。您可以使用该接口查询当前工程下的所有作业信息。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
private static void listAllSqlJobs(DLIClient client) throws DLIException { //返回JobResultInfo List集合 List < JobResultInfo > jobResultInfos = client.listAllJobs(); //遍历List集合查看Job信息 for (JobResultInfo jobResultInfo: jobResultInfos) { //job id System.out.println(jobResultInfo.getJobId()); //job 描述信息 System.out.println(jobResultInfo.getDetail()); //job 状态 System.out.println(jobResultInfo.getJobStatus()); //job 类型 System.out.println(jobResultInfo.getJobType()); } //通过JobType过滤 List < JobResultInfo > jobResultInfos1 = client.listAllJobs(JobType.DDL); //通过起始时间和JobType过滤,起始时间的格式为unix时间戳 List < JobResultInfo > jobResultInfos2 = client.listAllJobs(1502349803729L, 1502349821460L, JobType.DDL); //通过分页过滤 List < JobResultInfo > jobResultInfos3 = client.listAllJobs(100, 1, JobType.DDL); //分页,起始时间,Job类型 List < JobResultInfo > jobResultInfos4 = client.listAllJobs(100, 1, 1502349803729L, 1502349821460L, JobType.DDL); //通过Tags过滤查询满足条件的所有作业列表 JobFilter jobFilter = new JobFilter(); jobFilter.setTags("workspace=space002,jobName=name002"); List < JobResultInfo > jobResultInfos1 = client.listAllJobs(jobFilter); //通过Tags过滤查询满足条件的指定page的作业列表 JobFilter jobFilter = new JobFilter(); jobFilter.setTags("workspace=space002,jobName=name002"); jobFilter.setPageSize(100); jobFilter.setCurrentPage(0); List < JobResultInfo > jobResultInfos1 = client.listJobsByPage(jobFilter); } |
- 重载方法的参数,可以设置为“null”,表示不设置过滤条件。同时也要注意参数的合法性,例如分页参数设置为“-1”,会导致查询失败。
- 该SDK接口不支持sql_pattern,即通过指定sql片段作为作业过滤条件进行查询。
如果需要则可以通过查询所有作业API接口指定该参数进行查询。
查询作业结果
DLI提供查询作业结果的接口。您可以使用该接口通过JobId查询该作业信息。示例代码如下:
1 2 3 4 5 6 7 8 9 |
private static void getJobResultInfo(DLIClient client) throws DLIException { String jobId = "4c4f7168-5bc4-45bd-8c8a-43dfc85055d0"; JobResultInfo jobResultInfo = client.queryJobResultInfo(jobId); //查询job信息 System.out.println(jobResultInfo.getJobId()); System.out.println(jobResultInfo.getDetail()); System.out.println(jobResultInfo.getJobStatus()); System.out.println(jobResultInfo.getJobType()); } |
查询SQL类型作业
DLI提供查询SQL类型作业的接口。您可以使用该接口查询当前工程下,在编辑框中提交的最近执行的作业的信息(即可用SQL语句提交的Job)。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
private static void getJobResultInfos(DLIClient client) throws DLIException { //返回JobResultInfo List集合 List<JobResultInfo> jobResultInfos = client.listSQLJobs(); //遍历集合查询job信息 for (JobResultInfo jobResultInfo : jobResultInfos) { //job id System.out.println(jobResultInfo.getJobId()); //job 描述信息 System.out.println(jobResultInfo.getDetail()); //job 状态 System.out.println(jobResultInfo.getJobStatus()); //job 类型 System.out.println(jobResultInfo.getJobType()); } //通过Tags过滤查询满足条件的所有SQL作业列表 JobFilter jobFilter = new JobFilter(); jobFilter.setTags("workspace=space002,jobName=name002"); List < JobResultInfo > jobResultInfos1 = client.listAllSQLJobs(jobFilter); //通过Tags过滤查询满足条件的指定page的SQL作业列表 JobFilter jobFilter = new JobFilter(); jobFilter.setTags("workspace=space002,jobName=name002"); jobFilter.setPageSize(100); jobFilter.setCurrentPage(0); List < JobResultInfo > jobResultInfos1 = client.listSQLJobsByPage(jobFilter); } |
导出查询结果
DLI提供导出查询结果的接口。您可以使用该接口导出当前工程下,在编辑框中提交的查询作业的结果。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
//实例化SQLJob对象,传入执行SQL所需的queue,数据库名,SQL语句 private static void exportSqlResult(Queue queue, Table obsTable) throws DLIException { String sql = "select * from " + obsTable.getTableName(); String queryResultPath = "OBS Path"; SQLJob sqlJob = new SQLJob(queue, obsTable.getDb().getDatabaseName(), sql); System.out.println("start submit SQL job..."); //调用SQLJob对象的submit接口提交查询作业 sqlJob.submit(); //调用SQLJob对象的getStatus接口查询作业状态 JobStatus status = sqlJob.getStatus(); System.out.println(status); System.out.println("start export Result..."); //调用SQLJob对象的exportResult接口导出查询结果,其中exportPath为导出数据的路径,JSON为导出格式,queueName为执行导出作业的队列,limitNum为导出作业结果条数,0表示全部导出 sqlJob.exportResult(exportPath + "result", StorageType.JSON, CompressType.NONE, ExportMode.ERRORIFEXISTS, queueName, true, 5); } |
预览作业结果
DLI提供预览作业结果的接口。您可以使用该接口获取结果集的前1000条记录。
//实例化SQLJob对象,传入执行SQL所需的queue,数据库名和SQL语句 private static void getPreviewJobResult(Queue queue, Table obsTable) throws DLIException { String sql = "select * from " + obsTable.getTableName(); SQLJob sqlJob = new SQLJob(queue, obsTable.getDb().getDatabaseName(), sql); System.out.println("start submit SQL job..."); //调用SQLJob对象的submit接口提交查询作业 sqlJob.submit(); //调用SQLJob对象的previewJobResult接口查询结果集的前1000条记录 List<Row> rows = sqlJob.previewJobResult(); if (rows.size() > 0) { Integer value = rows.get(0).getInt(0); System.out.println("获取第一行结果中的第一列数据值" + value); } System.out.println("Job id: " + sqlJob.getJobId() + ", previewJobResultSize : " + rows.size()); }
废弃的接口
getJobResult接口当前已废弃,如果需要getJobResult类似功能可以通过调用DownloadJob接口获取。
DownloadJob接口详情可以在“dli-sdk-java-x.x.x.zip”压缩包中获取。“dli-sdk-java-x.x.x.zip”压缩包可以参考SDK的获取与安装中的操作步骤获取。
父主题: SQL作业相关