使用SDK提交Flink Jar作业
本节操作介绍使用DLI SDK V2提交Flink Jar作业的操作方法。

2024年5月起,新用户可以直接使用DLI SDK V2,无需开通白名单。
对于2024年5月之前开通并使用DLI服务的用户,如需使用“DLI SDK V2”功能,必须提交工单申请加入白名单。
前提条件
- 已参考Python开发环境配置配置Python SDK环境。
- 已参考初始化DLI客户端完成客户端DLIClient的初始化。
操作前准备
获取AK/SK,项目ID及对应的Region信息。
- 管理控制台。
- 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。
- 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。
- 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。
- 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。
样例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
def main(): your_access_key = os.getenv("HUAWEICLOUD_SDK_AK") your_secret_key = os.getenv("HUAWEICLOUD_SDK_SK") project_id = "your_project_id" region_name = "region_name" credentials = BasicCredentials(your_access_key, your_secret_key, project_id) dli_client = DliClient.new_builder() \ .with_credentials(credentials) \ .with_region(DliRegion.value_of(region_name)) \ .build() try: # 步骤一:创建Flink作业。作业状态将变成 “草稿” job_id = create_flink_jar_job(dli_client, "your_queue_name") logger.info("job_id: %d", job_id) # 步骤二:运行作业。作业状态将从 “草稿” 变成 “提交中” resps = batch_run_flink_jar_jobs(dli_client, [job_id]) logger.info("Response: %s", resps) # 步骤三:查询作业状态。如果您想在当前线程等待作业进入“运行中”状态,可循环检查状态,直到作业进入“运行中”状态。 check_running(dli_client, job_id) except exceptions.ClientRequestException as e: # 请根据业务实际情况处理异常信息,此处仅作样例。 logger.error("Failed to execute job:", e) |
创建Flink Jar作业
- 功能介绍:
用于创建Flink Jar作业。
- 相关链接:
关键SDK API:
huaweicloudsdkdli.v1.dli_client.DliClient.create_flink_jar_job
创建 Flink Jar作业。作业状态将变成 “草稿”
- 示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
def create_flink_jar_job(client, queue_name): """ 创建 Flink jar 作业。作业状态将变成 “草稿” :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client :param str queue_name: Queue name for the job to execute :return: int """ request_body = CreateFlinkJarJobRequestBody( name="your_job_name", # 作业名称,名字必须唯一,比如 flink_jar_job_demo。长度限制:1-57个字符。 desc="your flink job's description", # 用户自定义描述。长度限制:0-512个字符。 queue_name=queue_name, # 通用队列名称。队列名称。长度限制:0-128个字符。 feature="basic", # 作业特性。表示用户作业使用的Flink镜像类型。basic:表示使用DLI提供的基础Flink镜像。 flink_version="1.12", # Flink版本。当用户设置“feature”为“basic”时,该参数生效。用户可通过与“feature”参数配合使用,指定作业运行使用的DLI基础Flink镜像的版本 log_enabled=True, # 是否开启作业日志。 obs_bucket="your_obs_bucket_name", # 当“log_enabled”为“true”时, 用户授权保存作业日志的OBS桶名。 entrypoint="obs://your_obs_bucket_name/your/flink/job.jar", # 用户已上传到OBS的程序包,用户自定义作业主类所在的jar包。 main_class="your_class_fullname" # 作业入口类,比如:org.apache.flink.examples.WordCount ) request = CreateFlinkJarJobRequest(body=request_body) response = client.create_flink_jar_job(request) return response.job.job_id
批量运行Flink作业
- 功能介绍:
批量运行Flink SQL作业。
- 相关链接:
关键SDK API:
huaweicloudsdkdli.v1.dli_client.DliClient.batch_run_flink_jobs
批量运行Flink作业,作业状态将从 “草稿” 变成 “提交中”。
- 示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
def batch_run_flink_jar_jobs(client, job_ids): """ 运行作业。作业状态将从 “草稿” 变成 “提交中” 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.batch_run_flink_jobs。详见 BatchRunFlinkJobs :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client. :param list[int] job_ids: The job ids for running. :return: The body of this BatchRunFlinkJobsResponse. :rtype: list[:class:`huaweicloudsdkdli.v1.FlinkSuccessResponse`] """ request_body = BatchRunFlinkJobsRequestBody(job_ids=job_ids) request = BatchRunFlinkJobsRequest(body=request_body) response = client.batch_run_flink_jobs(request) return response.body
查询作业状态
- 功能介绍:
查询Flink SQL作业状态。
- 相关链接:
关键SDK API:
huaweicloudsdkdli.v1.dli_client.DliClient.show_flink_job
如果想在当前线程等待作业进入“运行中”状态,可循环检查状态,直到作业进入“运行中”状态。
- 示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
def check_running(client, job_id): """ 如果您想在当前线程等待作业进入“运行中”状态,可执行此方法,循环检查状态,直到作业进入“运行中”状态。 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.show_flink_job。详见ShowFlinkJob :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client. :param int job_id: The job id for getting status. :return: """ while True: try: request = ShowFlinkJobRequest(job_id=job_id) response = client.show_flink_job(request) except exceptions.ClientRequestException as e: raise Exception(f"Failed to get Flink jar job status by id: {job_id}") from e status = response.job_detail.status logger.info("FlinkJarJob id %d status: %s", job_id, status) if status == "job_running": return if status in ["job_submit_fail", "job_running_exception"]: raise Exception(f"Run Flink jar job failed: {response}") time.sleep(1)