使用SDK提交SQL作业
本节操作介绍使用DLI SDK V2提交SQL作业的操作方法。

DLI SDK V2当前处于公测阶段,如需使用请提交工单申请开通。
- 2024年5月起,首次使用DLI的用户可以直接使用DLI SDK V2,无需申请。
- 对于2024年5月之前开通并使用DLI服务的用户,如需使用“DLI SDK V2”功能,必须提交工单申请开通试用权限。
前提条件
- 已参考Java SDK概述配置Java SDK环境。
- 已参考初始化DLI客户端完成客户端DLIClient的初始化。
操作前准备
获取AK/SK,项目ID及对应的Region信息。
- 管理控制台。
- 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。
- 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。
- 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。
- 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。
样例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
private static final Logger logger = LoggerFactory.getLogger(SqlJobExample.class); private static final ThreadLocal<DateFormat> DATE_FORMAT = ThreadLocal.withInitial( () -> new SimpleDateFormat("yyyy-MM-dd")); private static final ThreadLocal<DateFormat> TIMESTAMP_FORMAT = ThreadLocal.withInitial( () -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSZZ")); public static void main(String[] args) { String yourAccessKey = System.getenv("HUAWEICLOUD_SDK_AK"); String yourSecretKey = System.getenv("HUAWEICLOUD_SDK_SK"); DLIInfo dliInfo = new DLIInfo("RegionName", yourAccessKey, yourSecretKey, "YouProjectId"); dliInfo.setQueueName("YourQueueName"); try { // 步骤一:创建数据库、表。 prepare(dliInfo); /* * 步骤二:导入数据到表中。 * 整体实现过程/原理可分为以下3步: * 1. 用OBS的API把数据上传到 “YourOBSPathToWriteTmpData”。可在OBS中配置生命周期策略,定期删除这些临时数据。 * 2. 向DLI提交执行LoadData语句,从而把数据导入到DLI。详见导入数据。 * 3. 每隔1秒循环检查作业运行状态,直到作业结束。 */ String yourOBSPathToWriteTmpData = String.format("obs://your_obs_bucket_name/your/path/%s", UUID.randomUUID()); loadData(dliInfo, yourOBSPathToWriteTmpData); // 步骤三:提交SQL语句,执行查询并读取结果。 String selectSql = "SELECT * FROM demo_db.demo_tbl"; String jobId = queryData(dliInfo, selectSql); // 步骤四': 如有需要,用户也可以通过作业ID来获取结果。 queryDataByJobId(dliInfo, jobId); // 分页查询所有作业,用户可以使用该接口查询当前工程下的所有SQL作业信息 // 关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#listSqlJobs(ListSqlJobsRequest)。 //详见ListSqlJobs listSqlJobs(dliInfo)。 /* * 其它场景: * 1. 如果用户想取消已经提交的SQL作业,可使用以下接口。 * 关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#cancelSqlJob(CancelSqlJobRequest),详见取消作业。 * 注:若作业已经执行结束或失败则无法取消。 * 2. 如果用户想对SQL语句做语法校验,可使用以下接口。 * 关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#checkSql(CheckSqlRequest),详见检查SQL语法。 * 注:本接口只能做语法校验,无法做语义校验。请使用Explain语句,提交到DLI执行,进行语义校验。 * 3. 如果用户想根据jobId获取某个已提交的SQL作业,并查看作业详情,可使用以下接口。 * 关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#showSqlJobDetail(ShowSqlJobDetailRequest),详见查询作业详细信息。 * 4. 获取作业执行进度信息,如果作业正在执行,可以获取到子作业的信息,如果作业刚开始或者已经结束,则无法获取到子作业信息 * 关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#showSqlJobProgress(ShowSqlJobProgressRequest),详见查询作业执行进度信息。 */ } catch (DLIException e) { // 请根据业务实际情况处理异常信息,此处仅作样例。 } } |
创建数据库和表
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
private static String prepare(DLIInfo dliInfo) throws DLIException { // 1. 创建数据库。 // “default”为内置数据库,不能创建名为“default”的数据库。 String createDbSql = "CREATE DATABASE IF NOT EXISTS demo_db"; new SQLJob(dliInfo, createDbSql).submit(); // 2. 创建表。注:根据实际情况调整表结构和表数据目录,以及OBS存储路径!!! String createTblSql = "CREATE TABLE IF NOT EXISTS `demo_tbl` (\n" + " `bool_c` BOOLEAN,\n" + " `byte_c` TINYINT,\n" + " `short_c` SMALLINT,\n" + " `int_c` INT,\n" + " `long_c` BIGINT,\n" + " `float_c` FLOAT,\n" + " `double_c` DOUBLE,\n" + " `dec_c` DECIMAL(10,2),\n" + " `str_c` STRING,\n" + " `date_c` DATE,\n" + " `ts_c` TIMESTAMP,\n" + " `binary_c` BINARY,\n" + " `arr_c` ARRAY<INT>,\n" + " `map_c` MAP<STRING, INT>,\n" + " `struct_c` STRUCT<`s_str_c`: STRING, `s_bool_c`: BOOLEAN>)\n" + "USING parquet OPTIONS(path 'obs://demo_bucket/demo_db/demo_tbl')"; new SQLJob(dliInfo, "demo_db", createTblSql).submit(); } |
导入数据
- DLI SDK导入数据实现过程可分为以下3步:
- 使用OBS的API把数据上传到临时的OBS目录下,即 “YourOBSPathToWriteTmpData”。
- 向DLI提交执行LoadData语句,把数据从临时的OBS目录导入到DLI。导入数据
- 循环检查作业运行状态,间隔1秒,直至作业结束。
- 导入数据说明:
- 在提交导入数据的作业前,可选择配置导入数据的分区、配置是否是覆盖写入数据(默认追加写入数据)。
- 如需将数据插入某些具体的分区, 则可以使用以下构造器:
new SqlJobBase.TableInfo("demo_db", "demo_tbl", new LinkedHashMap<String,String>(){{put("YourPartitionColumnName","value");}});
- 导入作业默认是追加写入,如需覆盖写入数据,则可以使用以下构造器:
new SqlJobBase.TableInfo("demo_db", "demo_tbl", true);
- 如需导入覆盖写分区数据,则可以使用以下构造器:
new SqlJobBase.TableInfo("demo_db", "demo_tbl", new LinkedHashMap<String,String>(){{put("YourPartitionColumnName","value");}}, true);
- 如需将数据插入某些具体的分区, 则可以使用以下构造器:
- 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。
- 在提交导入数据的作业前,可选择配置导入数据的分区、配置是否是覆盖写入数据(默认追加写入数据)。
- 相关链接:
- 关键SDK API:
- com.huawei.dli.sdk.write.Writer、
- com.huawei.dli.sdk.Job#asyncSubmit()
- com.huaweicloud.sdk.dli.v1.DliClient#showSqlJobStatus
- 示例代码:
- (推荐)方案1:使用LoadData语句执行数据导入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
private static void loadData(DLIInfo dliInfo, String uploadDataPath) throws DLIException { UploadJob uploadJob = new UploadJob( dliInfo, uploadDataPath, new SqlJobBase.TableInfo("demo_db", "demo_tbl")); // 1. 写入数据到OBS临时目录。请根据业务实际情况做调整,此处仅作样例。 // 注:此步骤为直接调用OBS写数据相关API,DLI仅提供了一个默认写Json的实现,即文件在OBS上的保存格式为Json。 // 此处Writer可依据业务需求自定义实现,比如,用户可使用自定义的 csv writer,则文件在OBS上的保存格式为csv。 writeTmpData(uploadJob.createWriter(), genSchema(), 123, 50); // 2. 将数据导入到DLI。 // 提交LoadData语句执行。 // 注:此处的data_type需要根据第一步中的Writer实现来确定,DLI默认提供的为JSON。如用户使用自定义的Writer,则需要修改成响应的 data_type String loadSql = "LOAD DATA INPATH '" + uploadDataPath + "' INTO TABLE demo_db.demo_tbl OPTIONS(data_type 'json')"; SQLJob sqlJob = new SQLJob(dliInfo, loadSql); sqlJob.asyncSubmit(); // 3. 循环检查作业运行状态。 checkRunning(V3ClientUtils.getDliClient(dliInfo), sqlJob.getJobId()); }
- 方案2:使用DLI封装SDK提交数据导入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
private static void loadData(DLIInfo dliInfo, String uploadDataPath) throws DLIException { UploadJob uploadJob = new UploadJob( dliInfo, uploadDataPath, new SqlJobBase.TableInfo("demo_db", "demo_tbl")); // 1. 写入数据到OBS临时目录。请根据业务实际情况做调整,此处仅作样例。 // 注:此步骤为直接调用OBS写数据相关API,DLI仅提供了一个默认写Json的实现,即文件在OBS上的保存格式为Json。 // 此处Writer可依据业务需求自定义实现,比如,用户可使用自定义的 csv writer,则文件在OBS上的保存格式为csv。 writeTmpData(uploadJob.createWriter(), genSchema(), 123, 50); // 2. 将数据导入到DLI。 // 使用DLI封装实现提交。注:因导入作业可能运行时间较长,因此使用asyncSubmit提交数据,并主动检查状态。 uploadJob.asyncSubmit(); // 3. 循环检查作业运行状态。 checkRunning(V3ClientUtils.getDliClient(dliInfo), uploadJob.getJobId()); }
- (推荐)方案1:使用LoadData语句执行数据导入
查询作业结果
- 相关链接:
- 关键SDK API:
com.huawei.dli.sdk.read.ResultSet,纯OBS读数据相关API调用,DLI提供了一个默认 OBS csv reader实现,可依据业务需求自定义实现。具体操作参考OBS API参考。
com.huawei.dli.sdk.SQLJob#submitQuery(),需要开启结果写作业桶特性,否则默认只预览前1000条数据。
您可以通过查询作业状态API响应体中的 result_path 来判断是否已开启作业结果写作业桶特性。
待作业运行结束后,如果result_path 以 obs:// 开头,则已开启作业结果写作业桶特性,否则未开启。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
private static String queryData(DLIInfo dliInfo, String selectSql) throws DLIException { SQLJob sqlJob = new SQLJob(dliInfo, selectSql); // 如有需要,您可在此处设置作业参数,比如:sqlJob.setConf() // 1. 提交查询作业到DLI,使用DLI封装实现提交并等待结果。 // 注1:此处需要根据SQL执行时长预取设置超时时间,默认5min。 // 注2:此处需要开启作业结果写作业桶特性,否则默认只预览前1000条数据。 sqlJob.setJobTimeout(30 * 60); ResultSet resultSet1 = null; try { resultSet1 = sqlJob.submitQuery(); handleResult(resultSet1); } finally { if (resultSet1 != null) { resultSet1.close(); } } return sqlJob.getJobId(); }
查询指定作业的结果
- 使用说明
com.huawei.dli.sdk.SQLJob#getResultSet(),OBS读数据相关API调用,DLI提供了一个OBS csv reader实现,可依据业务需求自定义实现。
使用本方法的前提是需要开启结果写作业桶特性。可通过查询作业状态API响应体中的 result_path 来判断是否已开启作业结果写作业桶特性。待作业运行结束后,如果result_path 以 obs:// 开头,则已开启作业结果写作业桶特性,否则未开启。
- 相关链接
- 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
private static void queryDataByJobId(DLIInfo dliInfo, String jobId) throws DLIException { // 检查该jobId对应的作业是否已运行结束,如未运行结束,则等待运行结束 SQLJob sqlJob = new SQLJob(dliInfo, null); sqlJob.setJobId(jobId); checkRunning(V3ClientUtils.getDliClient(dliInfo), jobId); // 根据jobId,获取结果数据的schema、结果数据在用户作业桶中的存储路径 ShowSqlJobStatusResponse resp = V3ClientUtils.getDliClient(dliInfo) .showSqlJobStatus(new ShowSqlJobStatusRequest().withJobId(jobId)); sqlJob.setJobStatus(resp.getStatus().getValue()); sqlJob.setResultSchema(SchemaUtils.getSchemaFromJson(resp.getDetail())); sqlJob.setResultPath(resp.getResultPath()); sqlJob.setResultCount(resp.getResultCount() != null ? resp.getResultCount() : 0); ResultSet resultSet = null; try { // 获取该jobId对应的查询结果, 并返回结果迭代器。 resultSet = sqlJob.getResultSet(); handleResult(resultSet); } finally { if (resultSet != null) { resultSet.close(); } } }
查询作业
- 使用说明
com.huaweicloud.sdk.dli.v1.DliClient#listSqlJobs(ListSqlJobsRequest) 查询所有作业
如果作业较多,必须使用下述分页查询的方式分批查询,否则只能返回第一页的内容,无法返回全部作业。
可通过设置 req.setStart() 和 req.setEnd() 查询指定时间段内的作业,单位毫秒。
- 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
private static void listSqlJobs(DLIInfo dliInfo) { DliClient dliClient = V3ClientUtils.getDliClient(dliInfo); ListSqlJobsRequest req = new ListSqlJobsRequest(); int currentPage = 1; req.setCurrentPage(currentPage); // 默认值 1 req.setPageSize(100); // 默认值 10 Integer jobCount = dliClient.listSqlJobs(req).getJobCount(); Integer cur = 0; // 分页查询 while (cur < jobCount) { ListSqlJobsResponse listSqlJobsResponse = dliClient.listSqlJobs(req); List<SqlJob> jobs = listSqlJobsResponse.getJobs(); for (SqlJob job : jobs) { // 在此处添加业务逻辑,对每个作业进行处理 cur++; if (cur.equals(jobCount)) { break; } } req.setCurrentPage(currentPage++); } }
写入数据到OBS writeTmpData
- 使用说明
您可以根据您自己的业务需求实现Writer接口并自定义相关写文件逻辑。
本例写入数据到OBS,调用OBS SDK能力,与DLI无关,当前UploadJob提供了一个默认写JSON的实现。
- 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
private static void writeTmpData(Writer writer, List<Column> schema, Integer totalRecords, Integer flushThreshold) throws DLIException { // 定义写数据到OBS的Writer,可以根据并发需要定义多个 // 根据目标表定义写入数据的列信息,详见本文的genSchema()方法和genSchema2()方法 // 根据实际业务定义循环大小 for (int i = 0; i < totalRecords; i++) { // 根据业务获取每一行的数据 List<Object> record = genRecord(); // 写入数据 Row row = new Row(schema); row.setRecord(record); writer.write(row); if (i % flushThreshold == 0) { // 写入一定数据量后及时刷新 writer.flush(); } } writer.close(); }
构建表的Schema
- 使用说明
请根据业务实际情况构造相应的Schema,此处仅作样例。
- 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
private static List<Column> genSchema() { return Arrays.asList( new Column("bool_c", new PrimitiveType(DataType.TypeName.BOOLEAN), "boolean col"), new Column("byte_c", new PrimitiveType(DataType.TypeName.TINYINT), "tinyint col"), new Column("short_c", new PrimitiveType(DataType.TypeName.SMALLINT), "smallint col"), new Column("int_c", new PrimitiveType(DataType.TypeName.INT), "int col"), new Column("long_c", new PrimitiveType(DataType.TypeName.BIGINT), "bigint col"), new Column("float_c", new PrimitiveType(DataType.TypeName.FLOAT), "float col"), new Column("double_c", new PrimitiveType(DataType.TypeName.DOUBLE), "double col"), new Column( "dec_c", new PrimitiveType(DataType.TypeName.DECIMAL, Arrays.asList("10", "2")), "decimal col"), new Column("str_c", new PrimitiveType(DataType.TypeName.STRING), "string col"), new Column("date_c", new PrimitiveType(DataType.TypeName.DATE), "date col"), new Column("ts_c", new PrimitiveType(DataType.TypeName.TIMESTAMP), "timestamp col"), new Column("binary_c", new PrimitiveType(DataType.TypeName.BINARY), "binary col"), new Column( "arr_c", new ArrayType(new PrimitiveType(DataType.TypeName.INT)), "array col"), new Column( "map_c", new MapType( new PrimitiveType(DataType.TypeName.STRING), new PrimitiveType(DataType.TypeName.INT)), "map col"), new Column( "struct_c", new StructType(Arrays.asList( new Column("s_str_c", new PrimitiveType(DataType.TypeName.STRING), "struct string col"), new Column("s_bool_c", new PrimitiveType(DataType.TypeName.BOOLEAN), "struct boolean col"))), "struct col")); }
自动获取目标表的Schema
- 使用说明
自动获取目标表的schema。
- 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13
private static List<Column> genSchema2(DLIInfo dliInfo, String yourDbName, String yourTableName) throws DLIException { String tempSql = String.format("select * from %s.%s limit 1", yourDbName, yourTableName); SQLJob sqlJob = new SQLJob(dliInfo, tempSql); sqlJob.submit(); ShowSqlJobStatusResponse resp = V3ClientUtils.getDliClient(dliInfo) .showSqlJobStatus(new ShowSqlJobStatusRequest().withJobId(sqlJob.getJobId())); if (!resp.getIsSuccess()) { throw new DLIException("Get sql job status failed, details: " + resp.getMessage()); } return SchemaUtils.getSchemaFromJson(resp.getDetail()); }
按需生成测试数据 List<Object> genRecord
- 使用说明
请根据业务实际情况构造相应的每一行数据,此处仅作样例。
- 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
private static List<Object> genRecord() { Map<String, Object> structData = new HashMap<>(); structData.put("s_str_c", "Abc"); structData.put("s_bool_c", true); return Arrays.asList( true, (byte) 1, (short) 123, 65535, 123456789012L, 101.235f, 256.012358, new BigDecimal("33.05"), "abc_123&", new Date(1683475200000L), new Timestamp(1683543480000L), "Hello".getBytes(StandardCharsets.UTF_8), Arrays.asList(1, 2, 3), Collections.singletonMap("k", 123), structData); }
查询作业状态
- 相关链接
- 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
private static void checkRunning(DliClient dliClient, String jobId) throws DLIException { while (true) { ShowSqlJobStatusResponse resp; try { resp = dliClient.showSqlJobStatus(new ShowSqlJobStatusRequest().withJobId(jobId)); } catch (Exception e) { throw new DLIException("Failed to get job status by id: " + jobId, e); } String status = resp.getStatus().getValue(); logger.info(String.format("SparkSQL Job id %s status: %s", jobId, status)); if ("FINISHED".equals(status)) { return; } if ("FAILED".equals(status) || "CANCELLED".equals(status)) { throw new DLIException("Run job failed or cancelled, details: " + resp.getMessage()); } try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new DLIException("Check job running interrupted."); } } }
处理作业结果
- 使用说明
请根据业务实际情况构造相应的每一行数据,此处仅作样例。
- 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
private static void handleResult(ResultSet resultSet) throws DLIException { while (resultSet.hasNext()) { Row row = resultSet.read(); List<Column> schema = row.getSchema(); List<Object> record = row.getRecord(); // 对每一行每一列进行处理 // 方法一:调用record.get(index)获取Object,然后根据各列的类型做类型转换 // 方法二:根据各列的类型调用row.getXXX(index)获取对应类型的数据 for (int i = 0; i < schema.size(); i++) { DataType.TypeName typeName = DataType.TypeName.fromName(schema.get(i).getType().getName()); switch (typeName) { case STRING: String strV = (String) record.get(i); System.out.println( "\t" + (strV == null ? null : strV.replaceAll("\r", "\\\\r").replaceAll("\n", "\\\\n"))); break; case DATE: Date dtV = (Date) record.get(i); System.out.println("\t" + (dtV == null ? null : DATE_FORMAT.get().format(dtV))); break; case TIMESTAMP: Timestamp tsV = (Timestamp) record.get(i); System.out.println("\t" + (tsV == null ? null : TIMESTAMP_FORMAT.get().format(tsV))); break; case BINARY: byte[] bytes = (byte[]) record.get(i); System.out.println("\t" + (bytes == null ? null : Base64.getEncoder().encodeToString(bytes))); break; case ARRAY: case MAP: case STRUCT: Object data = record.get(i); System.out.println("\t" + (data == null ? null : JsonUtils.toJSON(data))); break; default: System.out.println("\t" + record.get(i)); } } System.out.println(); } }