使用SDK提交Flink Jar作业
本节操作介绍使用DLI SDK V2提交Flink Jar作业的操作方法。

2024年5月起,新用户可以直接使用DLI SDK V2,无需开通白名单。
对于2024年5月之前开通并使用DLI服务的用户,如需使用“DLI SDK V2”功能,必须提交工单申请加入白名单。
前提条件
- 已参考Java SDK概述配置Java SDK环境。
- 已参考初始化DLI客户端完成客户端DLIClient的初始化。
操作前准备
获取AK/SK,项目ID及对应的Region信息。
- 管理控制台。
- 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。
- 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。
- 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。
- 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。
样例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
private static final Logger logger = LoggerFactory.getLogger(FlinkJarJobExample.class); public static void main(String[] args) { String yourAccessKey = System.getenv("HUAWEICLOUD_SDK_AK"); String yourSecretKey = System.getenv("HUAWEICLOUD_SDK_SK"); DliClient dliClient = DliClient.newBuilder() .withRegion(DliRegion.valueOf("RegionName")) // .withCredential(new BasicCredentials() .withAk(yourAccessKey) .withSk(yourSecretKey) .withProjectId("YouProjectId")) .build(); try { // 步骤一:创建Flink作业。作业状态将变成 “草稿” Long jobId = createFlinkJarJob(dliClient, "YourQueueName"); logger.info("jobId: " + jobId); // 步骤二:运行作业。作业状态将从 “草稿” 变成 “提交中” List<FlinkSuccessResponse> resps = batchRunFlinkJobs(dliClient, Arrays.asList(jobId)); logger.info("Response: " + ArrayUtils.toString(resps)); // 步骤三:查询作业状态。如果您想在当前线程等待作业进入“运行中”状态,可循环检查状态,直到作业进入“运行中”状态。 checkRunning(dliClient, jobId); } catch (DLIException e) { // 请根据业务实际情况处理异常信息,此处仅作样例。 } } |
创建Flink Jar作业
- 功能介绍:
用于创建Flink Jar作业。
- 相关链接:
关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#createFlinkJarJob(com.huaweicloud.sdk.dli.v1.model.CreateFlinkJarJobRequest)
创建 Flink Jar作业。作业状态将变成 “草稿”
- 示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
private static Long createFlinkJarJob(DliClient client, String queueName) { // 请根据业务实际情况设置相应的参数,此处仅作样例。 CreateFlinkJarJobResponse resp = client.createFlinkJarJob(new CreateFlinkJarJobRequest() .withBody(new CreateFlinkJarJobRequestBody() .withName("demo_flink_jar") // 自定义作业名称。长度限制:1-57个字符。 .withDesc("YourJobDescription") // 自定义作业描述。长度限制:0-512个字符 .withQueueName(queueName) // 队列名称。长度限制:0-128个字符 .withFeature("basic") // 作业特性。表示用户作业使用的Flink镜像类型。basic:表示使用DLI提供的基础Flink镜像。 .withFlinkVersion("1.12") // Flink版本。当用户设置“feature”为“basic”时,该参数生效 .withObsBucket("YourObsBucketName") // OBS桶名。用于保存 日志 和 checkpoint数据 .withLogEnabled(true) // 开启作业的日志上传到用户的OBS功能 .withEntrypoint("obs://YourObsBucketName/your/flink/job.jar") // 用户已上传到OBS的jar包,用户自定义作业主类所在的jar包。 .withMainClass("YourClassFullName") // 作业入口类,比如:org.apache.flink.examples.JavaQueueStream .withEntrypointArgs("YourAppArg1 YourAppAgr2") // 作业入口类参数,多个参数之间空格分隔。如不需要,删除此行 .withDependencyJars(Arrays.asList("obs://YourObsBucketName/your/dependency1.jar", "obs://YourObsBucketName/your/dependency2.jar")) // 用户已上传到OBS的jar包,用户自定义作业的其他依赖包。如不需要,删除此行 .withDependencyJars(Arrays.asList("obs://YourObsBucketName/your/dependency1.csv", "obs://YourObsBucketName/your/dependency2.json")) // 用户已上传到OBS的文件,用户自定义作业的依赖文件。如不需要,删除此行 )); return resp.getJob().getJobId(); }
批量运行Flink作业
- 功能介绍:
批量运行Flink SQL作业。
- 相关链接:
关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#batchRunFlinkJobs(com.huaweicloud.sdk.dli.v1.model.BatchRunFlinkJobsRequest)
批量运行Flink作业,作业状态将从 “草稿” 变成 “提交中”。
- 示例代码:
1 2 3 4 5 6
private static List<FlinkSuccessResponse> batchRunFlinkJobs(DliClient client, List<Long> jobIds) { BatchRunFlinkJobsResponse batchRunFlinkJobsResponse = client.batchRunFlinkJobs( new BatchRunFlinkJobsRequest() .withBody(new BatchRunFlinkJobsRequestBody().withJobIds(jobIds))); return batchRunFlinkJobsResponse.getBody(); }
查询作业状态
- 功能介绍:
查询Flink SQL作业状态。
- 相关链接:
关键SDK API: com.huaweicloud.sdk.dli.v1.DliClient#showFlinkJob(com.huaweicloud.sdk.dli.v1.model.ShowFlinkJobRequest)
如果想在当前线程等待作业进入“运行中”状态,可循环检查状态,直到作业进入“运行中”状态。
- 示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
private static void checkRunning(DliClient client, Long jobId) throws DLIException { while (true) { ShowFlinkJobResponse resp; try { resp = client.showFlinkJob(new ShowFlinkJobRequest().withJobId(jobId)); } catch (Exception e) { throw new DLIException("Failed to get Flink jar job status by id: " + jobId, e); } String status = resp.getJobDetail().getStatus(); logger.info(String.format("FlinkJarJob id %s status: %s", jobId, status)); if ("job_running".equals(status)) { return; } if ("job_submit_fail".equals(status) || "job_running_exception".equals(status)) { throw new DLIException("Run Flink jar job failed: " + resp); } try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new DLIException("Check job running interrupted."); } } }