更新时间:2025-06-11 GMT+08:00
分享

使用SDK提交Flink SQL作业

本节操作介绍使用DLI SDK V2提交Flink SQL作业的操作方法。

2024年5月起,新用户可以直接使用DLI SDK V2,无需开通白名单。

对于2024年5月之前开通并使用DLI服务的用户,如需使用“DLI SDK V2”功能,必须提交工单申请加入白名单。

前提条件

操作前准备

获取AK/SK,项目ID及对应的Region信息。

  1. 管理控制台。
  2. 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。
  3. 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。
  4. 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。
  5. 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。

样例代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
    private static final Logger logger = LoggerFactory.getLogger(FlinkSqlJobExample.class);
    public static void main(String[] args) {        
        String yourAccessKey = System.getenv("HUAWEICLOUD_SDK_AK");
        String yourSecretKey = System.getenv("HUAWEICLOUD_SDK_SK");
        DliClient dliClient = DliClient.newBuilder()
            .withRegion(DliRegion.valueOf("RegionName"))  
            .withCredential(new BasicCredentials()
                .withAk(yourAccessKey)
                .withSk(yourSecretKey)
                .withProjectId("YouProjectId"))
            .build();

        try {
            // 步骤一:创建Flink作业。作业状态将变成 “草稿”
            Long jobId = createFlinkSqlJob(dliClient, "YourQueueName");
            logger.info("jobId: " + jobId);

            // 步骤二:运行作业。作业状态将从 “草稿” 变成 “提交中”
            List<FlinkSuccessResponse> resps = batchRunFlinkJobs(dliClient, Arrays.asList(jobId));
            logger.info("Response: " + ArrayUtils.toString(resps));

            // 步骤三:查询作业状态。如果您想在当前线程等待作业进入“运行中”状态,可循环检查状态,直到作业进入“运行中”状态。
            checkRunning(dliClient, jobId);

        } catch (DLIException e) {
            // 请根据业务实际情况处理异常信息,此处仅作样例。
        }

    }

创建Flink SQL作业

  • 功能介绍:

    用于创建Flink SQL作业。

  • 相关链接:

    关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#createFlinkSqlJob(com.huaweicloud.sdk.dli.v1.model.CreateFlinkSqlJobRequest)

    新建Flink SQL作业

    创建 Flink SQL作业,此时作业状态将变成 “草稿”。

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    private static Long createFlinkSqlJob(DliClient client, String queueName) {
            // 请根据业务实际情况设置相应的参数,此处仅作样例。
            CreateFlinkSqlJobResponse resp = client.createFlinkSqlJob(new CreateFlinkSqlJobRequest()
                .withBody(new CreateFlinkSqlJobRequestBody()
                    .withName("demo_flink_sql") // 自定义作业名称。长度限制:1-57个字符
                    .withDesc("YourJobDescription") // 自定义作业描述。长度限制:0-512个字符
                    .withSqlBody("create table orders(\n"
                        + "  name string,\n"
                        + "  num INT\n"
                        + ") with (\n"
                        + "  'connector' = 'datagen',\n"
                        + "  'rows-per-second' = '1', \n"
                        + "  'fields.name.kind' = 'random', \n"
                        + "  'fields.name.length' = '5' \n"
                        + ");\n"
                        + "\n"
                        + "CREATE TABLE sink_table (\n"
                        + "   name string,\n"
                        + "   num INT\n"
                        + ") WITH (\n"
                        + "   'connector' = 'print'\n"
                        + ");\n"
                        + "\n"
                        + "INSERT into sink_table SELECT * from orders;")
                    // 自定义 Stream SQL语句,至少包含source, query, sink三个部分。长度限制:1024*1024个字符。
                    // SQL示例:自动生成随机source数据,并打印到控制台。
                    .withQueueName(queueName) // 队列名称。长度限制:0-128个字符
                    .withRunMode("exclusive_cluster") // 作业运行模式。只支持 exclusive_cluster 独享模式。
                    .withLogEnabled(true) // 开启作业的日志上传到用户的OBS功能
                    .withObsBucket("YourObsBucketName") // OBS桶名。用于保存 日志  checkpoint数据
                    .withJobType("flink_opensource_sql_job") // 作业类型。建议选择: "flink_opensource_sql_job"
                    .withFlinkVersion("1.12") // 指定Flink版本
                ));
            return resp.getJob().getJobId();
        }
    

批量运行Flink作业

  • 功能介绍:

    批量运行Flink SQL作业。

  • 相关链接:

    关键SDK API: com.huaweicloud.sdk.dli.v1.DliClient#batchRunFlinkJobs(com.huaweicloud.sdk.dli.v1.model.BatchRunFlinkJobsRequest)

    批量运行Flink作业

    批量运行Flink作业,作业状态将从 “草稿” 变成 “提交中”。

  • 示例代码
    1
    2
    3
    4
    5
    6
    private static List<FlinkSuccessResponse> batchRunFlinkJobs(DliClient client, List<Long> jobIds) {
            BatchRunFlinkJobsResponse batchRunFlinkJobsResponse = client.batchRunFlinkJobs(
                new BatchRunFlinkJobsRequest()
                    .withBody(new BatchRunFlinkJobsRequestBody().withJobIds(jobIds)));
            return batchRunFlinkJobsResponse.getBody();
        }
    

查询作业状态

  • 功能介绍:

    查询Flink SQL作业状态。

  • 相关链接:

    关键SDK API: com.huaweicloud.sdk.dli.v1.DliClient#showFlinkJob(com.huaweicloud.sdk.dli.v1.model.ShowFlinkJobRequest)}

    查询Flink作业详情

    如果想在当前线程等待作业进入“运行中”状态,可循环检查状态,直到作业进入“运行中”状态。

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    private static void checkRunning(DliClient client, Long jobId) throws DLIException {
            while (true) {
                ShowFlinkJobResponse resp;
                try {
                    resp = client.showFlinkJob(new ShowFlinkJobRequest().withJobId(jobId));
                } catch (Exception e) {
                    throw new DLIException("Failed to get Flink sql job status by id: " + jobId, e);
                }
                String status = resp.getJobDetail().getStatus();
                logger.info(String.format("FlinkSqlJob id %s status: %s", jobId, status));
                if ("job_running".equals(status)) {
                    return;
                }
                if ("job_submit_fail".equals(status) || "job_running_exception".equals(status)) {
                    throw new DLIException("Run Flink sql job failed: " + resp);
                }
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    throw new DLIException("Check job running interrupted.");
                }
            }
        }
    

相关文档