更新时间:2025-06-11 GMT+08:00
分享

使用SDK提交Flink Jar作业

本节操作介绍使用DLI SDK V2提交Flink Jar作业的操作方法。

2024年5月起,新用户可以直接使用DLI SDK V2,无需开通白名单。

对于2024年5月之前开通并使用DLI服务的用户,如需使用“DLI SDK V2”功能,必须提交工单申请加入白名单。

前提条件

操作前准备

获取AK/SK,项目ID及对应的Region信息。

  1. 管理控制台。
  2. 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。
  3. 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。
  4. 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。
  5. 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。

样例代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def main():
    your_access_key = os.getenv("HUAWEICLOUD_SDK_AK")
    your_secret_key = os.getenv("HUAWEICLOUD_SDK_SK")
    project_id = "your_project_id"
    region_name = "region_name" 
    credentials = BasicCredentials(your_access_key, your_secret_key, project_id)
    dli_client = DliClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(DliRegion.value_of(region_name)) \
        .build()

    try:
        # 步骤一:创建Flink作业。作业状态将变成 “草稿”
        job_id = create_flink_jar_job(dli_client, "your_queue_name")
        logger.info("job_id: %d", job_id)

        # 步骤二:运行作业。作业状态将从 “草稿” 变成 “提交中”
        resps = batch_run_flink_jar_jobs(dli_client, [job_id])
        logger.info("Response: %s", resps)

        # 步骤三:查询作业状态。如果您想在当前线程等待作业进入“运行中”状态,可循环检查状态,直到作业进入“运行中”状态。
        check_running(dli_client, job_id)

    except exceptions.ClientRequestException as e:
        # 请根据业务实际情况处理异常信息,此处仅作样例。
        logger.error("Failed to execute job:", e)

创建Flink Jar作业

  • 功能介绍:

    用于创建Flink Jar作业。

  • 相关链接:

    关键SDK API:

    huaweicloudsdkdli.v1.dli_client.DliClient.create_flink_jar_job

    新建Flink Jar作业

    创建 Flink Jar作业。作业状态将变成 “草稿”

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    def create_flink_jar_job(client, queue_name):
        """
        创建 Flink jar 作业。作业状态将变成 “草稿”
        
        :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client
        :param str queue_name: Queue name for the job to execute
        :return: int
        """
        request_body = CreateFlinkJarJobRequestBody(
            name="your_job_name",  # 作业名称,名字必须唯一,比如 flink_jar_job_demo。长度限制:1-57个字符。
            desc="your flink job's description",  # 用户自定义描述。长度限制:0-512个字符。
            queue_name=queue_name,  # 通用队列名称。队列名称。长度限制:0-128个字符。
            feature="basic",  # 作业特性。表示用户作业使用的Flink镜像类型。basic:表示使用DLI提供的基础Flink镜像。
            flink_version="1.12",  # Flink版本。当用户设置“feature”为“basic”时,该参数生效。用户可通过与“feature”参数配合使用,指定作业运行使用的DLI基础Flink镜像的版本
            log_enabled=True,  # 是否开启作业日志。
            obs_bucket="your_obs_bucket_name",  # 当“log_enabled”为“true”时, 用户授权保存作业日志的OBS桶名。
            entrypoint="obs://your_obs_bucket_name/your/flink/job.jar",  # 用户已上传到OBS的程序包,用户自定义作业主类所在的jar包。
            main_class="your_class_fullname"  # 作业入口类,比如:org.apache.flink.examples.WordCount
        )
        request = CreateFlinkJarJobRequest(body=request_body)
        response = client.create_flink_jar_job(request)
        return response.job.job_id
    

批量运行Flink作业

  • 功能介绍:

    批量运行Flink SQL作业。

  • 相关链接:

    关键SDK API:

    huaweicloudsdkdli.v1.dli_client.DliClient.batch_run_flink_jobs

    批量运行Flink作业

    批量运行Flink作业,作业状态将从 “草稿” 变成 “提交中”。

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    def batch_run_flink_jar_jobs(client, job_ids):
        """
        运行作业。作业状态将从 “草稿” 变成 “提交中”
        关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.batch_run_flink_jobs。详见 BatchRunFlinkJobs
    
        :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client.
        :param list[int] job_ids: The job ids for running.
        :return: The body of this BatchRunFlinkJobsResponse.
        :rtype: list[:class:`huaweicloudsdkdli.v1.FlinkSuccessResponse`]
        """
        request_body = BatchRunFlinkJobsRequestBody(job_ids=job_ids)
        request = BatchRunFlinkJobsRequest(body=request_body)
        response = client.batch_run_flink_jobs(request)
        return response.body
    

查询作业状态

  • 功能介绍:

    查询Flink SQL作业状态。

  • 相关链接:

    关键SDK API:

    huaweicloudsdkdli.v1.dli_client.DliClient.show_flink_job

    查询Flink作业详情

    如果想在当前线程等待作业进入“运行中”状态,可循环检查状态,直到作业进入“运行中”状态。

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    def check_running(client, job_id):
        """
        如果您想在当前线程等待作业进入“运行中”状态,可执行此方法,循环检查状态,直到作业进入“运行中”状态。
        关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.show_flink_job。详见ShowFlinkJob 
    
        :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client.
        :param int job_id: The job id for getting status.
        :return:
        """
        while True:
            try:
                request = ShowFlinkJobRequest(job_id=job_id)
                response = client.show_flink_job(request)
            except exceptions.ClientRequestException as e:
                raise Exception(f"Failed to get Flink jar job status by id: {job_id}") from e
    
            status = response.job_detail.status
            logger.info("FlinkJarJob id %d status: %s", job_id, status)
    
            if status == "job_running":
                return
            if status in ["job_submit_fail", "job_running_exception"]:
                raise Exception(f"Run Flink jar job failed: {response}")
    
            time.sleep(1)
    

相关文档