Submitting a Flink Jar Job Using an SDK
This section describes how to submit a Flink Jar job using DLI SDK V2.

Starting May 2024, new users can use DLI SDK V2 directly, without having their accounts whitelisted.
Users who started using DLI before May 2024 must submit a service ticket to have their accounts whitelisted before they can use this function.
Prerequisites
- You have configured the Java SDK environment by referring to Overview.
- You have initialized the DLI client by referring to Initializing the DLI Client.
Preparations
Obtain an AK/SK, project ID, and region information.
- Log in to the management console.
- In the upper right corner, hover over the username and choose My Credentials from the drop-down list.
- In the navigation pane on the left, choose Access Keys. On the displayed page, click Create Access Key. Confirm that you want to proceed with the operation and click OK.
- On the displayed page, click Download. Open the file to obtain the AK/SK information.
- In the navigation pane on the left, choose API Credentials. In the Projects pane, locate project_id and obtain the region information.
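The sample code in this section reads the AK and SK from environment variables instead of hardcoding them. The snippet below is a minimal, illustrative sketch (the class and variable names are placeholders, not part of the DLI SDK) showing how the values obtained in the steps above are typically supplied and verified before the DLI client is initialized.
// Minimal sketch: read the AK/SK obtained above from environment variables and keep the
// project ID and region name as plain strings. All names here are illustrative placeholders.
public class DliCredentialsCheck {
    public static void main(String[] args) {
        String ak = System.getenv("HUAWEICLOUD_SDK_AK"); // AK from the downloaded credentials file
        String sk = System.getenv("HUAWEICLOUD_SDK_SK"); // SK from the downloaded credentials file
        String projectId = "YourProjectId";              // project_id from the API Credentials page
        String regionName = "YourRegionName";            // region that the project belongs to
        if (ak == null || ak.isEmpty() || sk == null || sk.isEmpty()) {
            throw new IllegalStateException(
                    "Set the HUAWEICLOUD_SDK_AK and HUAWEICLOUD_SDK_SK environment variables first.");
        }
        System.out.println("Credentials loaded for project " + projectId + " in region " + regionName);
    }
}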
Example Code
private static final Logger logger = LoggerFactory.getLogger(FlinkJarJobExample.class);

public static void main(String[] args) {
    String yourAccessKey = System.getenv("HUAWEICLOUD_SDK_AK");
    String yourSecretKey = System.getenv("HUAWEICLOUD_SDK_SK");
    DliClient dliClient = DliClient.newBuilder()
            .withRegion(DliRegion.valueOf("RegionName")) // Replace RegionName with the name of your region.
            .withCredential(new BasicCredentials()
                    .withAk(yourAccessKey)
                    .withSk(yourSecretKey)
                    .withProjectId("YourProjectId"))
            .build();
    try {
        // Step 1: Create a Flink Jar job. The job status changes to Draft.
        Long jobId = createFlinkJarJob(dliClient, "YourQueueName");
        logger.info("jobId: " + jobId);
        // Step 2: Run the job. The job status changes from Draft to Submitting.
        List<FlinkSuccessResponse> resps = batchRunFlinkJobs(dliClient, Arrays.asList(jobId));
        logger.info("Response: " + ArrayUtils.toString(resps));
        // Step 3: Query the job status. To wait for the job to reach the Running state within the
        // current thread, cyclically check the job status until it becomes Running.
        checkRunning(dliClient, jobId);
    } catch (DLIException e) {
        // Handle the exception based on service requirements. The following is just an example.
        logger.error("Failed to submit or run the Flink Jar job.", e);
    }
}
Creating a Flink Jar Job
- Function:
Create a Flink Jar job.
- Reference link:
Key SDK API: com.huaweicloud.sdk.dli.v1.DliClient#createFlinkJarJob(com.huaweicloud.sdk.dli.v1.model.CreateFlinkJarJobRequest)
Create a Flink Jar job. The job status changes to Draft.
- Sample code:
private static Long createFlinkJarJob(DliClient client, String queueName) {
    // Set the parameters according to the actual situation. The following is just an example.
    CreateFlinkJarJobResponse resp = client.createFlinkJarJob(new CreateFlinkJarJobRequest()
            .withBody(new CreateFlinkJarJobRequestBody()
                    .withName("demo_flink_jar") // Custom job name, which can contain up to 57 characters.
                    .withDesc("YourJobDescription") // Custom job description, which can contain up to 512 characters.
                    .withQueueName(queueName) // Queue name, which can contain up to 128 characters.
                    .withFeature("basic") // Job feature, that is, the type of the Flink image used by the job. basic: the base Flink image provided by DLI is used.
                    .withFlinkVersion("1.12") // Flink version. This parameter is valid only when feature is set to basic.
                    .withObsBucket("YourObsBucketName") // OBS bucket name, which is used to store logs and checkpoint data.
                    .withLogEnabled(true) // Enable uploading job logs to the OBS bucket.
                    .withEntrypoint("obs://YourObsBucketName/your/flink/job.jar") // JAR file uploaded to OBS that contains the user-defined job main class.
                    .withMainClass("YourClassFullName") // Job entry class, for example, org.apache.flink.examples.JavaQueueStream.
                    .withEntrypointArgs("YourAppArg1 YourAppArg2") // Job entry parameters, separated by spaces. Delete this line if it is not required.
                    .withDependencyJars(Arrays.asList("obs://YourObsBucketName/your/dependency1.jar", "obs://YourObsBucketName/your/dependency2.jar")) // JAR files uploaded to OBS that contain the dependencies of the user-defined job. Delete this line if it is not required.
                    .withDependencyFiles(Arrays.asList("obs://YourObsBucketName/your/dependency1.csv", "obs://YourObsBucketName/your/dependency2.json")) // Other files uploaded to OBS on which the user-defined job depends. Delete this line if it is not required.
            ));
    return resp.getJob().getJobId();
}
Running Flink Jobs in Batches
- Function:
Run Flink jobs in batches.
- Reference link:
Key SDK API: com.huaweicloud.sdk.dli.v1.DliClient#batchRunFlinkJobs(com.huaweicloud.sdk.dli.v1.model.BatchRunFlinkJobsRequest)
Run Flink jobs in batches. The job status changes from Draft to Submitting.
- Sample code:
private static List<FlinkSuccessResponse> batchRunFlinkJobs(DliClient client, List<Long> jobIds) {
    BatchRunFlinkJobsResponse batchRunFlinkJobsResponse = client.batchRunFlinkJobs(
            new BatchRunFlinkJobsRequest()
                    .withBody(new BatchRunFlinkJobsRequestBody().withJobIds(jobIds)));
    return batchRunFlinkJobsResponse.getBody();
}
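The batch-run call returns one result entry per submitted job. The helper below is a hedged sketch (not part of the documented sample) that fails fast when any entry reports an error. It assumes FlinkSuccessResponse exposes getIsSuccess() and getMessage() getters; verify them against your SDK version.
// Hedged sketch: verify each batch-run result and fail fast on the first error.
// Assumes FlinkSuccessResponse provides getIsSuccess() and getMessage(); check your SDK version.
private static void checkBatchRunResults(List<Long> jobIds, List<FlinkSuccessResponse> resps) throws DLIException {
    for (int i = 0; i < resps.size(); i++) {
        FlinkSuccessResponse result = resps.get(i);
        // String.valueOf() keeps the check valid whether getIsSuccess() returns a Boolean or a String.
        if (!"true".equals(String.valueOf(result.getIsSuccess()))) {
            throw new DLIException("Failed to run Flink jar job " + jobIds.get(i) + ": " + result.getMessage());
        }
    }
}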
Querying the Job Status
- Function:
Query the status of a Flink job.
- Reference link:
Key SDK API: com.huaweicloud.sdk.dli.v1.DliClient#showFlinkJob(com.huaweicloud.sdk.dli.v1.model.ShowFlinkJobRequest)
To wait within the current thread for the job to reach the Running state, cyclically check the job status until it becomes Running.
- Sample code:
private static void checkRunning(DliClient client, Long jobId) throws DLIException {
    while (true) {
        ShowFlinkJobResponse resp;
        try {
            resp = client.showFlinkJob(new ShowFlinkJobRequest().withJobId(jobId));
        } catch (Exception e) {
            throw new DLIException("Failed to get Flink jar job status by id: " + jobId, e);
        }
        String status = resp.getJobDetail().getStatus();
        logger.info(String.format("FlinkJarJob id %s status: %s", jobId, status));
        if ("job_running".equals(status)) {
            return;
        }
        if ("job_submit_fail".equals(status) || "job_running_exception".equals(status)) {
            throw new DLIException("Run Flink jar job failed: " + resp);
        }
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupted status before giving up.
            throw new DLIException("Check job running interrupted.");
        }
    }
}
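If you prefer not to poll indefinitely, the variant below is a sketch that adds an overall timeout to the same loop. It uses only the calls already shown in checkRunning; the timeout parameter and method name are illustrative.
// Hedged sketch: the same polling loop as checkRunning, but with an overall timeout so the
// caller is not blocked indefinitely if the job never reaches the Running state.
private static void checkRunningWithTimeout(DliClient client, Long jobId, long timeoutMillis) throws DLIException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (System.currentTimeMillis() < deadline) {
        String status;
        try {
            status = client.showFlinkJob(new ShowFlinkJobRequest().withJobId(jobId)).getJobDetail().getStatus();
        } catch (Exception e) {
            throw new DLIException("Failed to get Flink jar job status by id: " + jobId, e);
        }
        logger.info(String.format("FlinkJarJob id %s status: %s", jobId, status));
        if ("job_running".equals(status)) {
            return;
        }
        if ("job_submit_fail".equals(status) || "job_running_exception".equals(status)) {
            throw new DLIException("Run Flink jar job failed, status: " + status);
        }
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupted status before giving up.
            throw new DLIException("Check job running interrupted.");
        }
    }
    throw new DLIException("Timed out waiting for Flink jar job " + jobId + " to reach the Running state.");
}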