更新时间:2025-06-11 GMT+08:00
分享

使用SDK提交Spark作业

本节操作介绍使用DLI SDK V2提交Spark作业的操作方法。

2024年5月起,新用户可以直接使用DLI SDK V2,无需开通白名单。

对于2024年5月之前开通并使用DLI服务的用户,如需使用“DLI SDK V2”功能,必须提交工单申请加入白名单。

前提条件

操作前准备

获取AK/SK,项目ID及对应的Region信息。

  1. 管理控制台。
  2. 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。
  3. 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。
  4. 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。
  5. 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。

样例代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def main():
    your_access_key = os.getenv("HUAWEICLOUD_SDK_AK")
    your_secret_key = os.getenv("HUAWEICLOUD_SDK_SK")
    project_id = "your_project_id"
    region_name = "region_name"  
    credentials = BasicCredentials(your_access_key, your_secret_key, project_id)
    dli_client = DliClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(DliRegion.value_of(region_name)) \
        .build()
    try:
        # 步骤一:提交Spark作业到DLI执行。
        job_id = run_spark_job(dli_client, "your_queue_name")
        # 步骤二:如果您想在当前线程等待作业执行结束,可循环检查状态,直到作业结束。
        check_running(dli_client, job_id)
        # 步骤三:如果您想根据条件查询一个或多个特定作业,可执行以下方法。
        # 此处仅做样例,除了jobId,您还可指定其它筛选条件。详见ListSparkJobs
        list_spark_job(dli_client, job_id)
        # 其它场景:
        # 1. 作业运行期间,如果您想取消作业。可调用接口CancelSparkJob
        # 关键SDK API: huaweicloudsdkdli.v1.dli_client.DliClient.cancel_spark_job
        # 注:作业状态为“已成功”或者“已失败”的批处理作业无法取消。
        # 2. 如果您想根据jobId查询某个特定作业的详情,可执行以下方法。
        # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.show_spark_job
        # 详见ShowSparkJob 
    except exceptions.ClientRequestException as e:
        # 请根据业务实际情况处理异常信息,此处仅作样例。
        logger.error("Failed to execute job: ", e)

创建Spark作业

  • 功能介绍:

    用于执行Spark作业。

  • 相关链接:

    关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.create_spark_job

    详见创建批处理作业

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    def run_spark_job(client, queue_name):
        """
        提交Spark作业到DLI执行。
        关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.create_spark_job。详见CreateSparkJob
        :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client.
        :param str queue_name: Queue name for the job to execute
        :return: str
        """
        conf_map = {
            "spark.dli.metaAccess.enable": "true"  # 比如 "spark.dli.metaAccess.enable": "true"
        }
        request_body = CreateSparkJobRequestBody(
            # 请根据业务实际情况设置相应的参数,此处仅作样例。
            queue=queue_name,  # 填写通用队列的名称
            spark_version="2.4.5",  # 作业使用Spark组件的版本号。
            name="demo_spark_app",  # 用户自定义这个任务的名字,创建时用户指定的批处理名称,不能超过128个字符。
            file="obs://your_bucket/your_spark_app.jar",  # jar包在OBS上的位置。必选
            class_name="your_class_fullname",  # 必选。主类(--class)的类路径,比如 "org.example.DliCatalogTest"
            args=["YourAppArg1", "YourAppAgr2", "..."],  # 应用程序参数的传入参数,如不需要请删除此行
            conf=conf_map,  # Spark参数(--conf)
            catalog_name="dli",  # 访问元数据时,需要将该参数配置为dli。且需 conf_map["spark.dli.metaAccess.enable"] = "true"
            jars=["YourDepJar1", "YourDepJar2", "..."],  # 依赖的jar包(--jars),如不需要请删除此行
            feature="basic",  # 作业特性,表示用户作业使用的Spark镜像类型。 一般选择basic即可。
            driver_cores=1,  # Spark应用Driver的CPU核数
            driver_memory="1GB",  # Spark应用的Driver内存,参数配置例如2G, 2048M。使用时必须带单位,否则会启动失败。
            num_executors=1,  # Spark应用Executor的个数
            executor_cores=1,  # Spark应用每个Executor的CPU核数
            executor_memory="1GB"  # Spark应用的Executor内存,参数配置例如2G, 2048M。使用时必须带单位,否则会启动失败
        )
        request = CreateSparkJobRequest(body=request_body)
        response = client.create_spark_job(request)
        return response.id
    

查询批处理作业状态

  • 功能介绍:

    用于执行Spark作业。

  • 相关链接:

    关键SDK API:

    huaweicloudsdkdli.v1.dli_client.DliClient.show_spark_job_status

    查询批处理作业状态

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    def check_running(client, job_id):
        """
        如果您想在当前线程等待作业执行结束,可循环检查状态,直到作业结束。
        
        :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client.
        :param str job_id: The job id for getting status.
        :return:
        """
        while True:
            try:
                request = ShowSparkJobStatusRequest(batch_id=job_id)
                response = client.show_spark_job_status(request)
            except exceptions.ClientRequestException as e:
                raise Exception(f"Failed to get job status by id: {job_id}") from e
            status = response.state
            logger.info("SparkJob id %s status: %s", job_id, status)
            if status == "success":
                return
            if status == "dead":
                raise Exception("Run job failed")
            time.sleep(1)
    

查询批处理作业列表

  • 功能介绍:

    查询批处理作业列表。

  • 相关链接:

    关键SDK API:

    huaweicloudsdkdli.v1.dli_client.DliClient.list_spark_jobs

    查询批处理作业列表

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    def list_spark_job(client, job_id):
        """
        :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client.
        :param str job_id: The job id for getting details.
        :return:
        """
        try:
            request = ListSparkJobsRequest(
                # 此处仅做样例,除了jobId,您还可指定其它筛选条件。
                # 详见ListSparkJobs
                job_id=job_id,
                queue_name="your_queue_name",  # 此处需修改成DLI队列名。根据队列查询批作业(推荐使用)。
                start=1716195600000,  # 此处需修改成用户自定义作业开始时间。用于查询开始时间在该时间点之后的作业。时间格式为unix时间戳,单位:毫秒。
                end=1716199200000  # 此处需修改成用户自定义作业结束时间。用于查询开始时间在该时间点之前的作业。时间格式为unix时间戳,单位:毫秒。
            )
            response = client.list_spark_jobs(request)
        except exceptions.ClientRequestException as e:
            raise Exception("Failed to list Spark jobs") from e
        logger.info("List SparkJobs: %s", response)
        # resp中的响应参数详见ListSparkJobs
    

相关文档