Help Center/ Data Lake Insight/ SDK Reference/ DLI SDK V2/ Python SDK (DLI SDK V2)/ Submitting a Flink SQL Job Using an SDK
Updated on 2025-06-19 GMT+08:00

Submitting a Flink SQL Job Using an SDK

This section describes how to submit a Flink SQL job using DLI SDK V2.

Starting May 2024, new users can directly use DLI's SDK V2 without needing to have their accounts whitelisted.

For users who started using DLI before May 2024, to use this function, they must submit a service ticket to have their accounts whitelisted.

Prerequisites

Preparations

Obtain an AK/SK, project ID, and region information.

  1. Log in to the management console.
  2. In the upper right corner, hover over the username and choose My Credentials from the drop-down list.
  3. In the navigation pane on the left, choose Access Keys. On the displayed page, click Create Access Key. Confirm that you want to proceed with the operation and click OK.
  4. On the displayed page, click Download. Open the file to obtain the AK/SK information.
  5. In the navigation pane on the left, choose API Credentials. In the Projects pane, locate project_id and obtain the region information.

Example Code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Configure logs.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def main():
    your_access_key = os.getenv("HUAWEICLOUD_SDK_AK")
    your_secret_key = os.getenv("HUAWEICLOUD_SDK_SK")
    project_id = "your_project_id"
    region_name = "region_name"  
    credentials = BasicCredentials(your_access_key, your_secret_key, project_id)
    dli_client = DliClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(DliRegion.value_of(region_name)) \
        .build()

    try:
        # Step 1: Create a Flink job. The job status changes to Draft.
        job_id = create_flink_sql_job(dli_client, "your_queue_name")
        logger.info("job_id: %d", job_id)

        # Step 2: Run the job. The job status changes from Draft to Submitting.
        resps = batch_run_flink_sql_jobs(dli_client, [job_id])
        logger.info("Response: %s", resps)

        # Step 3: Query the job status. If you wish to wait for the job to reach the Running state within the current thread, you can cyclically check the job status until it becomes Running.
        check_running(dli_client, job_id)

    except exceptions.ClientRequestException as e:
        # Handle the exception based on service requirements. The following is just an example.
        logger.error("Failed to execute job:", e)

Creating a Flink SQL Job

  • Function:

    Create a Flink SQL job.

  • Reference link:

    Key SDK API: huaweicloudsdkdli.v1.dli_client.DliClient.create_flink_sql_job.

    Create a Flink SQL job. The job status changes to Draft.

  • Sample code:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    def create_flink_sql_job(client, queue_name):
        """
        Create a Flink job. The job status changes to Draft.
        Key SDK API: huaweicloudsdkdli.v1.dli_client.DliClient.create_flink_sql_job.
    
        :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client.
        :param str queue_name: Queue name for the job to execute
        :return: int
        """
        request_body = CreateFlinkSqlJobRequestBody(
            name="your_job_name",  # Job name, which must be unique, for example, flink_jar_job_demo. The name can contain up to 57 characters.
            desc="your flink job's description",  # User-defined description. The description can contain up to 512 characters.
            sql_body="""create table orders(
                  name string,
                  num INT
                ) with (
                  'connector' = 'datagen',
                  'rows-per-second' = '1', 
                  'fields.name.kind' = 'random', 
                  'fields.name.length' = '5'
                );
                CREATE TABLE sink_table (
                   name string,
                   num INT
                ) WITH (
                   'connector' = 'print'
                );
                INSERT into sink_table SELECT * from orders;""",
            # Customize a stream SQL statement, which contains at least the following three parts: source, query, and sink. Length limit: 1024 x 1024 characters.
            # In this SQL statement, random source data is automatically generated and printed to the console.
            queue_name=queue_name,   # General queue name. The name can contain up to 128 characters.
            run_mode="exclusive_cluster",  # Job running mode. Only the exclusive_cluster mode is supported.
            log_enabled=True,  # Enable the function of uploading job logs to OBS buckets.
            obs_bucket="your_obs_bucket_name",  # OBS bucket name, which is used to store logs and checkpoints.
            job_type="flink_opensource_sql_job",  # Job type. You are advised to select flink_opensource_sql_job.
            flink_version="1.12" # Specify the Flink version.
        )
        request = CreateFlinkSqlJobRequest(body=request_body)
        response = client.create_flink_sql_job(request)
        return response.job.job_id
    

Running Flink Jobs in Batches

  • Function:

    Run Flink SQL jobs in batches.

  • Reference link:

    Key SDK API:

    huaweicloudsdkdli.v1.dli_client.DliClient.batch_run_flink_jobs.

    Run Flink jobs in batches. The job status changes from Draft to Submitting.

  • Sample code:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    def batch_run_flink_sql_jobs(client, job_ids):
        """
        Run jobs. The job status changes from Draft to Submitting.
        :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client.
        :param list[int] job_ids: The job ids for running.
        :return: The body of this BatchRunFlinkJobsResponse.
        :rtype: list[:class:`huaweicloudsdkdli.v1.FlinkSuccessResponse`]
        """
        request_body = BatchRunFlinkJobsRequestBody(job_ids=job_ids)
        request = BatchRunFlinkJobsRequest(body=request_body)
        response = client.batch_run_flink_jobs(request)
        return response.body
    

Querying the Job Status

  • Function:

    Query the status of a Flink SQL job.

  • Reference link:

    Key SDK API:

    huaweicloudsdkdli.v1.dli_client.DliClient.show_flink_job.

    If you wish to wait for the job's transition into the Running state within the current thread, you can cyclically check the job status until it becomes Running.

  • Sample code:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    def check_running(client, job_id):
        """
        If you wish to wait for the job's transition into the Running state within the current thread, you can execute this method to cyclically check the job status until it becomes Running.
        
        :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client.
        :param int job_id: The job id for getting status.
        :return:
        """
        while True:
            try:
                request = ShowFlinkJobRequest(job_id=job_id)
                response = client.show_flink_job(request)
            except exceptions.ClientRequestException as e:
                raise Exception(f"Failed to get Flink sql job status by id: {job_id}") from e
    
            status = response.job_detail.status
            logger.info("FlinkSqlJob id %d status: %s", job_id, status)
    
            if status == "job_running":
                return
            if status in ["job_submit_fail", "job_running_exception"]:
                raise Exception(f"Run Flink sql job failed: {response}")
    
            time.sleep(1)