使用SDK提交Flink SQL作业
本节操作介绍使用DLI SDK V2提交Flink SQL作业的操作方法。

2024年5月起,新用户可以直接使用DLI SDK V2,无需开通白名单。
对于2024年5月之前开通并使用DLI服务的用户,如需使用“DLI SDK V2”功能,必须提交工单申请加入白名单。
前提条件
- 已参考Java SDK概述配置Java SDK环境。
- 已参考初始化DLI客户端完成客户端DLIClient的初始化。
操作前准备
获取AK/SK,项目ID及对应的Region信息。
- 管理控制台。
- 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。
- 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。
- 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。
- 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。
样例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
private static final Logger logger = LoggerFactory.getLogger(FlinkSqlJobExample.class); public static void main(String[] args) { String yourAccessKey = System.getenv("HUAWEICLOUD_SDK_AK"); String yourSecretKey = System.getenv("HUAWEICLOUD_SDK_SK"); DliClient dliClient = DliClient.newBuilder() .withRegion(DliRegion.valueOf("RegionName")) .withCredential(new BasicCredentials() .withAk(yourAccessKey) .withSk(yourSecretKey) .withProjectId("YouProjectId")) .build(); try { // 步骤一:创建Flink作业。作业状态将变成 “草稿” Long jobId = createFlinkSqlJob(dliClient, "YourQueueName"); logger.info("jobId: " + jobId); // 步骤二:运行作业。作业状态将从 “草稿” 变成 “提交中” List<FlinkSuccessResponse> resps = batchRunFlinkJobs(dliClient, Arrays.asList(jobId)); logger.info("Response: " + ArrayUtils.toString(resps)); // 步骤三:查询作业状态。如果您想在当前线程等待作业进入“运行中”状态,可循环检查状态,直到作业进入“运行中”状态。 checkRunning(dliClient, jobId); } catch (DLIException e) { // 请根据业务实际情况处理异常信息,此处仅作样例。 } } |
创建Flink SQL作业
- 功能介绍:
用于创建Flink SQL作业。
- 相关链接:
关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#createFlinkSqlJob(com.huaweicloud.sdk.dli.v1.model.CreateFlinkSqlJobRequest)
创建 Flink SQL作业,此时作业状态将变成 “草稿”。
- 示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
private static Long createFlinkSqlJob(DliClient client, String queueName) { // 请根据业务实际情况设置相应的参数,此处仅作样例。 CreateFlinkSqlJobResponse resp = client.createFlinkSqlJob(new CreateFlinkSqlJobRequest() .withBody(new CreateFlinkSqlJobRequestBody() .withName("demo_flink_sql") // 自定义作业名称。长度限制:1-57个字符 .withDesc("YourJobDescription") // 自定义作业描述。长度限制:0-512个字符 .withSqlBody("create table orders(\n" + " name string,\n" + " num INT\n" + ") with (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1', \n" + " 'fields.name.kind' = 'random', \n" + " 'fields.name.length' = '5' \n" + ");\n" + "\n" + "CREATE TABLE sink_table (\n" + " name string,\n" + " num INT\n" + ") WITH (\n" + " 'connector' = 'print'\n" + ");\n" + "\n" + "INSERT into sink_table SELECT * from orders;") // 自定义 Stream SQL语句,至少包含source, query, sink三个部分。长度限制:1024*1024个字符。 // 本SQL示例:自动生成随机source数据,并打印到控制台。 .withQueueName(queueName) // 队列名称。长度限制:0-128个字符 .withRunMode("exclusive_cluster") // 作业运行模式。只支持 exclusive_cluster 独享模式。 .withLogEnabled(true) // 开启作业的日志上传到用户的OBS功能 .withObsBucket("YourObsBucketName") // OBS桶名。用于保存 日志 和 checkpoint数据 .withJobType("flink_opensource_sql_job") // 作业类型。建议选择: "flink_opensource_sql_job" .withFlinkVersion("1.12") // 指定Flink版本 )); return resp.getJob().getJobId(); }
批量运行Flink作业
- 功能介绍:
批量运行Flink SQL作业。
- 相关链接:
关键SDK API: com.huaweicloud.sdk.dli.v1.DliClient#batchRunFlinkJobs(com.huaweicloud.sdk.dli.v1.model.BatchRunFlinkJobsRequest)
批量运行Flink作业,作业状态将从 “草稿” 变成 “提交中”。
- 示例代码:
1 2 3 4 5 6
private static List<FlinkSuccessResponse> batchRunFlinkJobs(DliClient client, List<Long> jobIds) { BatchRunFlinkJobsResponse batchRunFlinkJobsResponse = client.batchRunFlinkJobs( new BatchRunFlinkJobsRequest() .withBody(new BatchRunFlinkJobsRequestBody().withJobIds(jobIds))); return batchRunFlinkJobsResponse.getBody(); }
查询作业状态
- 功能介绍:
查询Flink SQL作业状态。
- 相关链接:
关键SDK API: com.huaweicloud.sdk.dli.v1.DliClient#showFlinkJob(com.huaweicloud.sdk.dli.v1.model.ShowFlinkJobRequest)}
如果想在当前线程等待作业进入“运行中”状态,可循环检查状态,直到作业进入“运行中”状态。
- 示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
private static void checkRunning(DliClient client, Long jobId) throws DLIException { while (true) { ShowFlinkJobResponse resp; try { resp = client.showFlinkJob(new ShowFlinkJobRequest().withJobId(jobId)); } catch (Exception e) { throw new DLIException("Failed to get Flink sql job status by id: " + jobId, e); } String status = resp.getJobDetail().getStatus(); logger.info(String.format("FlinkSqlJob id %s status: %s", jobId, status)); if ("job_running".equals(status)) { return; } if ("job_submit_fail".equals(status) || "job_running_exception".equals(status)) { throw new DLIException("Run Flink sql job failed: " + resp); } try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new DLIException("Check job running interrupted."); } } }