导入数据
DLI提供导入数据的接口。您可以使用该接口将存储在OBS中的数据导入到已创建的DLI表或者OBS表中。示例代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
//实例化importJob对象,构造函数的入参包括队列、数据库名、表名(通过实例化Table对象获取)和数据路径
private static void importData(Queue queue, Table DLITable) throws DLIException {
String dataPath = "OBS Path";
queue = client.getQueue("queueName");
CsvFormatInfo formatInfo = new CsvFormatInfo();
formatInfo.setWithColumnHeader(true);
formatInfo.setDelimiter(",");
formatInfo.setQuoteChar("\"");
formatInfo.setEscapeChar("\\");
formatInfo.setDateFormat("yyyy/MM/dd");
formatInfo.setTimestampFormat("yyyy-MM-dd HH:mm:ss");
String dbName = DLITable.getDb().getDatabaseName();
String tableName = DLITable.getTableName();
ImportJob importJob = new ImportJob(queue, dbName, tableName, dataPath);
importJob.setStorageType(StorageType.CSV);
importJob.setCsvFormatInfo(formatInfo);
System.out.println("start submit import table: " + DLITable.getTableName());
//调用ImportJob对象的submit接口提交导入作业
importJob.submit(); //调用ImportJob对象的getStatus接口查询导入作业状态
JobStatus status = importJob.getStatus();
System.out.println("Job id: " + importJob.getJobId() + ", Status : " + status.getName());
}
|
- 在提交导入作业前,可选择设置导入数据的格式,如样例所示,调用ImportJob对象的setStorageType接口设置数据存储类型为csv,数据的具体格式通过调用ImportJob对象的setCsvFormatInfo接口进行设置。
- 在提交导入作业前,可选择设置导入数据的分区并配置是否是overwrite写入,分区信息可以调用ImportJob对象的setPartitionSpec接口设置,如:importJob.setPartitionSpec(new PartitionSpec("part1=value1,part2=value2")),也可以在创建ImportJob对象的时候直接通过参数的形式创建 。导入作业默认是追加写,如果需要覆盖写,则可以调用ImportJob对象的setOverWrite接口设置,如:importJob.setOverWrite(Boolean.TRUE)。
- 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。
导入分区数据
DLI提供导入数据的接口。您可以使用该接口将存储在OBS中的数据导入到已创建的DLI表或者OBS表指定分区中。示例代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
//实例化importJob对象,构造函数的入参包括队列、数据库名、表名(通过实例化Table对象获取)和数据路径
private static void importData(Queue queue, Table DLITable) throws DLIException {
String dataPath = "OBS Path";
queue = client.getQueue("queueName");
CsvFormatInfo formatInfo = new CsvFormatInfo();
formatInfo.setWithColumnHeader(true);
formatInfo.setDelimiter(",");
formatInfo.setQuoteChar("\"");
formatInfo.setEscapeChar("\\");
formatInfo.setDateFormat("yyyy/MM/dd");
formatInfo.setTimestampFormat("yyyy-MM-dd HH:mm:ss");
String dbName = DLITable.getDb().getDatabaseName();
String tableName = DLITable.getTableName();
PartitionSpec partitionSpec = new PartitionSpec("part1=value1,part2=value2");
Boolean isOverWrite = true;
ImportJob importJob = new ImportJob(queue, dbName, tableName, dataPath, partitionSpec, isOverWrite);
importJob.setStorageType(StorageType.CSV);
importJob.setCsvFormatInfo(formatInfo);
System.out.println("start submit import table: " + DLITable.getTableName());
//调用ImportJob对象的submit接口提交导入作业
importJob.submit(); //调用ImportJob对象的getStatus接口查询导入作业状态
JobStatus status = importJob.getStatus();
System.out.println("Job id: " + importJob.getJobId() + ", Status : " + status.getName());
}
|
- 在创建ImportJob对象的时候分区信息PartitionSpec也可以直接传入分区字符串。
- partitionSpec如果导入时指定部分列为分区列,而导入的数据只包含了指定的分区信息,则数据导入后的未指定的分区列字段会存在null值等异常值。
- 示例中isOverWrite表示是否是覆盖写,为true表示覆盖写,为false表示追加写。目前不支持overwrite覆盖写整表,只支持overwrite写指定分区。如果需要追加写指定分区,则在创建ImportJob的时候指定isOverWrite为false。
导出数据
DLI提供导出数据的接口。您可以使用该接口将DLI表中的数据导出到OBS中。示例代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
//实例化ExportJob对象,传入导出数据所需的队列、数据库名、表名(通过实例化Table对象获取)和导出数据的存储路径,仅支持Table类型为MANAGED
private static void exportData(Queue queue, Table DLITable) throws DLIException {
String dataPath = "OBS Path";
queue = client.getQueue("queueName");
String dbName = DLITable.getDb().getDatabaseName();
String tableName = DLITable.getTableName();
ExportJob exportJob = new ExportJob(queue, dbName, tableName, dataPath);
exportJob.setStorageType(StorageType.CSV);
exportJob.setCompressType(CompressType.GZIP);
exportJob.setExportMode(ExportMode.ERRORIFEXISTS);
System.out.println("start export DLI Table data...");
//调用ExportJob对象的submit接口提交导出作业
exportJob.submit();
//调用ExportJob对象的getStatus接口查询导出作业状态
JobStatus status = exportJob.getStatus();
System.out.println("Job id: " + exportJob.getJobId() + ", Status : " + status.getName());
}
|
- 在提交导出作业前,可选设置数据格式,压缩类型,导出模式等,如样例所示,分别调用ExportJob对象的setStorageType、setCompressType、setExportMode接口设置,其中setStorageType仅支持csv格式。
- 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。
提交作业
DLI提供提交作业和查询作业的接口。您可以通过提交接口提交作业,如果需要查询结果可以调用查询接口查询该作业的结果。示例代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
//实例化SQLJob对象,传入执行SQL所需的queue,数据库名,SQL语句
private static void runSqlJob(Queue queue, Table obsTable) throws DLIException {
String sql = "select * from " + obsTable.getTableName();
String queryResultPath = "OBS Path";
SQLJob sqlJob = new SQLJob(queue, obsTable.getDb().getDatabaseName(), sql);
System.out.println("start submit SQL job...");
//调用SQLJob对象的submit接口提交查询作业
sqlJob.submit();
//调用SQLJob对象的getStatus接口查询作业状态
JobStatus status = sqlJob.getStatus();
System.out.println(status);
System.out.println("start export Result...");
//调用SQLJob对象的exportResult接口导出查询结果,其中queryResultPath为导出数据的路径
sqlJob.exportResult(queryResultPath, StorageType.CSV,
CompressType.GZIP, ExportMode.ERRORIFEXISTS, null);
System.out.println("Job id: " + sqlJob.getJobId() + ", Status : " + status.getName());
}
|
取消作业
DLI提供取消作业的接口。您可以使用该接口取消所有Launching或Running状态的Job,以取消Launching状态的Job为例,示例代码如下:
|
private static void cancelSqlJob(DLIClient client) throws DLIException {
List<JobResultInfo> jobResultInfos = client.listAllJobs(JobType.QUERY);
for (JobResultInfo jobResultInfo : jobResultInfos) {
//如果Job为“LAUNCHING”状态,则取消
if (JobStatus.LAUNCHING.equals(jobResultInfo.getJobStatus())) {
//通过JobId参数取消Job
client.cancelJob(jobResultInfo.getJobId());
}
}
}
|
查询所有作业
DLI提供查询作业的接口。您可以使用该接口查询当前工程下的所有作业信息。示例代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
private static void listAllSqlJobs(DLIClient client) throws DLIException {
//返回JobResultInfo List集合
List < JobResultInfo > jobResultInfos = client.listAllJobs();
//遍历List集合查看Job信息
for (JobResultInfo jobResultInfo: jobResultInfos) {
//job id
System.out.println(jobResultInfo.getJobId());
//job 描述信息
System.out.println(jobResultInfo.getDetail());
//job 状态
System.out.println(jobResultInfo.getJobStatus());
//job 类型
System.out.println(jobResultInfo.getJobType());
}
//通过JobType过滤
List < JobResultInfo > jobResultInfos1 = client.listAllJobs(JobType.DDL);
//通过起始时间和JobType过滤,起始时间的格式为unix时间戳
List < JobResultInfo > jobResultInfos2 = client.listAllJobs(1502349803729L, 1502349821460L, JobType.DDL);
//通过分页过滤
List < JobResultInfo > jobResultInfos3 = client.listAllJobs(100, 1, JobType.DDL);
//分页,起始时间,Job类型
List < JobResultInfo > jobResultInfos4 = client.listAllJobs(100, 1, 1502349803729L, 1502349821460L, JobType.DDL);
//通过Tags过滤查询满足条件的所有作业列表
JobFilter jobFilter = new JobFilter();
jobFilter.setTags("workspace=space002,jobName=name002");
List < JobResultInfo > jobResultInfos1 = client.listAllJobs(jobFilter);
//通过Tags过滤查询满足条件的指定page的作业列表
JobFilter jobFilter = new JobFilter();
jobFilter.setTags("workspace=space002,jobName=name002");
jobFilter.setPageSize(100);
jobFilter.setCurrentPage(0);
List < JobResultInfo > jobResultInfos1 = client.listJobsByPage(jobFilter);
}
|
查询作业结果
DLI提供查询作业结果的接口。您可以使用该接口通过JobId查询该作业信息。示例代码如下:
|
private static void getJobResultInfo(DLIClient client) throws DLIException {
String jobId = "4c4f7168-5bc4-45bd-8c8a-43dfc85055d0";
JobResultInfo jobResultInfo = client.queryJobResultInfo(jobId);
//查询job信息
System.out.println(jobResultInfo.getJobId());
System.out.println(jobResultInfo.getDetail());
System.out.println(jobResultInfo.getJobStatus());
System.out.println(jobResultInfo.getJobType());
}
|
查询SQL类型作业
DLI提供查询SQL类型作业的接口。您可以使用该接口查询当前工程下,在编辑框中提交的最近执行的作业的信息(即可用SQL语句提交的Job)。示例代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
private static void getJobResultInfos(DLIClient client) throws DLIException {
//返回JobResultInfo List集合
List<JobResultInfo> jobResultInfos = client.listSQLJobs();
//遍历集合查询job信息
for (JobResultInfo jobResultInfo : jobResultInfos) {
//job id
System.out.println(jobResultInfo.getJobId());
//job 描述信息
System.out.println(jobResultInfo.getDetail());
//job 状态
System.out.println(jobResultInfo.getJobStatus());
//job 类型
System.out.println(jobResultInfo.getJobType());
}
//通过Tags过滤查询满足条件的所有SQL作业列表
JobFilter jobFilter = new JobFilter();
jobFilter.setTags("workspace=space002,jobName=name002");
List < JobResultInfo > jobResultInfos1 = client.listAllSQLJobs(jobFilter);
//通过Tags过滤查询满足条件的指定page的SQL作业列表
JobFilter jobFilter = new JobFilter();
jobFilter.setTags("workspace=space002,jobName=name002");
jobFilter.setPageSize(100);
jobFilter.setCurrentPage(0);
List < JobResultInfo > jobResultInfos1 = client.listSQLJobsByPage(jobFilter);
}
|
导出查询结果
DLI提供导出查询结果的接口。您可以使用该接口导出当前工程下,在编辑框中提交的查询作业的结果。示例代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
//实例化SQLJob对象,传入执行SQL所需的queue,数据库名,SQL语句
private static void exportSqlResult(Queue queue, Table obsTable) throws DLIException {
String sql = "select * from " + obsTable.getTableName();
String queryResultPath = "OBS Path";
SQLJob sqlJob = new SQLJob(queue, obsTable.getDb().getDatabaseName(), sql);
System.out.println("start submit SQL job...");
//调用SQLJob对象的submit接口提交查询作业
sqlJob.submit();
//调用SQLJob对象的getStatus接口查询作业状态
JobStatus status = sqlJob.getStatus();
System.out.println(status);
System.out.println("start export Result...");
//调用SQLJob对象的exportResult接口导出查询结果,其中exportPath为导出数据的路径,JSON为导出格式,queueName为执行导出作业的队列,limitNum为导出作业结果条数,0表示全部导出
sqlJob.exportResult(exportPath + "result", StorageType.JSON, CompressType.NONE,
ExportMode.ERRORIFEXISTS, queueName, true, 5);
}
|
预览作业结果
DLI提供预览作业结果的接口。您可以使用该接口获取结果集的前1000条记录。
//实例化SQLJob对象,传入执行SQL所需的queue,数据库名和SQL语句
private static void getPreviewJobResult(Queue queue, Table obsTable) throws DLIException {
String sql = "select * from " + obsTable.getTableName();
SQLJob sqlJob = new SQLJob(queue, obsTable.getDb().getDatabaseName(), sql);
System.out.println("start submit SQL job...");
//调用SQLJob对象的submit接口提交查询作业
sqlJob.submit();
//调用SQLJob对象的previewJobResult接口查询结果集的前1000条记录
List<Row> rows = sqlJob.previewJobResult();
if (rows.size() > 0) {
Integer value = rows.get(0).getInt(0);
System.out.println("获取第一行结果中的第一列数据值" + value);
}
System.out.println("Job id: " + sqlJob.getJobId() + ", previewJobResultSize : " + rows.size());
}
废弃的接口
getJobResult接口当前已废弃,如果需要getJobResult类似功能可以通过调用DownloadJob接口获取。
DownloadJob接口详情可以在“dli-sdk-java-x.x.x.zip”压缩包中获取。“dli-sdk-java-x.x.x.zip”压缩包可以参考SDK的获取与安装中的操作步骤获取。