Submitting a Spark Job Using an SDK
This section describes how to submit a Spark job using DLI SDK V2.

Starting May 2024, new users can directly use DLI's SDK V2 without needing to have their accounts whitelisted.
For users who started using DLI before May 2024, to use this function, they must submit a service ticket to have their accounts whitelisted.
Prerequisites
- You have configured the Python SDK environment by referring to Preparing a Python Development Environment.
- You have initialized the DLI client by referring to Initializing the DLI Client.
Preparations
Obtain an AK/SK, project ID, and region information.
- Log in to the management console.
- In the upper right corner, hover over the username and choose My Credentials from the drop-down list.
- In the navigation pane on the left, choose Access Keys. On the displayed page, click Create Access Key. Confirm that you want to proceed with the operation and click OK.
- On the displayed page, click Download. Open the file to obtain the AK/SK information.
- In the navigation pane on the left, choose API Credentials. In the Projects pane, locate project_id and obtain the region information.
Example Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# Configure logs. logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def main(): your_access_key = os.getenv("HUAWEICLOUD_SDK_AK") your_secret_key = os.getenv("HUAWEICLOUD_SDK_SK") project_id = "your_project_id" region_name = "region_name" credentials = BasicCredentials(your_access_key, your_secret_key, project_id) dli_client = DliClient.new_builder() \ .with_credentials(credentials) \ .with_region(DliRegion.value_of(region_name)) \ .build() try: # Step 1: Submit a Spark job to DLI for execution. job_id = run_spark_job(dli_client, "your_queue_name") # Step 2: If you wish to wait for the job execution to finish within the current thread, cycle through checking the status until the job completes. check_running(dli_client, job_id) # Step 3: To query one or more specific jobs based on conditions, use the following method: # This is just an example. In addition to jobId, you can also specify other filter criteria. list_spark_job(dli_client, job_id) # Other scenarios: # 1. If you want to cancel a running job, call the API below. # Key SDK API: huaweicloudsdkdli.v1.dli_client.DliClient.cancel_spark_job # Note: Batch processing jobs in the Successful or Failed state cannot be canceled. # 2. To query details about a specific job based on the job ID, perform the following operations: # Key SDK API: huaweicloudsdkdli.v1.dli_client.DliClient.show_spark_job except exceptions.ClientRequestException as e: # Handle the exception based on service requirements. The following is just an example. logger.error("Failed to execute job: ", e) |
Creating a Spark Job
- Function:
Execute Spark jobs.
- Reference link:
Key SDK API: huaweicloudsdkdli.v1.dli_client.DliClient.create_spark_job
- Sample code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
def run_spark_job(client, queue_name): """ Submit a Spark job to DLI for execution. Key SDK API: huaweicloudsdkdli.v1.dli_client.DliClient.create_spark_job. :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client. :param str queue_name: Queue name for the job to execute :return: str """ conf_map = { "spark.dli.metaAccess.enable": "true" # For example, "spark.dli.metaAccess.enable": "true" } request_body = CreateSparkJobRequestBody( # Set the parameters according to the actual situation. The following is just an example. queue=queue_name, # Name of a general queue spark_version="2.4.5", # Version of the Spark component used by the job. name="demo_spark_app", # User-defined job name, which is the batch processing job name specified by the user during job creation. The name can contain up to 128 characters. file="obs://your_bucket/your_spark_app.jar", # (Mandatory) Location of the JAR file on OBS. class_name="your_class_fullname", # (Mandatory) Class path of the main class ( --class), for example, org.example.DliCatalogTest. args=["YourAppArg1", "YourAppAgr2", "..."], # Input parameters of the application. Delete this line if it is not required. conf=conf_map, # Spark parameter (--conf) catalog_name="dli", # When accessing metadata, set this parameter to dli and ensure conf_map["spark.dli.metaAccess.enable"] = "true". jars=["YourDepJar1", "YourDepJar2", "..."], # Dependency JAR file (--jars). Delete this line if it is not required. feature="basic", # Job feature, which indicates the Spark image type used by the user job. It is typically set to basic. driver_cores=1, # Number of CPU cores for the driver in a Spark application driver_memory="1GB", # Memory for the driver in a Spark application. It can be set to values like 2G or 2048M. The value must include a unit; otherwise, the startup will fail. num_executors=1, # Number of executors in a Spark application executor_cores=1, # Number of CPU cores for each executor in a Spark application executor_memory="1GB" # Memory for executors in a Spark application. It can be set to values like 2G or 2048M. The value must include a unit; otherwise, the startup will fail. ) request = CreateSparkJobRequest(body=request_body) response = client.create_spark_job(request) return response.id
Querying the Status of a Batch Processing Job
- Function:
Execute Spark jobs.
- Reference link:
Key SDK API:
huaweicloudsdkdli.v1.dli_client.DliClient.show_spark_job_status
- Sample code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
def check_running(client, job_id): """ If you wish to wait for the job execution to finish within the current thread, cycle through checking the status until the job completes. :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client. :param str job_id: The job id for getting status. :return: """ while True: try: request = ShowSparkJobStatusRequest(batch_id=job_id) response = client.show_spark_job_status(request) except exceptions.ClientRequestException as e: raise Exception(f"Failed to get job status by id: {job_id}") from e status = response.state logger.info("SparkJob id %s status: %s", job_id, status) if status == "success": return if status == "dead": raise Exception("Run job failed") time.sleep(1)
Querying the Batch Processing Job List
- Function:
Query the batch processing job list.
- Reference link:
Key SDK API:
huaweicloudsdkdli.v1.dli_client.DliClient.list_spark_jobs
- Sample code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
def list_spark_job(client, job_id): """ :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client. :param str job_id: The job id for getting details. :return: """ try: request = ListSparkJobsRequest( # This is just an example. In addition to jobId, you can also specify other filter criteria. job_id=job_id, queue_name="your_queue_name", # Change the value to a DLI queue name. You can then query batch processing jobs running on the queue (recommended). start=1716195600000, # Change the value to the start time of a user-defined job. Jobs whose start time is later than this specified time are queried. The time is in UNIX timestamp format, in milliseconds. end=1716199200000 # Change the value to the end time of a user-defined job. Jobs whose start time is earlier than this specified time are queried. The time is in UNIX timestamp format, in milliseconds. ) response = client.list_spark_jobs(request) except exceptions.ClientRequestException as e: raise Exception("Failed to list Spark jobs") from e logger.info("List SparkJobs: %s", response)
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot