Submitting a SQL Job Using an SDK
This section describes how to submit a SQL job using DLI SDK V2.

Starting May 2024, new users can use DLI SDK V2 directly without having their accounts whitelisted.
Users who started using DLI before May 2024 must submit a service ticket to have their accounts whitelisted before they can use this function.
Prerequisites
- You have configured the Java SDK environment by referring to Overview.
- You have initialized the DLI client by referring to Initializing the DLI Client.
Preparations
Obtain an AK/SK, project ID, and region information.
- Log in to the management console.
- In the upper right corner, hover over the username and choose My Credentials from the drop-down list.
- In the navigation pane on the left, choose Access Keys. On the displayed page, click Create Access Key. Confirm that you want to proceed with the operation and click OK.
- On the displayed page, click Download. Open the file to obtain the AK/SK information.
- In the navigation pane on the left, choose API Credentials. In the Projects pane, locate project_id and obtain the region information.
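Once you have obtained the AK/SK, project ID, and region, pass them to the SDK when initializing the client. The snippet below is a minimal sketch that reads the AK/SK from environment variables, matching the construction used in the example code; the region name, project ID, and queue name are placeholders you must replace with your own values.
// A minimal sketch: read the AK/SK from environment variables instead of hard-coding them.
String yourAccessKey = System.getenv("HUAWEICLOUD_SDK_AK");
String yourSecretKey = System.getenv("HUAWEICLOUD_SDK_SK");
// Placeholders: replace the region name, project ID, and queue name with your own values.
DLIInfo dliInfo = new DLIInfo("YourRegionName", yourAccessKey, yourSecretKey, "YourProjectId");
dliInfo.setQueueName("YourQueueName");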
Example Code
private static final Logger logger = LoggerFactory.getLogger(SqlJobExample.class);

private static final ThreadLocal<DateFormat> DATE_FORMAT = ThreadLocal.withInitial(
    () -> new SimpleDateFormat("yyyy-MM-dd"));

private static final ThreadLocal<DateFormat> TIMESTAMP_FORMAT = ThreadLocal.withInitial(
    () -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSZZ"));

public static void main(String[] args) {
    String yourAccessKey = System.getenv("HUAWEICLOUD_SDK_AK");
    String yourSecretKey = System.getenv("HUAWEICLOUD_SDK_SK");
    DLIInfo dliInfo = new DLIInfo("RegionName", yourAccessKey, yourSecretKey, "YourProjectId");
    dliInfo.setQueueName("YourQueueName");
    try {
        // Step 1: Create a database and a table.
        prepare(dliInfo);

        /*
         * Step 2: Import data to the table.
         * The overall implementation can be divided into the following three steps:
         * 1. Use the OBS API to upload data to YourOBSPathToWriteTmpData. You can configure a lifecycle policy in OBS to periodically delete this temporary data.
         * 2. Submit the LoadData statement to DLI to import the data into DLI. For details, see Importing Data.
         * 3. Cyclically check the job status every second until the job is complete.
         */
        String yourOBSPathToWriteTmpData =
            String.format("obs://your_obs_bucket_name/your/path/%s", UUID.randomUUID());
        loadData(dliInfo, yourOBSPathToWriteTmpData);

        // Step 3: Submit the SQL statement, execute the query, and read the result.
        String selectSql = "SELECT * FROM demo_db.demo_tbl";
        String jobId = queryData(dliInfo, selectSql);

        // Step 4: If needed, you can also obtain the results by job ID.
        queryDataByJobId(dliInfo, jobId);

        // Query all jobs by page. You can use this API to query information about all SQL jobs within the current project.
        // Key SDK API: com.huaweicloud.sdk.dli.v1.DliClient#listSqlJobs(ListSqlJobsRequest).
        listSqlJobs(dliInfo);

        /*
         * Other scenarios:
         * 1. To cancel a submitted SQL job, use the following API.
         *    Key SDK API: com.huaweicloud.sdk.dli.v1.DliClient#cancelSqlJob(CancelSqlJobRequest).
         *    Note: If a job has been completed or has failed, it cannot be canceled.
         * 2. To verify the syntax of an SQL statement, use the following API.
         *    Key SDK API: com.huaweicloud.sdk.dli.v1.DliClient#checkSql(CheckSqlRequest).
         *    Note: This API verifies only the syntax, not the semantics. To perform semantic verification, submit an Explain statement to DLI for execution.
         * 3. To obtain a submitted SQL job based on the job ID and view job details, use the following API.
         *    Key SDK API: com.huaweicloud.sdk.dli.v1.DliClient#showSqlJobDetail(ShowSqlJobDetailRequest).
         * 4. To obtain the job execution progress, use the following API. If the job is being executed, you can obtain the sub-job information. If the job has just started or has already completed, the sub-job information is not available.
         *    Key SDK API: com.huaweicloud.sdk.dli.v1.DliClient#showSqlJobProgress(ShowSqlJobProgressRequest).
         */
    } catch (DLIException e) {
        // Handle the exception based on service requirements. The following is just an example.
        logger.error("Failed to run the SQL job example.", e);
    }
}
Creating a Database and Table
- Creating a Database
- Creating a Table
- Key SDK API: com.huawei.dli.sdk.Job#submit()
Sample code:
private static void prepare(DLIInfo dliInfo) throws DLIException {
    // 1. Create a database.
    // default is the database built into DLI. You cannot create a database named default.
    String createDbSql = "CREATE DATABASE IF NOT EXISTS demo_db";
    new SQLJob(dliInfo, createDbSql).submit();

    // 2. Create a table. Note: Adjust the table structure, table data directory, and OBS storage path based on site requirements.
    String createTblSql = "CREATE TABLE IF NOT EXISTS `demo_tbl` (\n"
        + " `bool_c` BOOLEAN,\n"
        + " `byte_c` TINYINT,\n"
        + " `short_c` SMALLINT,\n"
        + " `int_c` INT,\n"
        + " `long_c` BIGINT,\n"
        + " `float_c` FLOAT,\n"
        + " `double_c` DOUBLE,\n"
        + " `dec_c` DECIMAL(10,2),\n"
        + " `str_c` STRING,\n"
        + " `date_c` DATE,\n"
        + " `ts_c` TIMESTAMP,\n"
        + " `binary_c` BINARY,\n"
        + " `arr_c` ARRAY<INT>,\n"
        + " `map_c` MAP<STRING, INT>,\n"
        + " `struct_c` STRUCT<`s_str_c`: STRING, `s_bool_c`: BOOLEAN>)\n"
        + "USING parquet OPTIONS(path 'obs://demo_bucket/demo_db/demo_tbl')";
    new SQLJob(dliInfo, "demo_db", createTblSql).submit();
}
Importing Data
- To import data using the DLI SDK, follow these three steps:
- Use the OBS API to upload data to a temporary OBS directory, that is, YourOBSPathToWriteTmpData.
- Submit the LoadData statement to DLI to import data from the temporary OBS directory into DLI. For details, see Importing Data.
- Cyclically check the job status every second until the job is complete.
- Data import description:
- Before submitting a data import job, you can configure the partition to import data into and specify whether to overwrite existing data (data is appended by default). A combined example is shown after this list.
- To insert data into a specific partition, use the following constructor:
new SqlJobBase.TableInfo("demo_db", "demo_tbl", new LinkedHashMap<String,String>(){{put("YourPartitionColumnName","value");}});
- By default, an import job appends data to the existing data. To overwrite the existing data with the written data, use the following constructor:
new SqlJobBase.TableInfo("demo_db", "demo_tbl", true);
- To overwrite the partition data, use the following constructor:
new SqlJobBase.TableInfo("demo_db", "demo_tbl", new LinkedHashMap<String,String>(){{put("YourPartitionColumnName","value");}}, true);
- If a folder and a file with the same name exist in an OBS bucket directory, data is preferentially loaded to the file rather than the folder at that path. You are advised not to create files and folders with the same name at the same level in OBS.
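For illustration, the sketch below combines these options: it targets a specific partition and overwrites the existing data in that partition when constructing the UploadJob. The partition column name, value, and OBS path are placeholders, not part of the demo table defined above.
// A minimal sketch, assuming demo_db.demo_tbl has a partition column; the column name and value are placeholders.
SqlJobBase.TableInfo tableInfo = new SqlJobBase.TableInfo(
    "demo_db", "demo_tbl",
    new LinkedHashMap<String, String>() {{ put("YourPartitionColumnName", "value"); }},
    true); // true: overwrite the data in the specified partition.
UploadJob uploadJob = new UploadJob(dliInfo, yourOBSPathToWriteTmpData, tableInfo);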
- Key SDK APIs:
- com.huawei.dli.sdk.write.Writer
- com.huawei.dli.sdk.Job#asyncSubmit()
- com.huaweicloud.sdk.dli.v1.DliClient#showSqlJobStatus
- Sample code:
- (Recommended) Solution 1: Use the LoadData statement to import data.
private static void loadData(DLIInfo dliInfo, String uploadDataPath) throws DLIException {
    UploadJob uploadJob = new UploadJob(
        dliInfo, uploadDataPath, new SqlJobBase.TableInfo("demo_db", "demo_tbl"));

    // 1. Write data to the OBS temporary directory. Modify the following information based on site requirements. The following is just an example.
    // Note: This step directly calls the OBS data writing API. DLI only provides a default implementation for writing data in JSON format, meaning that files are stored on OBS in JSON format.
    // The writer here can be implemented based on service requirements. For example, you can use a custom CSV writer so that files are stored on OBS in CSV format.
    writeTmpData(uploadJob.createWriter(), genSchema(), 123, 50);

    // 2. Import data to DLI.
    // Submit the LoadData statement.
    // Note: The data_type here must match the writer implementation in step 1. By default, DLI provides JSON. If a custom writer is used, change it to the corresponding data_type.
    String loadSql = "LOAD DATA INPATH '" + uploadDataPath
        + "' INTO TABLE demo_db.demo_tbl OPTIONS(data_type 'json')";
    SQLJob sqlJob = new SQLJob(dliInfo, loadSql);
    sqlJob.asyncSubmit();

    // 3. Cyclically check the job status.
    checkRunning(V3ClientUtils.getDliClient(dliInfo), sqlJob.getJobId());
}
- Solution 2: Use the DLI encapsulated SDK to submit data for import.
private static void loadData(DLIInfo dliInfo, String uploadDataPath) throws DLIException {
    UploadJob uploadJob = new UploadJob(
        dliInfo, uploadDataPath, new SqlJobBase.TableInfo("demo_db", "demo_tbl"));

    // 1. Write data to the OBS temporary directory. Modify the following information based on site requirements. The following is just an example.
    // Note: This step directly calls the OBS data writing API. DLI only provides a default implementation for writing data in JSON format, meaning that files are stored on OBS in JSON format.
    // The writer here can be implemented based on service requirements. For example, you can use a custom CSV writer so that files are stored on OBS in CSV format.
    writeTmpData(uploadJob.createWriter(), genSchema(), 123, 50);

    // 2. Import data to DLI.
    // Submit the data using the DLI encapsulation. Note: Since the import job may take a long time to run, use asyncSubmit to submit it and proactively check the status.
    uploadJob.asyncSubmit();

    // 3. Cyclically check the job status.
    checkRunning(V3ClientUtils.getDliClient(dliInfo), uploadJob.getJobId());
}
Querying Job Results
- Key SDK API:
com.huawei.dli.sdk.read.ResultSet: API calls related to reading data from OBS. DLI provides a default OBS CSV reader implementation, which can be customized based on service requirements.
com.huawei.dli.sdk.SQLJob#submitQuery(): The feature of writing results to the job bucket must be enabled; otherwise, only the first 1,000 data records are previewed by default.
You can determine if the feature is enabled by checking the result_path parameter in the response body of the API for querying job status.
After the job is completed, if result_path starts with obs://, the feature of writing job results to the job bucket is enabled; otherwise, it is not enabled.
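For reference, the snippet below is a minimal sketch of how this check can be done in code, reusing the showSqlJobStatus call from the samples in this section; the jobId variable is assumed to hold the ID of a completed job. The complete query example follows.
// A minimal sketch: determine whether the job results were written to the job bucket.
ShowSqlJobStatusResponse statusResp = V3ClientUtils.getDliClient(dliInfo)
    .showSqlJobStatus(new ShowSqlJobStatusRequest().withJobId(jobId));
String resultPath = statusResp.getResultPath();
if (resultPath != null && resultPath.startsWith("obs://")) {
    // The feature is enabled: the full result set is stored in the job bucket.
} else {
    // The feature is not enabled: only the first 1,000 records can be previewed.
}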
private static String queryData(DLIInfo dliInfo, String selectSql) throws DLIException {
    SQLJob sqlJob = new SQLJob(dliInfo, selectSql);
    // If needed, you can set job parameters here, such as sqlJob.setConf().

    // 1. Submit a query job to DLI, implement the submission using the DLI encapsulation, and await the result.
    // Note 1: Set the timeout duration here based on the SQL execution duration, with a default of 5 minutes.
    // Note 2: The feature of writing job results to the job bucket needs to be enabled here. Otherwise, only the first 1,000 data records are previewed by default.
    sqlJob.setJobTimeout(30 * 60);
    ResultSet resultSet1 = null;
    try {
        resultSet1 = sqlJob.submitQuery();
        handleResult(resultSet1);
    } finally {
        if (resultSet1 != null) {
            resultSet1.close();
        }
    }
    return sqlJob.getJobId();
}
Querying the Result of a Specified Job
- Instructions
com.huawei.dli.sdk.SQLJob#getResultSet(): API calls related to reading data from OBS. DLI provides an OBS CSV reader implementation, which can be customized based on service requirements.
To use this method, you need to enable the feature of writing job results to the job bucket. You can determine if the feature is enabled by checking the result_path parameter in the response body of the API for querying job status. After the job is completed, if result_path starts with obs://, the feature of writing job results to the job bucket is enabled; otherwise, it is not enabled.
- Sample code
private static void queryDataByJobId(DLIInfo dliInfo, String jobId) throws DLIException {
    // Check whether the job corresponding to the job ID has completed. If not, wait until the job completes.
    SQLJob sqlJob = new SQLJob(dliInfo, null);
    sqlJob.setJobId(jobId);
    checkRunning(V3ClientUtils.getDliClient(dliInfo), jobId);

    // Retrieve the schema of the result data and the storage path of the result data in the user's job bucket based on the job ID.
    ShowSqlJobStatusResponse resp = V3ClientUtils.getDliClient(dliInfo)
        .showSqlJobStatus(new ShowSqlJobStatusRequest().withJobId(jobId));
    sqlJob.setJobStatus(resp.getStatus().getValue());
    sqlJob.setResultSchema(SchemaUtils.getSchemaFromJson(resp.getDetail()));
    sqlJob.setResultPath(resp.getResultPath());
    sqlJob.setResultCount(resp.getResultCount() != null ? resp.getResultCount() : 0);

    ResultSet resultSet = null;
    try {
        // Obtain the query result corresponding to the job ID and return the result iterator.
        resultSet = sqlJob.getResultSet();
        handleResult(resultSet);
    } finally {
        if (resultSet != null) {
            resultSet.close();
        }
    }
}
Querying the Job List
- Instructions
com.huaweicloud.sdk.dli.v1.DliClient#listSqlJobs(ListSqlJobsRequest).
If there are a large number of jobs, you must use the following pagination query method to query jobs in batches. Otherwise, only the jobs on the first page are returned.
You can use req.setStart() and req.setEnd() to query jobs within a specified time period, with the unit being milliseconds.
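For example, the snippet below is a minimal sketch that restricts the query to jobs submitted within the last 24 hours; the time window itself is only an illustration.
// A minimal sketch: query only the jobs submitted within the last 24 hours.
// setStart() and setEnd() take timestamps in milliseconds.
long now = System.currentTimeMillis();
ListSqlJobsRequest req = new ListSqlJobsRequest();
req.setStart(now - 24 * 60 * 60 * 1000L);
req.setEnd(now);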
- Sample code
private static void listSqlJobs(DLIInfo dliInfo) {
    DliClient dliClient = V3ClientUtils.getDliClient(dliInfo);
    ListSqlJobsRequest req = new ListSqlJobsRequest();
    int currentPage = 1;
    req.setCurrentPage(currentPage); // The default value is 1.
    req.setPageSize(100); // The default value is 10.
    Integer jobCount = dliClient.listSqlJobs(req).getJobCount();
    Integer cur = 0;
    // Query jobs by page.
    while (cur < jobCount) {
        ListSqlJobsResponse listSqlJobsResponse = dliClient.listSqlJobs(req);
        List<SqlJob> jobs = listSqlJobsResponse.getJobs();
        for (SqlJob job : jobs) {
            // Add the service logic here to process each job.
            cur++;
            if (cur.equals(jobCount)) {
                break;
            }
        }
        // Move to the next page; otherwise, the same page would be queried again.
        req.setCurrentPage(++currentPage);
    }
}
Writing Data to OBS by Running writeTmpData
- Instructions
You can implement the writer interface and customize the file writing logic based on your service requirements.
This example writes data to OBS by calling the OBS SDK. Currently, UploadJob offers an implementation where data is written in JSON format by default.
- Sample code
private static void writeTmpData(Writer writer, List<Column> schema, Integer totalRecords, Integer flushThreshold)
        throws DLIException {
    // Define a writer for writing data to OBS. You can define multiple writers based on the concurrency needs.
    // Define the column information to be written based on the target table. For details, see the genSchema() and genSchema2() methods in this document.
    // Define the loop size based on the actual service.
    for (int i = 0; i < totalRecords; i++) {
        // Retrieve data for each row based on the service.
        List<Object> record = genRecord();
        // Write data.
        Row row = new Row(schema);
        row.setRecord(record);
        writer.write(row);
        if (i % flushThreshold == 0) {
            // Flush the data promptly after writing a certain amount of data.
            writer.flush();
        }
    }
    writer.close();
}
Creating a Schema for a Table
- Instructions
Construct the schema based on the actual service. The following is just an example.
- Sample code
private static List<Column> genSchema() {
    return Arrays.asList(
        new Column("bool_c", new PrimitiveType(DataType.TypeName.BOOLEAN), "boolean col"),
        new Column("byte_c", new PrimitiveType(DataType.TypeName.TINYINT), "tinyint col"),
        new Column("short_c", new PrimitiveType(DataType.TypeName.SMALLINT), "smallint col"),
        new Column("int_c", new PrimitiveType(DataType.TypeName.INT), "int col"),
        new Column("long_c", new PrimitiveType(DataType.TypeName.BIGINT), "bigint col"),
        new Column("float_c", new PrimitiveType(DataType.TypeName.FLOAT), "float col"),
        new Column("double_c", new PrimitiveType(DataType.TypeName.DOUBLE), "double col"),
        new Column(
            "dec_c",
            new PrimitiveType(DataType.TypeName.DECIMAL, Arrays.asList("10", "2")),
            "decimal col"),
        new Column("str_c", new PrimitiveType(DataType.TypeName.STRING), "string col"),
        new Column("date_c", new PrimitiveType(DataType.TypeName.DATE), "date col"),
        new Column("ts_c", new PrimitiveType(DataType.TypeName.TIMESTAMP), "timestamp col"),
        new Column("binary_c", new PrimitiveType(DataType.TypeName.BINARY), "binary col"),
        new Column(
            "arr_c",
            new ArrayType(new PrimitiveType(DataType.TypeName.INT)),
            "array col"),
        new Column(
            "map_c",
            new MapType(
                new PrimitiveType(DataType.TypeName.STRING),
                new PrimitiveType(DataType.TypeName.INT)),
            "map col"),
        new Column(
            "struct_c",
            new StructType(Arrays.asList(
                new Column("s_str_c", new PrimitiveType(DataType.TypeName.STRING), "struct string col"),
                new Column("s_bool_c", new PrimitiveType(DataType.TypeName.BOOLEAN), "struct boolean col"))),
            "struct col"));
}
Automatically Obtaining the Schema of the Target Table
- Instructions
Obtain the schema of the target table automatically by submitting a lightweight query and reading the schema from the job status response.
- Sample code
private static List<Column> genSchema2(DLIInfo dliInfo, String yourDbName, String yourTableName)
        throws DLIException {
    String tempSql = String.format("select * from %s.%s limit 1", yourDbName, yourTableName);
    SQLJob sqlJob = new SQLJob(dliInfo, tempSql);
    sqlJob.submit();
    ShowSqlJobStatusResponse resp = V3ClientUtils.getDliClient(dliInfo)
        .showSqlJobStatus(new ShowSqlJobStatusRequest().withJobId(sqlJob.getJobId()));
    if (!resp.getIsSuccess()) {
        throw new DLIException("Get sql job status failed, details: " + resp.getMessage());
    }
    return SchemaUtils.getSchemaFromJson(resp.getDetail());
}
Generating Test Data (genRecord) on Demand
- Instructions
Construct each row of data based on service requirements. The following is just an example.
- Sample code
private static List<Object> genRecord() {
    Map<String, Object> structData = new HashMap<>();
    structData.put("s_str_c", "Abc");
    structData.put("s_bool_c", true);
    return Arrays.asList(
        true,
        (byte) 1,
        (short) 123,
        65535,
        123456789012L,
        101.235f,
        256.012358,
        new BigDecimal("33.05"),
        "abc_123&",
        new Date(1683475200000L),
        new Timestamp(1683543480000L),
        "Hello".getBytes(StandardCharsets.UTF_8),
        Arrays.asList(1, 2, 3),
        Collections.singletonMap("k", 123),
        structData);
}
Querying the Job Status
- Sample code
private static void checkRunning(DliClient dliClient, String jobId) throws DLIException {
    while (true) {
        ShowSqlJobStatusResponse resp;
        try {
            resp = dliClient.showSqlJobStatus(new ShowSqlJobStatusRequest().withJobId(jobId));
        } catch (Exception e) {
            throw new DLIException("Failed to get job status by id: " + jobId, e);
        }
        String status = resp.getStatus().getValue();
        logger.info(String.format("SparkSQL Job id %s status: %s", jobId, status));
        if ("FINISHED".equals(status)) {
            return;
        }
        if ("FAILED".equals(status) || "CANCELLED".equals(status)) {
            throw new DLIException("Run job failed or cancelled, details: " + resp.getMessage());
        }
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new DLIException("Check job running interrupted.");
        }
    }
}
Processing Job Results
- Instructions
Process each row of the result based on service requirements. The following is just an example.
- Sample code
private static void handleResult(ResultSet resultSet) throws DLIException {
    while (resultSet.hasNext()) {
        Row row = resultSet.read();
        List<Column> schema = row.getSchema();
        List<Object> record = row.getRecord();
        // Process each row and each column.
        // Method 1: Call record.get(index) to get the Object, then perform type conversion according to the type of each column.
        // Method 2: Based on the type of each column, call row.getXXX(index) to obtain data of the corresponding type.
        for (int i = 0; i < schema.size(); i++) {
            DataType.TypeName typeName = DataType.TypeName.fromName(schema.get(i).getType().getName());
            switch (typeName) {
                case STRING:
                    String strV = (String) record.get(i);
                    System.out.println(
                        "\t" + (strV == null ? null : strV.replaceAll("\r", "\\\\r").replaceAll("\n", "\\\\n")));
                    break;
                case DATE:
                    Date dtV = (Date) record.get(i);
                    System.out.println("\t" + (dtV == null ? null : DATE_FORMAT.get().format(dtV)));
                    break;
                case TIMESTAMP:
                    Timestamp tsV = (Timestamp) record.get(i);
                    System.out.println("\t" + (tsV == null ? null : TIMESTAMP_FORMAT.get().format(tsV)));
                    break;
                case BINARY:
                    byte[] bytes = (byte[]) record.get(i);
                    System.out.println("\t" + (bytes == null ? null : Base64.getEncoder().encodeToString(bytes)));
                    break;
                case ARRAY:
                case MAP:
                case STRUCT:
                    Object data = record.get(i);
                    System.out.println("\t" + (data == null ? null : JsonUtils.toJSON(data)));
                    break;
                default:
                    System.out.println("\t" + record.get(i));
            }
        }
        System.out.println();
    }
}