Help Center/ Data Lake Insight/ SDK Reference/ Java SDK/ SDKs Related to Flink Jobs
Updated on 2023-07-19 GMT+08:00

SDKs Related to Flink Jobs

Prerequisites

  • You have configured the Java SDK environment by following the instructions provided Instructions.
  • You have initialized the DLI Client by following the instructions provided in Initializing the DLI Client and created queues by following the instructions provided in Queue-Related SDKs.

Creating a SQL Job

DLI provides an API for creating a Flink streaming SQL job. You can use it to create a Flink streaming SQL job and submit it to DLI. Sample code is as follows:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
private static void createSQLJob(DLIClient client) throws DLIException {
    SubmitFlinkSqlJobRequest body = new SubmitFlinkSqlJobRequest();
    body.name("job-name");
    body.runMode(SubmitFlinkSqlJobRequest.RunModeEnum.SHARED_CLUSTER);
    body.checkpointEnabled(false);
    body.checkpointMode(1);
    body.jobType(SubmitFlinkSqlJobRequest.JobTypeEnum.JOB);
    JobStatusResponse result = client.submitFlinkSqlJob(body);
    System.out.println(result);
}

Customizing a Job

DLI provides an API for creating a user-defined Flink job. Currently, the job supports the JAR format and runs in dedicated queues. The example code is as follows:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
private static void createFlinkJob(DLIClient client) throws DLIException {
     CreateFlinkJarJobRequest body = new CreateFlinkJarJobRequest();
     body.name("jar-job");
     body.cuNumber(2);
     body.managerCuNumber(1);
     body.parallelNumber(1);
     body.entrypoint("dli/WindowJoin.jar");
     JobStatusResponse result = client.createFlinkJarJob(body);
     System.out.println(result);
 }

Updating a SQL Job

DLI provides an API for updating Flink streaming SQL jobs. You can use it to update a Flink streaming SQL job. Sample code is as follows:

1
2
3
4
5
6
private static void updateSQLJob(DLIClient client) throws DLIException {
     UpdateFlinkSqlJobRequest body = new UpdateFlinkSqlJobRequest();
     body.name("update-job");
     JobUpdateResponse result = client.updateFlinkSqlJob(body,203L);
     System.out.println(result);
 }

Updating a Custom Job

DLI provides an API for updating user-defined Flink jobs. You can use it to update custom jobs, which currently support the JAR format and run in dedicated queues. The example code is as follows:

1
2
3
4
5
6
   private static void updateFlinkJob(DLIClient client) throws DLIException {
        UpdateFlinkJarJobRequest body = new UpdateFlinkJarJobRequest();
        body.name("update-job");
        JobUpdateResponse result = client.updateFlinkJarJob(body,202L);
        System.out.println(result);
    }

Querying the List of Jobs

DLI provides an API for querying the Flink job list. The following parameters are involved in this API: name, status, show_detail, cursor, next, limit, and order. In this example, the query results are displayed in descending order and information about the jobs whose IDs are less than the value of cursor is displayed. The example code is as follows:

1
2
3
4
private static void  QueryFlinkJobListResponse(DLIClient client) throws DLIException {
        QueryFlinkJobListResponse result = client.getFlinkJobs(null, "job_init", null, true, 0L, 10, null, null,null,null,null);
        System.out.println(result);
    }

Querying Job Details

DLI provides an API for querying Flink job details. The example code is as follows:

1
2
3
4
5
private static void  getFlinkJobDetail(DLIClient client) throws DLIException {
Long jobId = 203L; //Job ID
    GetFlinkJobDetailResponse result = client.getFlinkJobDetail(jobId);
    System.out.println(result);
}

Querying the Job Execution Plan Diagram

DLI provides an API for querying the execution plan of a Flink job. The example code is as follows:

1
2
3
4
5
private static void  getFlinkJobExecuteGraph(DLIClient client) throws DLIException {
Long jobId = 203L; //Job ID
        FlinkJobExecutePlanResponse result = client.getFlinkJobExecuteGraph(jobId);
        System.out.println(result);
    }

Querying Job Monitoring Information

DLI provides an API for querying Flink job monitoring information. Monitoring information about multiple jobs can be queried at the same time. The example code is as follows:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public static void getMetrics(DLIClient client) throws DLIException{
    List < Long > job_ids = new ArrayList < > ();
    Long jobId = 6316L; //Job 1 ID
    Long jobId2 = 6945L; //Job 2 ID
    job_ids.add(jobId);
    job_ids.add(jobId2);
    GetFlinkJobsMetricsBody body = new GetFlinkJobsMetricsBody();
    body.jobIds(job_ids);
    QueryFlinkJobMetricsResponse result = client.getFlinkJobsMetrics(body);
    System.out.println(result);
}

Querying the APIG Address of a Job

DLI provides an API for querying the APIG access address of a Flink job. The example code is as follows:

1
2
3
4
5
  private static void  getFlinkApigSinks(DLIClient client) throws DLIException {
        Long jobId = 59L; //Job 1 ID
        FlinkJobApigSinksResponse result = client.getFlinkApigSinks(jobId);
        System.out.println(result);
    }

Running a Job

DLI provides APIs for running Flink jobs. The example code is as follows:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    public static void runFlinkJob(DLIClient client) throws DLIException{
       RunFlinkJobRequest body = new RunFlinkJobRequest();
        List<Long> jobIds = new ArrayList<>();
        Long jobId = 59L; //Job 1 ID
        Long jobId2 = 192L; //Job 2 ID
        jobIds.add(jobId);
        jobIds.add(jobid2);
        body.resumeSavepoint(false);
        body.jobIds(jobIds);
        List<GlobalBatchResponse> result = client.runFlinkJob(body);
        System.out.println(result);
    }

Stopping a Job

DLI provides an API for stopping Flink jobs. The example code is as follows:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
   public static void stopFlinkJob(DLIClient client) throws DLIException{
        StopFlinkJobRequest body = new StopFlinkJobRequest();
        List<Long> jobIds = new ArrayList<>();
        Long jobId = 59L; //Job 1 ID
        Long jobId2 = 192L; //Job 2 ID
        jobIds.add(jobId);
        jobIds.add(jobid2);
        body.triggerSavepoint(false);
        body.jobIds(jobIds);
        List<GlobalBatchResponse> result = client.stopFlinkJob(body);
        System.out.println(result);
    }

Deleting Jobs in Batches

DLI provides an API for deleting Flink jobs in batches. You can use the API to batch delete Flink jobs in any status. The example code is as follows:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public static void deleteFlinkJob(DLIClient client) throws DLIException{
    DeleteJobInBatchRequest  body = new DeleteJobInBatchRequest ();
    List<Long> jobIds = new ArrayList<>();
    Long jobId = 202L; //Job 1 ID
    Long jobid2 = 203L; //Job 2 ID
    jobIds.add(jobId);
    jobIds.add(jobid2);
    body.jobIds(jobIds);
    List<GlobalBatchResponse>  result = client. deleteFlinkJobInBatch(body);
    System.out.println(result);
}