更新时间:2024-03-06 GMT+08:00

Flink作业相关

前提条件

新建SQL作业

DLI提供新建Flink SQL作业的接口。您可以使用该接口新建Flink SQL作业并提交到DLI,示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
private static void createSQLJob(DLIClient client) throws DLIException {
    SubmitFlinkSqlJobRequest body = new SubmitFlinkSqlJobRequest();
    body.name("job-name");
    body.runMode(SubmitFlinkSqlJobRequest.RunModeEnum.SHARED_CLUSTER);
    body.checkpointEnabled(false);
    body.checkpointMode(1);
    body.jobType(SubmitFlinkSqlJobRequest.JobTypeEnum.JOB);
    JobStatusResponse result = client.submitFlinkSqlJob(body);
    System.out.println(result);
}

新建自定义作业

DLI提供新建Flink自定义作业的接口。您可以使用该接口创建一个用户自定义作业,目前支持jar格式,运行在独享队列中。示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
private static void createFlinkJob(DLIClient client) throws DLIException {
     CreateFlinkJarJobRequest body = new CreateFlinkJarJobRequest();
     body.name("jar-job");
     body.cuNumber(2);
     body.managerCuNumber(1);
     body.parallelNumber(1);
     body.entrypoint("dli/WindowJoin.jar");
     JobStatusResponse result = client.createFlinkJarJob(body);
     System.out.println(result);
 }

更新SQL作业

DLI提供更新Flink SQL作业接口。您可以使用该接口更新Flink SQL作业,示例代码如下:

1
2
3
4
5
6
private static void updateSQLJob(DLIClient client) throws DLIException {
     UpdateFlinkSqlJobRequest body = new UpdateFlinkSqlJobRequest();
     body.name("update-job");
     JobUpdateResponse result = client.updateFlinkSqlJob(body,203L);
     System.out.println(result);
 }

更新自定义作业

DLI提供更新Flink自定义作业的接口。您可以使用该接口更新已经创建的自定义作业,目前仅支持Jar格式和运行在独享队列中。示例代码如下:

1
2
3
4
5
6
   private static void updateFlinkJob(DLIClient client) throws DLIException {
        UpdateFlinkJarJobRequest body = new UpdateFlinkJarJobRequest();
        body.name("update-job");
        JobUpdateResponse result = client.updateFlinkJarJob(body,202L);
        System.out.println(result);
    }

查询作业列表

DLI提供查询Flink作业列表的接口。您可以使用该接口查询作业列表。作业列表查询支持以下参数: name,status,show_detail,cursor,next,limit,order。本示例排序方式选择降序desc,将会列出作业id小于cursor的作业列表信息。示例代码如下:

1
2
3
4
private static void  QueryFlinkJobListResponse(DLIClient client) throws DLIException {
        QueryFlinkJobListResponse result = client.getFlinkJobs(null, "job_init", null, true, 0L, 10, null, null,null,null,null);
        System.out.println(result);
    }

查询作业详情

DLI提供查询Flink作业详情的接口。您可以使用该接口查询作业的详情。示例代码如下:

1
2
3
4
5
private static void  getFlinkJobDetail(DLIClient client) throws DLIException {
Long jobId = 203L;//作业ID
    GetFlinkJobDetailResponse result = client.getFlinkJobDetail(jobId);
    System.out.println(result);
}

查询作业执行计划图

DLI提供查询Flink作业执行计划图的接口。您可以使用该接口查询作业的执行计划图。示例代码如下:

1
2
3
4
5
private static void  getFlinkJobExecuteGraph(DLIClient client) throws DLIException {
Long jobId = 203L;//作业ID
        FlinkJobExecutePlanResponse result = client.getFlinkJobExecuteGraph(jobId);
        System.out.println(result);
    }

查询作业监控信息

DLI提供查询Flink作业监控信息的接口。您可以使用该接口查询作业监控信息,支持同时查询多个作业监控信息。示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public static void getMetrics(DLIClient client) throws DLIException{
    List < Long > job_ids = new ArrayList < > ();
    Long jobId = 6316L; //作业1ID
    Long jobId2 = 6945L; //作业2ID
    job_ids.add(jobId);
    job_ids.add(jobId2);
    GetFlinkJobsMetricsBody body = new GetFlinkJobsMetricsBody();
    body.jobIds(job_ids);
    QueryFlinkJobMetricsResponse result = client.getFlinkJobsMetrics(body);
    System.out.println(result);
}

查询作业APIG网关服务访问地址

DLI提供查询Flink作业APIG访问地址的接口。您可以使用该接口查询作业APIG网关服务访问地址。示例代码如下:

1
2
3
4
5
  private static void  getFlinkApigSinks(DLIClient client) throws DLIException {
        Long jobId = 59L;//作业1ID
        FlinkJobApigSinksResponse result = client.getFlinkApigSinks(jobId);
        System.out.println(result);
    }

运行作业

DLI提供运行Flink作业的接口。宁可以使用该接口触发运行作业。示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    public static void runFlinkJob(DLIClient client) throws DLIException{
       RunFlinkJobRequest body = new RunFlinkJobRequest();
        List<Long> jobIds = new ArrayList<>();
        Long jobId = 59L;//作业1ID
        Long jobid2 = 192L;//作业2ID
        jobIds.add(jobId);
        jobIds.add(jobid2);
        body.resumeSavepoint(false);
        body.jobIds(jobIds);
        List<GlobalBatchResponse> result = client.runFlinkJob(body);
        System.out.println(result);
    }

停止作业

DLI提供停止Flink作业的接口。您可以使用该接口停止一个正在运行的Flink作业。示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
   public static void stopFlinkJob(DLIClient client) throws DLIException{
        StopFlinkJobRequest body = new StopFlinkJobRequest();
        List<Long> jobIds = new ArrayList<>();
        Long jobId = 59L;//作业1ID
        Long jobid2 = 192L;//作业2ID
        jobIds.add(jobId);
        jobIds.add(jobid2);
        body.triggerSavepoint(false);
        body.jobIds(jobIds);
        List<GlobalBatchResponse> result = client.stopFlinkJob(body);
        System.out.println(result);
    }

批量删除作业

DLI提供批量删除Flink作业的接口。您可以使用该接口批量删除任何状态的Flink作业。示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public static void deleteFlinkJob(DLIClient client) throws DLIException{
    DeleteJobInBatchRequest  body = new DeleteJobInBatchRequest ();
    List<Long> jobIds = new ArrayList<>();
    Long jobId = 202L;//作业1ID
    Long jobid2 = 203L;//作业2ID
    jobIds.add(jobId);
    jobIds.add(jobid2);
    body.jobIds(jobIds);
    List<GlobalBatchResponse>  result = client. deleteFlinkJobInBatch(body);
    System.out.println(result);
}