Flink作业相关
前提条件
- 已参考Java SDK概述配置Java SDK环境。
- 已参考初始化DLI客户端完成客户端DLIClient的初始化,参考队列相关完成队列创建等操作。
新建SQL作业
DLI提供新建Flink SQL作业的接口。您可以使用该接口新建Flink SQL作业并提交到DLI,示例代码如下:
1 2 3 4 5 6 7 8 9 10 |
private static void createSQLJob(DLIClient client) throws DLIException { SubmitFlinkSqlJobRequest body = new SubmitFlinkSqlJobRequest(); body.name("job-name"); body.runMode(SubmitFlinkSqlJobRequest.RunModeEnum.SHARED_CLUSTER); body.checkpointEnabled(false); body.checkpointMode(1); body.jobType(SubmitFlinkSqlJobRequest.JobTypeEnum.JOB); JobStatusResponse result = client.submitFlinkSqlJob(body); System.out.println(result); } |
新建自定义作业
DLI提供新建Flink自定义作业的接口。您可以使用该接口创建一个用户自定义作业,目前支持jar格式,运行在独享队列中。示例代码如下:
1 2 3 4 5 6 7 8 9 10 |
private static void createFlinkJob(DLIClient client) throws DLIException { CreateFlinkJarJobRequest body = new CreateFlinkJarJobRequest(); body.name("jar-job"); body.cuNumber(2); body.managerCuNumber(1); body.parallelNumber(1); body.entrypoint("dli/WindowJoin.jar"); JobStatusResponse result = client.createFlinkJarJob(body); System.out.println(result); } |
更新SQL作业
DLI提供更新Flink SQL作业接口。您可以使用该接口更新Flink SQL作业,示例代码如下:
1 2 3 4 5 6 |
private static void updateSQLJob(DLIClient client) throws DLIException { UpdateFlinkSqlJobRequest body = new UpdateFlinkSqlJobRequest(); body.name("update-job"); JobUpdateResponse result = client.updateFlinkSqlJob(body,203L); System.out.println(result); } |
更新自定义作业
DLI提供更新Flink自定义作业的接口。您可以使用该接口更新已经创建的自定义作业,目前仅支持Jar格式和运行在独享队列中。示例代码如下:
1 2 3 4 5 6 |
private static void updateFlinkJob(DLIClient client) throws DLIException { UpdateFlinkJarJobRequest body = new UpdateFlinkJarJobRequest(); body.name("update-job"); JobUpdateResponse result = client.updateFlinkJarJob(body,202L); System.out.println(result); } |
查询作业列表
DLI提供查询Flink作业列表的接口。您可以使用该接口查询作业列表。作业列表查询支持以下参数: name,status,show_detail,cursor,next,limit,order。本示例排序方式选择降序desc,将会列出作业id小于cursor的作业列表信息。示例代码如下:
1 2 3 4 |
private static void QueryFlinkJobListResponse(DLIClient client) throws DLIException { QueryFlinkJobListResponse result = client.getFlinkJobs(null, "job_init", null, true, 0L, 10, null, null,null,null,null); System.out.println(result); } |
查询作业详情
DLI提供查询Flink作业详情的接口。您可以使用该接口查询作业的详情。示例代码如下:
1 2 3 4 5 |
private static void getFlinkJobDetail(DLIClient client) throws DLIException { Long jobId = 203L;//作业ID GetFlinkJobDetailResponse result = client.getFlinkJobDetail(jobId); System.out.println(result); } |
查询作业执行计划图
DLI提供查询Flink作业执行计划图的接口。您可以使用该接口查询作业的执行计划图。示例代码如下:
1 2 3 4 5 |
private static void getFlinkJobExecuteGraph(DLIClient client) throws DLIException { Long jobId = 203L;//作业ID FlinkJobExecutePlanResponse result = client.getFlinkJobExecuteGraph(jobId); System.out.println(result); } |
查询作业监控信息
DLI提供查询Flink作业监控信息的接口。您可以使用该接口查询作业监控信息,支持同时查询多个作业监控信息。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 |
public static void getMetrics(DLIClient client) throws DLIException{ List < Long > job_ids = new ArrayList < > (); Long jobId = 6316L; //作业1ID Long jobId2 = 6945L; //作业2ID job_ids.add(jobId); job_ids.add(jobId2); GetFlinkJobsMetricsBody body = new GetFlinkJobsMetricsBody(); body.jobIds(job_ids); QueryFlinkJobMetricsResponse result = client.getFlinkJobsMetrics(body); System.out.println(result); } |
查询作业APIG网关服务访问地址
DLI提供查询Flink作业APIG访问地址的接口。您可以使用该接口查询作业APIG网关服务访问地址。示例代码如下:
1 2 3 4 5 |
private static void getFlinkApigSinks(DLIClient client) throws DLIException { Long jobId = 59L;//作业1ID FlinkJobApigSinksResponse result = client.getFlinkApigSinks(jobId); System.out.println(result); } |
运行作业
DLI提供运行Flink作业的接口。宁可以使用该接口触发运行作业。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 |
public static void runFlinkJob(DLIClient client) throws DLIException{ RunFlinkJobRequest body = new RunFlinkJobRequest(); List<Long> jobIds = new ArrayList<>(); Long jobId = 59L;//作业1ID Long jobid2 = 192L;//作业2ID jobIds.add(jobId); jobIds.add(jobid2); body.resumeSavepoint(false); body.jobIds(jobIds); List<GlobalBatchResponse> result = client.runFlinkJob(body); System.out.println(result); } |
停止作业
DLI提供停止Flink作业的接口。您可以使用该接口停止一个正在运行的Flink作业。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 |
public static void stopFlinkJob(DLIClient client) throws DLIException{ StopFlinkJobRequest body = new StopFlinkJobRequest(); List<Long> jobIds = new ArrayList<>(); Long jobId = 59L;//作业1ID Long jobid2 = 192L;//作业2ID jobIds.add(jobId); jobIds.add(jobid2); body.triggerSavepoint(false); body.jobIds(jobIds); List<GlobalBatchResponse> result = client.stopFlinkJob(body); System.out.println(result); } |
批量删除作业
DLI提供批量删除Flink作业的接口。您可以使用该接口批量删除任何状态的Flink作业。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 |
public static void deleteFlinkJob(DLIClient client) throws DLIException{ DeleteJobInBatchRequest body = new DeleteJobInBatchRequest (); List<Long> jobIds = new ArrayList<>(); Long jobId = 202L;//作业1ID Long jobid2 = 203L;//作业2ID jobIds.add(jobId); jobIds.add(jobid2); body.jobIds(jobIds); List<GlobalBatchResponse> result = client. deleteFlinkJobInBatch(body); System.out.println(result); } |