更新时间:2024-03-06 GMT+08:00

作业相关

导入数据

DLI提供导入数据的接口。您可以使用该接口将存储在OBS中的数据导入到已创建的DLI表或者OBS表中。示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
//实例化importJob对象,构造函数的入参包括队列、数据库名、表名(通过实例化Table对象获取)和数据路径
private static void importData(Queue queue, Table DLITable) throws DLIException {
    String dataPath = "OBS Path";
    queue = client.getQueue("queueName");
    CsvFormatInfo formatInfo = new CsvFormatInfo();
    formatInfo.setWithColumnHeader(true);
    formatInfo.setDelimiter(",");
    formatInfo.setQuoteChar("\"");
    formatInfo.setEscapeChar("\\");
    formatInfo.setDateFormat("yyyy/MM/dd");
    formatInfo.setTimestampFormat("yyyy-MM-dd HH:mm:ss");
    String dbName = DLITable.getDb().getDatabaseName();
    String tableName = DLITable.getTableName();
    ImportJob importJob = new ImportJob(queue, dbName, tableName, dataPath);
    importJob.setStorageType(StorageType.CSV);
    importJob.setCsvFormatInfo(formatInfo);
    System.out.println("start submit import table: " + DLITable.getTableName());
    //调用ImportJob对象的submit接口提交导入作业
    importJob.submit(); //调用ImportJob对象的getStatus接口查询导入作业状态
    JobStatus status = importJob.getStatus();
    System.out.println("Job id: " + importJob.getJobId() + ", Status : " + status.getName());
}
  • 在提交导入作业前,可选择设置导入数据的格式,如样例所示,调用ImportJob对象的setStorageType接口设置数据存储类型为csv,数据的具体格式通过调用ImportJob对象的setCsvFormatInfo接口进行设置。
  • 在提交导入作业前,可选择设置导入数据的分区并配置是否是overwrite写入,分区信息可以调用ImportJob对象的setPartitionSpec接口设置,如:importJob.setPartitionSpec(new PartitionSpec("part1=value1,part2=value2")),也可以在创建ImportJob对象的时候直接通过参数的形式创建 。导入作业默认是追加写,如果需要覆盖写,则可以调用ImportJob对象的setOverWrite接口设置,如:importJob.setOverWrite(Boolean.TRUE)。
  • 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。

导入分区数据

DLI提供导入数据的接口。您可以使用该接口将存储在OBS中的数据导入到已创建的DLI表或者OBS表指定分区中。示例代码如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//实例化importJob对象,构造函数的入参包括队列、数据库名、表名(通过实例化Table对象获取)和数据路径
private static void importData(Queue queue, Table DLITable) throws DLIException {
    String dataPath = "OBS Path";
    queue = client.getQueue("queueName");
    CsvFormatInfo formatInfo = new CsvFormatInfo();
    formatInfo.setWithColumnHeader(true);
    formatInfo.setDelimiter(",");
    formatInfo.setQuoteChar("\"");
    formatInfo.setEscapeChar("\\");
    formatInfo.setDateFormat("yyyy/MM/dd");
    formatInfo.setTimestampFormat("yyyy-MM-dd HH:mm:ss");
    String dbName = DLITable.getDb().getDatabaseName();
    String tableName = DLITable.getTableName();
    PartitionSpec partitionSpec = new PartitionSpec("part1=value1,part2=value2");
    Boolean isOverWrite = true;
    ImportJob importJob = new ImportJob(queue, dbName, tableName, dataPath, partitionSpec, isOverWrite);
    importJob.setStorageType(StorageType.CSV);
    importJob.setCsvFormatInfo(formatInfo);
    System.out.println("start submit import table: " + DLITable.getTableName());
    //调用ImportJob对象的submit接口提交导入作业
    importJob.submit(); //调用ImportJob对象的getStatus接口查询导入作业状态
    JobStatus status = importJob.getStatus();
    System.out.println("Job id: " + importJob.getJobId() + ", Status : " + status.getName());
}
  • 在创建ImportJob对象的时候分区信息PartitionSpec也可以直接传入分区字符串。
  • partitionSpec如果导入时指定部分列为分区列,而导入的数据只包含了指定的分区信息,则数据导入后的未指定的分区列字段会存在null值等异常值。
  • 示例中isOverWrite表示是否是覆盖写,为true表示覆盖写,为false表示追加写。目前不支持overwrite覆盖写整表,只支持overwrite写指定分区。如果需要追加写指定分区,则在创建ImportJob的时候指定isOverWrite为false。

导出数据

DLI提供导出数据的接口。您可以使用该接口将DLI表中的数据导出到OBS中。示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
//实例化ExportJob对象,传入导出数据所需的队列、数据库名、表名(通过实例化Table对象获取)和导出数据的存储路径,仅支持Table类型为MANAGED
private static void exportData(Queue queue, Table DLITable) throws DLIException {
    String dataPath = "OBS Path";
    queue = client.getQueue("queueName");
    String dbName = DLITable.getDb().getDatabaseName();
    String tableName = DLITable.getTableName();
    ExportJob exportJob = new ExportJob(queue, dbName, tableName, dataPath);
    exportJob.setStorageType(StorageType.CSV);
    exportJob.setCompressType(CompressType.GZIP);
    exportJob.setExportMode(ExportMode.ERRORIFEXISTS);
    System.out.println("start export DLI Table data...");
    //调用ExportJob对象的submit接口提交导出作业
    exportJob.submit();
    //调用ExportJob对象的getStatus接口查询导出作业状态
    JobStatus status = exportJob.getStatus();
    System.out.println("Job id: " + exportJob.getJobId() + ", Status : " + status.getName());
}
  • 在提交导出作业前,可选设置数据格式,压缩类型,导出模式等,如样例所示,分别调用ExportJob对象的setStorageType、setCompressType、setExportMode接口设置,其中setStorageType仅支持csv格式。
  • 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。

提交作业

DLI提供提交作业和查询作业的接口。您可以通过提交接口提交作业,如果需要查询结果可以调用查询接口查询该作业的结果。示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
//实例化SQLJob对象,传入执行SQL所需的queue,数据库名,SQL语句  
private static void runSqlJob(Queue queue, Table obsTable) throws DLIException {
    String sql = "select * from " + obsTable.getTableName();
    String queryResultPath = "OBS Path";
    SQLJob sqlJob = new SQLJob(queue, obsTable.getDb().getDatabaseName(), sql);
    System.out.println("start submit SQL job...");
    //调用SQLJob对象的submit接口提交查询作业   
    sqlJob.submit();
    //调用SQLJob对象的getStatus接口查询作业状态    
    JobStatus status = sqlJob.getStatus();
    System.out.println(status);
    System.out.println("start export Result...");
    //调用SQLJob对象的exportResult接口导出查询结果,其中queryResultPath为导出数据的路径   
    sqlJob.exportResult(queryResultPath, StorageType.CSV,
            CompressType.GZIP, ExportMode.ERRORIFEXISTS, null);
    System.out.println("Job id: " + sqlJob.getJobId() + ", Status : " + status.getName());
}

取消作业

DLI提供取消作业的接口。您可以使用该接口取消所有Launching或Running状态的Job,以取消Launching状态的Job为例,示例代码如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
private static void cancelSqlJob(DLIClient client) throws DLIException {

  List<JobResultInfo> jobResultInfos = client.listAllJobs(JobType.QUERY);
  for (JobResultInfo jobResultInfo : jobResultInfos) {
      //如果Job为“LAUNCHING”状态,则取消
      if (JobStatus.LAUNCHING.equals(jobResultInfo.getJobStatus())) {
          //通过JobId参数取消Job
          client.cancelJob(jobResultInfo.getJobId());
      }
  }
}

查询所有作业

DLI提供查询作业的接口。您可以使用该接口查询当前工程下的所有作业信息。示例代码如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private static void listAllSqlJobs(DLIClient client) throws DLIException {
    //返回JobResultInfo List集合
    List < JobResultInfo > jobResultInfos = client.listAllJobs();
    //遍历List集合查看Job信息
    for (JobResultInfo jobResultInfo: jobResultInfos) {
        //job id
        System.out.println(jobResultInfo.getJobId());
        //job 描述信息
        System.out.println(jobResultInfo.getDetail());
        //job 状态
        System.out.println(jobResultInfo.getJobStatus());
        //job 类型
        System.out.println(jobResultInfo.getJobType());
    }
    //通过JobType过滤
    List < JobResultInfo > jobResultInfos1 = client.listAllJobs(JobType.DDL);
    //通过起始时间和JobType过滤,起始时间的格式为unix时间戳
    List < JobResultInfo > jobResultInfos2 = client.listAllJobs(1502349803729L, 1502349821460L, JobType.DDL);
    //通过分页过滤
    List < JobResultInfo > jobResultInfos3 = client.listAllJobs(100, 1, JobType.DDL);
    //分页,起始时间,Job类型
    List < JobResultInfo > jobResultInfos4 = client.listAllJobs(100, 1, 1502349803729L, 1502349821460L, JobType.DDL);

    //通过Tags过滤查询满足条件的所有作业列表
    JobFilter jobFilter = new JobFilter();
    jobFilter.setTags("workspace=space002,jobName=name002");
    List < JobResultInfo > jobResultInfos1 = client.listAllJobs(jobFilter);
    //通过Tags过滤查询满足条件的指定page的作业列表
    JobFilter jobFilter = new JobFilter();
    jobFilter.setTags("workspace=space002,jobName=name002");
    jobFilter.setPageSize(100);
    jobFilter.setCurrentPage(0);
    List < JobResultInfo > jobResultInfos1 = client.listJobsByPage(jobFilter);
}
  • 重载方法的参数,可以设置为“null”,表示不设置过滤条件。同时也要注意参数的合法性,例如分页参数设置为“-1”,会导致查询失败。
  • 该SDK接口不支持sql_pattern,即通过指定sql片段作为作业过滤条件进行查询。

    如果需要则可以通过查询所有作业API接口指定该参数进行查询。

查询作业结果

DLI提供查询作业结果的接口。您可以使用该接口通过JobId查询该作业信息。示例代码如下:
1
2
3
4
5
6
7
8
9
private static void getJobResultInfo(DLIClient client) throws DLIException {
    String jobId = "4c4f7168-5bc4-45bd-8c8a-43dfc85055d0";
    JobResultInfo jobResultInfo = client.queryJobResultInfo(jobId);
    //查询job信息
    System.out.println(jobResultInfo.getJobId());
    System.out.println(jobResultInfo.getDetail());
    System.out.println(jobResultInfo.getJobStatus());
    System.out.println(jobResultInfo.getJobType());
}

查询SQL类型作业

DLI提供查询SQL类型作业的接口。您可以使用该接口查询当前工程下,在编辑框中提交的最近执行的作业的信息(即可用SQL语句提交的Job)。示例代码如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static void getJobResultInfos(DLIClient client) throws DLIException {

 //返回JobResultInfo List集合
  List<JobResultInfo> jobResultInfos = client.listSQLJobs();
  //遍历集合查询job信息
  for (JobResultInfo jobResultInfo : jobResultInfos) {
        //job id
        System.out.println(jobResultInfo.getJobId());
        //job 描述信息
        System.out.println(jobResultInfo.getDetail());
        //job 状态
        System.out.println(jobResultInfo.getJobStatus());
        //job 类型
        System.out.println(jobResultInfo.getJobType());
   }

    //通过Tags过滤查询满足条件的所有SQL作业列表
    JobFilter jobFilter = new JobFilter();
    jobFilter.setTags("workspace=space002,jobName=name002");
    List < JobResultInfo > jobResultInfos1 = client.listAllSQLJobs(jobFilter);
    //通过Tags过滤查询满足条件的指定page的SQL作业列表
    JobFilter jobFilter = new JobFilter();
    jobFilter.setTags("workspace=space002,jobName=name002");
    jobFilter.setPageSize(100);
    jobFilter.setCurrentPage(0);
    List < JobResultInfo > jobResultInfos1 = client.listSQLJobsByPage(jobFilter);
}

导出查询结果

DLI提供导出查询结果的接口。您可以使用该接口导出当前工程下,在编辑框中提交的查询作业的结果。示例代码如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
 //实例化SQLJob对象,传入执行SQL所需的queue,数据库名,SQL语句
  private static void exportSqlResult(Queue queue, Table obsTable) throws DLIException {
    String sql = "select * from " + obsTable.getTableName();
    String queryResultPath = "OBS Path";
    SQLJob sqlJob = new SQLJob(queue, obsTable.getDb().getDatabaseName(), sql);
    System.out.println("start submit SQL job...");
    //调用SQLJob对象的submit接口提交查询作业
     sqlJob.submit();
     //调用SQLJob对象的getStatus接口查询作业状态
     JobStatus status = sqlJob.getStatus();
     System.out.println(status);
     System.out.println("start export Result...");
     //调用SQLJob对象的exportResult接口导出查询结果,其中exportPath为导出数据的路径,JSON为导出格式,queueName为执行导出作业的队列,limitNum为导出作业结果条数,0表示全部导出
    sqlJob.exportResult(exportPath + "result", StorageType.JSON, CompressType.NONE,
        ExportMode.ERRORIFEXISTS, queueName, true, 5);
  }

预览作业结果

DLI提供预览作业结果的接口。您可以使用该接口获取结果集的前1000条记录。
//实例化SQLJob对象,传入执行SQL所需的queue,数据库名和SQL语句
private static void getPreviewJobResult(Queue queue, Table obsTable) throws DLIException {
    String sql = "select * from " + obsTable.getTableName();
    SQLJob sqlJob = new SQLJob(queue, obsTable.getDb().getDatabaseName(), sql);
    System.out.println("start submit SQL job...");
    //调用SQLJob对象的submit接口提交查询作业
    sqlJob.submit();
    //调用SQLJob对象的previewJobResult接口查询结果集的前1000条记录
    List<Row> rows = sqlJob.previewJobResult();
    if (rows.size() > 0) {
        Integer value = rows.get(0).getInt(0);
        System.out.println("获取第一行结果中的第一列数据值" + value);
    }
    System.out.println("Job id: " + sqlJob.getJobId() + ", previewJobResultSize : " + rows.size());
}

废弃的接口

getJobResult接口当前已废弃,如果需要getJobResult类似功能可以通过调用DownloadJob接口获取。

DownloadJob接口详情可以在“dli-sdk-java-x.x.x.zip”压缩包中获取。“dli-sdk-java-x.x.x.zip”压缩包可以参考SDK的获取与安装中的操作步骤获取。