dli
Flink作业相关
更新时间:2020/06/23 GMT+08:00
新建SQL作业
DLI提供新建Flink SQL作业的接口。您可以使用该接口新建Flink SQL作业并提交到DLI,示例代码如下:
1 2 3 4 5 6 7 8 9 10 | private static void createSQLJob(DLIClient client) throws DLIException {
SubmitFlinkSqlJobRequest body = new SubmitFlinkSqlJobRequest();
body.name("job-name");
body.runMode(SubmitFlinkSqlJobRequest.RunModeEnum.SHARED_CLUSTER);
body.checkpointEnabled(false);
body.checkpointMode(1);
body.jobType(SubmitFlinkSqlJobRequest.JobTypeEnum.JOB);
JobStatusResponse result = client.submitFlinkSqlJob(body);
System.out.println(result);
}
|
新建自定义作业
DLI提供新建Flink自定义作业的接口。您可以使用该接口创建一个用户自定义作业,目前支持jar格式,运行在独享队列中。示例代码如下:
1 2 3 4 5 6 7 8 9 10 | private static void createFlinkJob(DLIClient client) throws DLIException {
CreateFlinkJarJobRequest body = new CreateFlinkJarJobRequest();
body.name("jar-job");
body.cuNumber(2);
body.managerCuNumber(1);
body.parallelNumber(1);
body.entrypoint("dli/WindowJoin.jar");
JobStatusResponse result = client.createFlinkJarJob(body);
System.out.println(result);
}
|
更新SQL作业
DLI提供更新Flink SQL作业接口。您可以使用该接口更新Flink SQL作业,示例代码如下:
1 2 3 4 5 6 | private static void updateSQLJob(DLIClient client) throws DLIException {
UpdateFlinkSqlJobRequest body = new UpdateFlinkSqlJobRequest();
body.name("update-job");
JobUpdateResponse result = client.updateFlinkSqlJob(body,203L);
System.out.println(result);
}
|
更新自定义作业
DLI提供更新Flink自定义作业的接口。您可以使用该接口更新已经创建的自定义作业,目前仅支持Jar格式和运行在独享队列中。示例代码如下:
1 2 3 4 5 6 | private static void updateFlinkJob(DLIClient client) throws DLIException {
UpdateFlinkJarJobRequest body = new UpdateFlinkJarJobRequest();
body.name("update-job");
JobUpdateResponse result = client.updateFlinkJarJob(body,202L);
System.out.println(result);
}
|
查询作业列表
DLI提供查询Flink作业列表的接口。您可以使用该接口查询作业列表。作业列表查询支持以下参数: name,status,show_detail,cursor,next,limit,order。本示例排序方式选择降序desc,将会列出作业id小于cursor的作业列表信息。示例代码如下:
1 2 3 4 | private static void QueryFlinkJobListResponse(DLIClient client) throws DLIException {
QueryFlinkJobListResponse result = client.getFlinkJobs(null, "job_init", null, true, 0L, 10, null, null,null,null,null);
System.out.println(result);
}
|
查询作业详情
DLI提供查询Flink作业详情的接口。您可以使用该接口查询作业的详情。示例代码如下:
1 2 3 4 5 | private static void getFlinkJobDetail(DLIClient client) throws DLIException {
Long jobId = 203L;//作业ID
GetFlinkJobDetailResponse result = client.getFlinkJobDetail(jobId);
System.out.println(result);
}
|
查询作业执行计划图
DLI提供查询Flink作业执行计划图的接口。您可以使用该接口查询作业的执行计划图。示例代码如下:
1 2 3 4 5 | private static void getFlinkJobExecuteGraph(DLIClient client) throws DLIException {
Long jobId = 203L;//作业ID
FlinkJobExecutePlanResponse result = client.getFlinkJobExecuteGraph(jobId);
System.out.println(result);
}
|
查询作业监控信息
DLI提供查询Flink作业监控信息的接口。您可以使用该接口查询作业监控信息,支持同时查询多个作业监控信息。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 | public static void getMetrics(DLIClient client) throws DLIException{
List < Long > job_ids = new ArrayList < > ();
Long jobId = 6316L; //作业1ID
Long jobId2 = 6945L; //作业2ID
job_ids.add(jobId);
job_ids.add(jobId2);
GetFlinkJobsMetricsBody body = new GetFlinkJobsMetricsBody();
body.jobIds(job_ids);
QueryFlinkJobMetricsResponse result = client.getFlinkJobsMetrics(body);
System.out.println(result);
}
|
查询作业APIG网关服务访问地址
DLI提供查询Flink作业APIG访问地址的接口。您可以使用该接口查询作业APIG网关服务访问地址。示例代码如下:
1 2 3 4 5 | private static void getFlinkApigSinks(DLIClient client) throws DLIException {
Long jobId = 59L;//作业1ID
FlinkJobApigSinksResponse result = client.getFlinkApigSinks(jobId);
System.out.println(result);
}
|
运行作业
DLI提供运行Flink作业的接口。宁可以使用该接口触发运行作业。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 | public static void runFlinkJob(DLIClient client) throws DLIException{
RunFlinkJobRequest body = new RunFlinkJobRequest();
List<Long> jobIds = new ArrayList<>();
Long jobId = 59L;//作业1ID
Long jobid2 = 192L;//作业2ID
jobIds.add(jobId);
jobIds.add(jobid2);
body.resumeSavepoint(false);
body.jobIds(jobIds);
List<GlobalBatchResponse> result = client.runFlinkJob(body);
System.out.println(result);
}
|
停止作业
DLI提供停止Flink作业的接口。您可以使用该接口停止一个正在运行的Flink作业。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 | public static void stopFlinkJob(DLIClient client) throws DLIException{
StopFlinkJobRequest body = new StopFlinkJobRequest();
List<Long> jobIds = new ArrayList<>();
Long jobId = 59L;//作业1ID
Long jobid2 = 192L;//作业2ID
jobIds.add(jobId);
jobIds.add(jobid2);
body.triggerSavepoint(false);
body.jobIds(jobIds);
List<GlobalBatchResponse> result = client.stopFlinkJob(body);
System.out.println(result);
}
|
批量删除作业
DLI提供批量删除Flink作业的接口。您可以使用该接口批量删除任何状态的Flink作业。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 | public static void deleteFlinkJob(DLIClient client) throws DLIException{
DeleteJobInBatchRequest body = new DeleteJobInBatchRequest ();
List<Long> jobIds = new ArrayList<>();
Long jobId = 202L;//作业1ID
Long jobid2 = 203L;//作业2ID
jobIds.add(jobId);
jobIds.add(jobid2);
body.jobIds(jobIds);
List<GlobalBatchResponse> result = client. deleteFlinkJobInBatch(body);
System.out.println(result);
}
|
父主题: Java SDK
