使用SDK提交Spark作业
本节操作介绍使用DLI SDK V2提交Spark作业的操作方法。

2024年5月起,新用户可以直接使用DLI SDK V2,无需开通白名单。
对于2024年5月之前开通并使用DLI服务的用户,如需使用“DLI SDK V2”功能,必须提交工单申请加入白名单。
前提条件
- 已参考Python开发环境配置配置Python SDK环境。
- 已参考初始化DLI客户端完成客户端DLIClient的初始化。
操作前准备
获取AK/SK,项目ID及对应的Region信息。
- 管理控制台。
- 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。
- 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。
- 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。
- 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。
样例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def main(): your_access_key = os.getenv("HUAWEICLOUD_SDK_AK") your_secret_key = os.getenv("HUAWEICLOUD_SDK_SK") project_id = "your_project_id" region_name = "region_name" credentials = BasicCredentials(your_access_key, your_secret_key, project_id) dli_client = DliClient.new_builder() \ .with_credentials(credentials) \ .with_region(DliRegion.value_of(region_name)) \ .build() try: # 步骤一:提交Spark作业到DLI执行。 job_id = run_spark_job(dli_client, "your_queue_name") # 步骤二:如果您想在当前线程等待作业执行结束,可循环检查状态,直到作业结束。 check_running(dli_client, job_id) # 步骤三:如果您想根据条件查询一个或多个特定作业,可执行以下方法。 # 此处仅做样例,除了jobId,您还可指定其它筛选条件。详见ListSparkJobs list_spark_job(dli_client, job_id) # 其它场景: # 1. 作业运行期间,如果您想取消作业。可调用接口CancelSparkJob # 关键SDK API: huaweicloudsdkdli.v1.dli_client.DliClient.cancel_spark_job # 注:作业状态为“已成功”或者“已失败”的批处理作业无法取消。 # 2. 如果您想根据jobId查询某个特定作业的详情,可执行以下方法。 # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.show_spark_job # 详见ShowSparkJob except exceptions.ClientRequestException as e: # 请根据业务实际情况处理异常信息,此处仅作样例。 logger.error("Failed to execute job: ", e) |
创建Spark作业
- 功能介绍:
用于执行Spark作业。
- 相关链接:
关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.create_spark_job
详见创建批处理作业。
- 示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
def run_spark_job(client, queue_name): """ 提交Spark作业到DLI执行。 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.create_spark_job。详见CreateSparkJob :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client. :param str queue_name: Queue name for the job to execute :return: str """ conf_map = { "spark.dli.metaAccess.enable": "true" # 比如 "spark.dli.metaAccess.enable": "true" } request_body = CreateSparkJobRequestBody( # 请根据业务实际情况设置相应的参数,此处仅作样例。 queue=queue_name, # 填写通用队列的名称 spark_version="2.4.5", # 作业使用Spark组件的版本号。 name="demo_spark_app", # 用户自定义这个任务的名字,创建时用户指定的批处理名称,不能超过128个字符。 file="obs://your_bucket/your_spark_app.jar", # jar包在OBS上的位置。必选 class_name="your_class_fullname", # 必选。主类(--class)的类路径,比如 "org.example.DliCatalogTest" args=["YourAppArg1", "YourAppAgr2", "..."], # 应用程序参数的传入参数,如不需要请删除此行 conf=conf_map, # Spark参数(--conf) catalog_name="dli", # 访问元数据时,需要将该参数配置为dli。且需 conf_map["spark.dli.metaAccess.enable"] = "true" jars=["YourDepJar1", "YourDepJar2", "..."], # 依赖的jar包(--jars),如不需要请删除此行 feature="basic", # 作业特性,表示用户作业使用的Spark镜像类型。 一般选择basic即可。 driver_cores=1, # Spark应用Driver的CPU核数 driver_memory="1GB", # Spark应用的Driver内存,参数配置例如2G, 2048M。使用时必须带单位,否则会启动失败。 num_executors=1, # Spark应用Executor的个数 executor_cores=1, # Spark应用每个Executor的CPU核数 executor_memory="1GB" # Spark应用的Executor内存,参数配置例如2G, 2048M。使用时必须带单位,否则会启动失败 ) request = CreateSparkJobRequest(body=request_body) response = client.create_spark_job(request) return response.id
查询批处理作业状态
- 功能介绍:
用于执行Spark作业。
- 相关链接:
关键SDK API:
huaweicloudsdkdli.v1.dli_client.DliClient.show_spark_job_status
- 示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
def check_running(client, job_id): """ 如果您想在当前线程等待作业执行结束,可循环检查状态,直到作业结束。 :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client. :param str job_id: The job id for getting status. :return: """ while True: try: request = ShowSparkJobStatusRequest(batch_id=job_id) response = client.show_spark_job_status(request) except exceptions.ClientRequestException as e: raise Exception(f"Failed to get job status by id: {job_id}") from e status = response.state logger.info("SparkJob id %s status: %s", job_id, status) if status == "success": return if status == "dead": raise Exception("Run job failed") time.sleep(1)
查询批处理作业列表
- 功能介绍:
查询批处理作业列表。
- 相关链接:
关键SDK API:
huaweicloudsdkdli.v1.dli_client.DliClient.list_spark_jobs
- 示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
def list_spark_job(client, job_id): """ :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client. :param str job_id: The job id for getting details. :return: """ try: request = ListSparkJobsRequest( # 此处仅做样例,除了jobId,您还可指定其它筛选条件。 # 详见ListSparkJobs job_id=job_id, queue_name="your_queue_name", # 此处需修改成DLI队列名。根据队列查询批作业(推荐使用)。 start=1716195600000, # 此处需修改成用户自定义作业开始时间。用于查询开始时间在该时间点之后的作业。时间格式为unix时间戳,单位:毫秒。 end=1716199200000 # 此处需修改成用户自定义作业结束时间。用于查询开始时间在该时间点之前的作业。时间格式为unix时间戳,单位:毫秒。 ) response = client.list_spark_jobs(request) except exceptions.ClientRequestException as e: raise Exception("Failed to list Spark jobs") from e logger.info("List SparkJobs: %s", response) # resp中的响应参数详见ListSparkJobs