Submitting a Flink Jar Job Using an SDK
This section describes how to submit a Flink Jar job using DLI SDK V2.

Starting May 2024, new users can use DLI SDK V2 directly, without having their accounts whitelisted.
Users who began using DLI before May 2024 must submit a service ticket to have their accounts whitelisted before they can use this function.
Prerequisites
- You have configured the Python SDK environment by referring to Preparing a Python Development Environment.
- You have initialized the DLI client by referring to Initializing the DLI Client.
Preparations
Obtain an AK/SK, project ID, and region information.
- Log in to the management console.
- In the upper right corner, hover over the username and choose My Credentials from the drop-down list.
- In the navigation pane on the left, choose Access Keys. On the displayed page, click Create Access Key. Confirm that you want to proceed with the operation and click OK.
- On the displayed page, click Download. Open the file to obtain the AK/SK information.
- In the navigation pane on the left, choose API Credentials. In the Projects pane, locate project_id and obtain the region information.
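The AK/SK pair should not be hardcoded in scripts. As a minimal sketch, the snippet below assumes the keys from the downloaded credentials file have been exported as the HUAWEICLOUD_SDK_AK and HUAWEICLOUD_SDK_SK environment variables (the same names used in the example code), reads them, and fails fast if either is missing:

import os

# Read the AK/SK from environment variables instead of hardcoding them.
# Assumes the keys were exported as HUAWEICLOUD_SDK_AK and HUAWEICLOUD_SDK_SK.
your_access_key = os.getenv("HUAWEICLOUD_SDK_AK")
your_secret_key = os.getenv("HUAWEICLOUD_SDK_SK")
if not your_access_key or not your_secret_key:
    raise RuntimeError("Export HUAWEICLOUD_SDK_AK and HUAWEICLOUD_SDK_SK before running the sample.")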
Example Code
import logging
import os
import time

from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkdli.v1 import *
from huaweicloudsdkdli.v1.region.dli_region import DliRegion

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def main():
    your_access_key = os.getenv("HUAWEICLOUD_SDK_AK")
    your_secret_key = os.getenv("HUAWEICLOUD_SDK_SK")
    project_id = "your_project_id"
    region_name = "region_name"
    credentials = BasicCredentials(your_access_key, your_secret_key, project_id)
    dli_client = DliClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(DliRegion.value_of(region_name)) \
        .build()

    try:
        # Step 1: Create a Flink Jar job. The job status changes to Draft.
        job_id = create_flink_jar_job(dli_client, "your_queue_name")
        logger.info("job_id: %d", job_id)

        # Step 2: Run the job. The job status changes from Draft to Submitting.
        resps = batch_run_flink_jar_jobs(dli_client, [job_id])
        logger.info("Response: %s", resps)

        # Step 3: Query the job status. To wait in the current thread until the job
        # is Running, cyclically check the job status until it becomes Running.
        check_running(dli_client, job_id)
    except exceptions.ClientRequestException as e:
        # Handle the exception based on service requirements. The following is just an example.
        logger.error("Failed to execute job: %s", e)
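To run the sample as a standalone script, an entry point such as the following can be added (a minimal sketch; the helper functions called by main() are defined in the sections below):

if __name__ == "__main__":
    main()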
Creating a Flink Jar Job
- Function:
Create a Flink Jar job.
- Reference link:
Key SDK API:
huaweicloudsdkdli.v1.dli_client.DliClient.create_flink_jar_job
Create a Flink Jar job. The job status changes to Draft.
- Sample code:
def create_flink_jar_job(client, queue_name):
    """
    Create a Flink Jar job. The job status changes to Draft.

    :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client
    :param str queue_name: Queue name for the job to execute
    :return: int
    """
    request_body = CreateFlinkJarJobRequestBody(
        name="your_job_name",  # Job name, which must be unique, for example, flink_jar_job_demo. The name can contain up to 57 characters.
        desc="your flink job's description",  # User-defined description. The description can contain up to 512 characters.
        queue_name=queue_name,  # General queue name. The name can contain up to 128 characters.
        feature="basic",  # Job feature, that is, the type of the Flink image used by the job. basic: the base Flink image provided by DLI is used.
        flink_version="1.12",  # Flink version. This parameter takes effect when feature is set to basic, and together they specify the version of the base Flink image used by the job.
        log_enabled=True,  # Whether to enable job logs.
        obs_bucket="your_obs_bucket_name",  # Name of the OBS bucket authorized by the user to store job logs when log_enabled is set to True.
        entrypoint="obs://your_obs_bucket_name/your/flink/job.jar",  # Program package uploaded to OBS that contains the user-defined main class of the job.
        main_class="your_class_fullname"  # Job entry class, for example, org.apache.flink.examples.WordCount.
    )
    request = CreateFlinkJarJobRequest(body=request_body)
    response = client.create_flink_jar_job(request)
    return response.job.job_id
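The name, desc, and queue_name fields have the length limits noted in the comments above (57, 512, and 128 characters, respectively). An optional pre-check along these lines can reject invalid values before the request is sent; the helper below is illustrative only and is not part of the SDK:

def validate_flink_jar_job_fields(name, desc, queue_name):
    """Illustrative pre-check based on the limits documented above; not an SDK API."""
    if not name or len(name) > 57:
        raise ValueError("Job name must be 1 to 57 characters and unique.")
    if len(desc) > 512:
        raise ValueError("Job description can contain up to 512 characters.")
    if not queue_name or len(queue_name) > 128:
        raise ValueError("Queue name must be 1 to 128 characters.")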
Running Flink Jobs in Batches
- Function:
Run Flink jobs in batches.
- Reference link:
Key SDK API:
huaweicloudsdkdli.v1.dli_client.DliClient.batch_run_flink_jobs
Run Flink jobs in batches. The job status changes from Draft to Submitting.
- Sample code:
def batch_run_flink_jar_jobs(client, job_ids):
    """
    Run jobs. The job status changes from Draft to Submitting.
    Key SDK API: huaweicloudsdkdli.v1.dli_client.DliClient.batch_run_flink_jobs.

    :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client.
    :param list[int] job_ids: The job ids for running.
    :return: The body of this BatchRunFlinkJobsResponse.
    :rtype: list[:class:`huaweicloudsdkdli.v1.FlinkSuccessResponse`]
    """
    request_body = BatchRunFlinkJobsRequestBody(job_ids=job_ids)
    request = BatchRunFlinkJobsRequest(body=request_body)
    response = client.batch_run_flink_jobs(request)
    return response.body
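The call returns one entry per submitted job ID. The usage sketch below iterates over those entries and logs the result for each job. It assumes each FlinkSuccessResponse exposes is_success and message attributes, and the job IDs (job_id_1, job_id_2) are placeholders; verify the attribute names against the FlinkSuccessResponse model in the installed SDK version.

# Usage sketch: submit several draft jobs and log the per-job results.
# Assumption: each FlinkSuccessResponse exposes is_success and message;
# getattr with a default keeps this robust if the attribute names differ.
resps = batch_run_flink_jar_jobs(dli_client, [job_id_1, job_id_2])
for job_id, resp in zip([job_id_1, job_id_2], resps):
    if getattr(resp, "is_success", True):
        logger.info("Job %d submitted: %s", job_id, resp)
    else:
        logger.error("Job %d failed to submit: %s", job_id, getattr(resp, "message", resp))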
Querying the Job Status
- Function:
Query the status of a Flink job.
- Reference link:
Key SDK API:
huaweicloudsdkdli.v1.dli_client.DliClient.show_flink_job
To wait in the current thread until the job is Running, cyclically check the job status until it becomes Running.
- Sample code:
def check_running(client, job_id):
    """
    To wait in the current thread until the job is Running, call this method to
    cyclically check the job status until it becomes Running.
    Key SDK API: huaweicloudsdkdli.v1.dli_client.DliClient.show_flink_job.

    :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client.
    :param int job_id: The job id for getting status.
    :return:
    """
    while True:
        try:
            request = ShowFlinkJobRequest(job_id=job_id)
            response = client.show_flink_job(request)
        except exceptions.ClientRequestException as e:
            raise Exception(f"Failed to get Flink jar job status by id: {job_id}") from e
        status = response.job_detail.status
        logger.info("FlinkJarJob id %d status: %s", job_id, status)
        if status == "job_running":
            return
        if status in ["job_submit_fail", "job_running_exception"]:
            raise Exception(f"Run Flink jar job failed: {response}")
        time.sleep(1)
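The loop above polls indefinitely. If an upper bound on the wait is preferred, a bounded variant such as the following sketch can be used instead (not part of the SDK; the timeout and poll interval values are arbitrary examples):

def check_running_with_timeout(client, job_id, timeout_seconds=600, poll_interval=5):
    """Illustrative variant of check_running that gives up after timeout_seconds."""
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        request = ShowFlinkJobRequest(job_id=job_id)
        response = client.show_flink_job(request)
        status = response.job_detail.status
        logger.info("FlinkJarJob id %d status: %s", job_id, status)
        if status == "job_running":
            return
        if status in ["job_submit_fail", "job_running_exception"]:
            raise Exception(f"Run Flink jar job failed: {response}")
        time.sleep(poll_interval)
    raise TimeoutError(f"Flink jar job {job_id} did not reach Running within {timeout_seconds} seconds.")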