更新时间:2025-06-11 GMT+08:00
分享

使用SDK提交Flink Jar作业

本节操作介绍使用DLI SDK V2提交Flink Jar作业的操作方法。

2024年5月起,新用户可以直接使用DLI SDK V2,无需开通白名单。

对于2024年5月之前开通并使用DLI服务的用户,如需使用“DLI SDK V2”功能,必须提交工单申请加入白名单。

前提条件

操作前准备

获取AK/SK,项目ID及对应的Region信息。

  1. 管理控制台。
  2. 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。
  3. 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。
  4. 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。
  5. 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。

样例代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static final Logger logger = LoggerFactory.getLogger(FlinkJarJobExample.class);
public static void main(String[] args) {
        String yourAccessKey = System.getenv("HUAWEICLOUD_SDK_AK");
        String yourSecretKey = System.getenv("HUAWEICLOUD_SDK_SK");
        DliClient dliClient = DliClient.newBuilder()
            .withRegion(DliRegion.valueOf("RegionName"))  // 
            .withCredential(new BasicCredentials()
                .withAk(yourAccessKey)
                .withSk(yourSecretKey)
                .withProjectId("YouProjectId"))
            .build();
        try {
            // 步骤一:创建Flink作业。作业状态将变成 “草稿”
            Long jobId = createFlinkJarJob(dliClient, "YourQueueName");
            logger.info("jobId: " + jobId);
            // 步骤二:运行作业。作业状态将从 “草稿” 变成 “提交中”
            List<FlinkSuccessResponse> resps = batchRunFlinkJobs(dliClient, Arrays.asList(jobId));
            logger.info("Response: " + ArrayUtils.toString(resps));
            // 步骤三:查询作业状态。如果您想在当前线程等待作业进入“运行中”状态,可循环检查状态,直到作业进入“运行中”状态。
            checkRunning(dliClient, jobId);
        } catch (DLIException e) {
            // 请根据业务实际情况处理异常信息,此处仅作样例。
        }
    }

创建Flink Jar作业

  • 功能介绍:

    用于创建Flink Jar作业。

  • 相关链接:

    关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#createFlinkJarJob(com.huaweicloud.sdk.dli.v1.model.CreateFlinkJarJobRequest)

    新建Flink Jar作业

    创建 Flink Jar作业。作业状态将变成 “草稿”

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    private static Long createFlinkJarJob(DliClient client, String queueName) {
            // 请根据业务实际情况设置相应的参数,此处仅作样例。
            CreateFlinkJarJobResponse resp = client.createFlinkJarJob(new CreateFlinkJarJobRequest()
                .withBody(new CreateFlinkJarJobRequestBody()
                    .withName("demo_flink_jar") // 自定义作业名称。长度限制:1-57个字符。
                    .withDesc("YourJobDescription") // 自定义作业描述。长度限制:0-512个字符
                    .withQueueName(queueName) // 队列名称。长度限制:0-128个字符
                    .withFeature("basic") // 作业特性。表示用户作业使用的Flink镜像类型basic:表示使用DLI提供的基础Flink镜像
                    .withFlinkVersion("1.12") // Flink版本。当用户设置“feature”为“basic”时,该参数生效
                    .withObsBucket("YourObsBucketName") // OBS桶名。用于保存 日志  checkpoint数据
                    .withLogEnabled(true) // 开启作业的日志上传到用户的OBS功能
                    .withEntrypoint("obs://YourObsBucketName/your/flink/job.jar") // 用户已上传到OBS的jar包,用户自定义作业主类所在的jar包
                    .withMainClass("YourClassFullName") // 作业入口类,比如:org.apache.flink.examples.JavaQueueStream
                    .withEntrypointArgs("YourAppArg1 YourAppAgr2") // 作业入口类参数,多个参数之间空格分隔。如不需要,删除此行
                    .withDependencyJars(Arrays.asList("obs://YourObsBucketName/your/dependency1.jar",
                        "obs://YourObsBucketName/your/dependency2.jar")) // 用户已上传到OBS的jar包,用户自定义作业的其他依赖包。如不需要,删除此行
                    .withDependencyJars(Arrays.asList("obs://YourObsBucketName/your/dependency1.csv",
                        "obs://YourObsBucketName/your/dependency2.json")) // 用户已上传到OBS的文件,用户自定义作业的依赖文件。如不需要,删除此行
                ));
            return resp.getJob().getJobId();
        }
    

批量运行Flink作业

  • 功能介绍:

    批量运行Flink SQL作业。

  • 相关链接:

    关键SDK API:com.huaweicloud.sdk.dli.v1.DliClient#batchRunFlinkJobs(com.huaweicloud.sdk.dli.v1.model.BatchRunFlinkJobsRequest)

    批量运行Flink作业

    批量运行Flink作业,作业状态将从 “草稿” 变成 “提交中”。

  • 示例代码
    1
    2
    3
    4
    5
    6
    private static List<FlinkSuccessResponse> batchRunFlinkJobs(DliClient client, List<Long> jobIds) {
            BatchRunFlinkJobsResponse batchRunFlinkJobsResponse = client.batchRunFlinkJobs(
                new BatchRunFlinkJobsRequest()
                    .withBody(new BatchRunFlinkJobsRequestBody().withJobIds(jobIds)));
            return batchRunFlinkJobsResponse.getBody();
        }
    

查询作业状态

  • 功能介绍:

    查询Flink SQL作业状态。

  • 相关链接:

    关键SDK API: com.huaweicloud.sdk.dli.v1.DliClient#showFlinkJob(com.huaweicloud.sdk.dli.v1.model.ShowFlinkJobRequest)

    查询Flink作业详情

    如果想在当前线程等待作业进入“运行中”状态,可循环检查状态,直到作业进入“运行中”状态。

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    private static void checkRunning(DliClient client, Long jobId) throws DLIException {
            while (true) {
                ShowFlinkJobResponse resp;
                try {
                    resp = client.showFlinkJob(new ShowFlinkJobRequest().withJobId(jobId));
                } catch (Exception e) {
                    throw new DLIException("Failed to get Flink jar job status by id: " + jobId, e);
                }
                String status = resp.getJobDetail().getStatus();
                logger.info(String.format("FlinkJarJob id %s status: %s", jobId, status));
                if ("job_running".equals(status)) {
                    return;
                }
                if ("job_submit_fail".equals(status) || "job_running_exception".equals(status)) {
                    throw new DLIException("Run Flink jar job failed: " + resp);
                }
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    throw new DLIException("Check job running interrupted.");
                }
            }
        }
    

相关文档