SDKs Related to Flink Jobs
Prerequisites
- You have configured the Java SDK environment by following the instructions provided in Instructions.
- You have initialized the DLI Client by following the instructions provided in Initializing the DLI Client and created queues by following the instructions provided in Queue-Related SDKs.
Creating a SQL Job
DLI provides an API for creating a Flink streaming SQL job. You can use it to create a Flink streaming SQL job and submit it to DLI. Sample code is as follows:
```java
private static void createSQLJob(DLIClient client) throws DLIException {
    SubmitFlinkSqlJobRequest body = new SubmitFlinkSqlJobRequest();
    body.name("job-name");
    body.runMode(SubmitFlinkSqlJobRequest.RunModeEnum.SHARED_CLUSTER);
    body.checkpointEnabled(false);
    body.checkpointMode(1);
    body.jobType(SubmitFlinkSqlJobRequest.JobTypeEnum.JOB);
    JobStatusResponse result = client.submitFlinkSqlJob(body);
    System.out.println(result);
}
```
Creating a Custom Job
DLI provides an API for creating a user-defined Flink job. Currently, the job supports the JAR format and runs in dedicated queues. The example code is as follows:
```java
private static void createFlinkJob(DLIClient client) throws DLIException {
    CreateFlinkJarJobRequest body = new CreateFlinkJarJobRequest();
    body.name("jar-job");
    body.cuNumber(2);
    body.managerCuNumber(1);
    body.parallelNumber(1);
    body.entrypoint("dli/WindowJoin.jar");
    JobStatusResponse result = client.createFlinkJarJob(body);
    System.out.println(result);
}
```
Updating a SQL Job
DLI provides an API for updating Flink streaming SQL jobs. You can use it to update a Flink streaming SQL job. Sample code is as follows:
```java
private static void updateSQLJob(DLIClient client) throws DLIException {
    UpdateFlinkSqlJobRequest body = new UpdateFlinkSqlJobRequest();
    body.name("update-job");
    JobUpdateResponse result = client.updateFlinkSqlJob(body, 203L);
    System.out.println(result);
}
```
Updating a Custom Job
DLI provides an API for updating user-defined Flink jobs. You can use it to update custom jobs, which currently support the JAR format and run in dedicated queues. The example code is as follows:
```java
private static void updateFlinkJob(DLIClient client) throws DLIException {
    UpdateFlinkJarJobRequest body = new UpdateFlinkJarJobRequest();
    body.name("update-job");
    JobUpdateResponse result = client.updateFlinkJarJob(body, 202L);
    System.out.println(result);
}
```
Querying the List of Jobs
DLI provides an API for querying the Flink job list. The API takes the following parameters: name, status, show_detail, cursor, next, limit, and order. This example returns the results in descending order and lists the jobs whose IDs are less than the value of cursor. The example code is as follows:
```java
// The method is named listFlinkJobs so that it does not share a name with the response class.
private static void listFlinkJobs(DLIClient client) throws DLIException {
    QueryFlinkJobListResponse result = client.getFlinkJobs(null, "job_init", null, true, 0L, 10, null, null, null, null, null);
    System.out.println(result);
}
```
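To walk through a long job list, the cursor can be advanced from one call to the next. The following is a minimal sketch of that loop, not SDK code. `fetchPage` is a hypothetical stand-in for a call such as `client.getFlinkJobs(...)`, and it is assumed to return up to `limit` job IDs smaller than the cursor, in descending order. `JobCursorPager` and `inMemory` are illustrative names, and how the real service treats a cursor of 0 is not covered here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

final class JobCursorPager {
    private JobCursorPager() {}

    /**
     * Collects job IDs page by page.
     * fetchPage is a hypothetical stand-in for the list query: given (cursor, limit)
     * it is assumed to return up to `limit` IDs smaller than `cursor`, newest first.
     */
    static List<Long> collectAll(BiFunction<Long, Integer, List<Long>> fetchPage,
                                 long startCursor, int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        List<Long> all = new ArrayList<>();
        long cursor = startCursor;
        while (true) {
            List<Long> page = fetchPage.apply(cursor, limit);
            if (page.isEmpty()) {
                break; // nothing left below the cursor
            }
            all.addAll(page);
            // In a descending page the last element is the smallest ID seen so far.
            cursor = page.get(page.size() - 1);
            if (page.size() < limit) {
                break; // a short page means the list is exhausted
            }
        }
        return all;
    }

    /** In-memory fake of the list query, used only to exercise the loop. */
    static BiFunction<Long, Integer, List<Long>> inMemory(List<Long> descendingIds) {
        return (cursor, limit) -> {
            List<Long> page = new ArrayList<>();
            for (Long id : descendingIds) {
                if (id < cursor && page.size() < limit) {
                    page.add(id);
                }
            }
            return page;
        };
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(9L, 7L, 5L, 3L, 1L);
        System.out.println(collectAll(inMemory(ids), Long.MAX_VALUE, 2));
    }
}
```

With a real client, the lambda passed as `fetchPage` would wrap the list query and extract the job IDs from the response; the accessor names for that depend on the SDK version and are not shown here.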
Querying Job Details
DLI provides an API for querying Flink job details. The example code is as follows:
```java
private static void getFlinkJobDetail(DLIClient client) throws DLIException {
    Long jobId = 203L; // Job ID
    GetFlinkJobDetailResponse result = client.getFlinkJobDetail(jobId);
    System.out.println(result);
}
```
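A common use of the job details query is to wait until a newly submitted job leaves its initial state. The sketch below shows one way to structure such a polling loop. It does not call the SDK: `statusSource` is a hypothetical supplier that would wrap `client.getFlinkJobDetail(jobId)` and return the job's status string. Apart from `job_init`, which appears on this page, the status names in `SETTLED` are assumptions and should be checked against the API reference.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Supplier;

final class JobStatusPoller {
    // Assumed status names at which polling stops; verify them against the API reference.
    private static final Set<String> SETTLED = new HashSet<>(Arrays.asList(
            "job_running", "job_finish", "job_submit_fail",
            "job_running_exception", "job_cancel_success"));

    private JobStatusPoller() {}

    /**
     * Polls statusSource until it reports a settled status or maxAttempts is reached.
     * Returns the last status seen, which may still be a transient one.
     */
    static String waitUntilSettled(Supplier<String> statusSource, int maxAttempts, long sleepMillis) {
        if (maxAttempts <= 0) {
            throw new IllegalArgumentException("maxAttempts must be positive");
        }
        String last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            last = statusSource.get();
            if (SETTLED.contains(last)) {
                return last;
            }
            if (attempt < maxAttempts - 1) {
                try {
                    Thread.sleep(sleepMillis); // pause between queries
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw new IllegalStateException("Interrupted while polling", e);
                }
            }
        }
        return last;
    }

    public static void main(String[] args) {
        // Scripted statuses stand in for repeated job detail queries.
        Iterator<String> scripted = Arrays.asList("job_init", "job_submitting", "job_running").iterator();
        System.out.println(waitUntilSettled(scripted::next, 5, 0L));
    }
}
```

Bounding the loop with `maxAttempts` keeps a stuck job from blocking the caller indefinitely; the caller can inspect the returned status to decide whether to keep waiting.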
Querying the Job Execution Plan Diagram
DLI provides an API for querying the execution plan of a Flink job. The example code is as follows:
```java
private static void getFlinkJobExecuteGraph(DLIClient client) throws DLIException {
    Long jobId = 203L; // Job ID
    FlinkJobExecutePlanResponse result = client.getFlinkJobExecuteGraph(jobId);
    System.out.println(result);
}
```
Querying Job Monitoring Information
DLI provides an API for querying Flink job monitoring information. Monitoring information about multiple jobs can be queried at the same time. The example code is as follows:
```java
// Requires: import java.util.ArrayList; import java.util.List;
public static void getMetrics(DLIClient client) throws DLIException {
    List<Long> jobIds = new ArrayList<>();
    Long jobId = 6316L;  // Job 1 ID
    Long jobId2 = 6945L; // Job 2 ID
    jobIds.add(jobId);
    jobIds.add(jobId2);
    GetFlinkJobsMetricsBody body = new GetFlinkJobsMetricsBody();
    body.jobIds(jobIds);
    QueryFlinkJobMetricsResponse result = client.getFlinkJobsMetrics(body);
    System.out.println(result);
}
```
Querying the APIG Address of a Job
DLI provides an API for querying the APIG access address of a Flink job. The example code is as follows:
```java
private static void getFlinkApigSinks(DLIClient client) throws DLIException {
    Long jobId = 59L; // Job ID
    FlinkJobApigSinksResponse result = client.getFlinkApigSinks(jobId);
    System.out.println(result);
}
```
Running a Job
DLI provides an API for running Flink jobs. Multiple jobs can be started in one request. The example code is as follows:
```java
// Requires: import java.util.ArrayList; import java.util.List;
public static void runFlinkJob(DLIClient client) throws DLIException {
    RunFlinkJobRequest body = new RunFlinkJobRequest();
    List<Long> jobIds = new ArrayList<>();
    Long jobId = 59L;   // Job 1 ID
    Long jobId2 = 192L; // Job 2 ID
    jobIds.add(jobId);
    jobIds.add(jobId2); // the original sample referenced an undefined variable, jobid2
    body.resumeSavepoint(false);
    body.jobIds(jobIds);
    List<GlobalBatchResponse> result = client.runFlinkJob(body);
    System.out.println(result);
}
```
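Because a batch call returns one result per job, it is usually worth checking each entry rather than only printing the list. The sketch below illustrates that check without depending on the SDK. `Outcome` is a hypothetical stand-in for one entry of the returned list, with an assumed success flag and message, and the code further assumes that results come back in the same order as the requested job IDs. Both assumptions should be verified against the actual `GlobalBatchResponse` type.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BatchResultSummary {
    /** Hypothetical stand-in for one entry of a batch response. */
    static final class Outcome {
        final boolean success;
        final String message;

        Outcome(boolean success, String message) {
            this.success = success;
            this.message = message;
        }
    }

    private BatchResultSummary() {}

    /**
     * Maps each failed job ID to its message.
     * Assumes outcomes are returned in the same order as the requested job IDs.
     */
    static Map<Long, String> failures(List<Long> jobIds, List<Outcome> outcomes) {
        if (jobIds.size() != outcomes.size()) {
            throw new IllegalArgumentException(
                    "Expected " + jobIds.size() + " results but got " + outcomes.size());
        }
        Map<Long, String> failed = new LinkedHashMap<>();
        for (int i = 0; i < jobIds.size(); i++) {
            Outcome outcome = outcomes.get(i);
            if (!outcome.success) {
                failed.put(jobIds.get(i), outcome.message);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<Long> jobIds = Arrays.asList(59L, 192L);
        List<Outcome> outcomes = Arrays.asList(
                new Outcome(true, "started"),
                new Outcome(false, "job not found"));
        System.out.println(failures(jobIds, outcomes));
    }
}
```

Returning only the failures keeps the caller's follow-up simple: an empty map means every job in the batch was accepted.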
Stopping a Job
DLI provides an API for stopping Flink jobs. The example code is as follows:
```java
// Requires: import java.util.ArrayList; import java.util.List;
public static void stopFlinkJob(DLIClient client) throws DLIException {
    StopFlinkJobRequest body = new StopFlinkJobRequest();
    List<Long> jobIds = new ArrayList<>();
    Long jobId = 59L;   // Job 1 ID
    Long jobId2 = 192L; // Job 2 ID
    jobIds.add(jobId);
    jobIds.add(jobId2); // the original sample referenced an undefined variable, jobid2
    body.triggerSavepoint(false);
    body.jobIds(jobIds);
    List<GlobalBatchResponse> result = client.stopFlinkJob(body);
    System.out.println(result);
}
```
Deleting Jobs in Batches
DLI provides an API for deleting Flink jobs in batches. You can use the API to batch delete Flink jobs in any status. The example code is as follows:
```java
// Requires: import java.util.ArrayList; import java.util.List;
public static void deleteFlinkJob(DLIClient client) throws DLIException {
    DeleteJobInBatchRequest body = new DeleteJobInBatchRequest();
    List<Long> jobIds = new ArrayList<>();
    Long jobId = 202L;  // Job 1 ID
    Long jobId2 = 203L; // Job 2 ID
    jobIds.add(jobId);
    jobIds.add(jobId2);
    body.jobIds(jobIds);
    List<GlobalBatchResponse> result = client.deleteFlinkJobInBatch(body);
    System.out.println(result);
}
```
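Since batch deletion applies to jobs in any status, it can help to sanity-check the ID list before sending the request. The following is a minimal sketch of such a guard, independent of the SDK. `JobIdBatch` and `normalize` are illustrative names, and the rule that job IDs are positive is an assumption based on the sample IDs on this page.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class JobIdBatch {
    private JobIdBatch() {}

    /**
     * Returns the job IDs with duplicates removed, keeping first-seen order.
     * Rejects null or non-positive IDs (assumed to be invalid) and empty input.
     */
    static List<Long> normalize(List<Long> rawIds) {
        Set<Long> unique = new LinkedHashSet<>();
        for (Long id : rawIds) {
            if (id == null || id <= 0) {
                throw new IllegalArgumentException("Invalid job ID: " + id);
            }
            unique.add(id);
        }
        if (unique.isEmpty()) {
            throw new IllegalArgumentException("At least one job ID is required");
        }
        return new ArrayList<>(unique);
    }

    public static void main(String[] args) {
        // The duplicate 202 is dropped before the list would be passed to body.jobIds(...).
        System.out.println(normalize(Arrays.asList(202L, 203L, 202L)));
    }
}
```

The normalized list can then be passed to `body.jobIds(...)` in place of the hand-built list.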