Updated on 2025-06-19 GMT+08:00

Submitting a SQL Job Using an SDK

This section describes how to submit a SQL job using DLI SDK V2.

Starting May 2024, new users can directly use DLI's SDK V2 without needing to have their accounts whitelisted.

Users who started using DLI before May 2024 must submit a service ticket to have their accounts whitelisted before they can use this function.

Prerequisites

Preparations

Obtain an AK/SK, project ID, and region information.

  1. Log in to the management console.
  2. In the upper right corner, hover over the username and choose My Credentials from the drop-down list.
  3. In the navigation pane on the left, choose Access Keys. On the displayed page, click Create Access Key. Confirm that you want to proceed with the operation and click OK.
  4. On the displayed page, click Download. Open the file to obtain the AK/SK information.
  5. In the navigation pane on the left, choose API Credentials. In the Projects pane, locate project_id and obtain the region information.
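The sample code below reads the AK/SK from environment variables rather than hardcoding them. A minimal sketch of loading and validating the credentials (the helper name load_credentials is illustrative; the variable names match those used in the example code):

```python
import os

def load_credentials():
    # The example code expects the AK/SK obtained above to be exported as
    # HUAWEICLOUD_SDK_AK and HUAWEICLOUD_SDK_SK environment variables.
    ak = os.getenv("HUAWEICLOUD_SDK_AK")
    sk = os.getenv("HUAWEICLOUD_SDK_SK")
    if not ak or not sk:
        raise ValueError("Set HUAWEICLOUD_SDK_AK and HUAWEICLOUD_SDK_SK before running")
    return ak, sk
```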

Example Code

import logging
import os
import uuid

from dli.dli_client import DliClient
# Note: The import path of DliException may differ depending on your SDK version.
from dli.exception import DliException

logger = logging.getLogger(__name__)


def main():
    your_access_key = os.getenv("HUAWEICLOUD_SDK_AK")
    your_secret_key = os.getenv("HUAWEICLOUD_SDK_SK")
    kwargs = {
        'region': 'region_name',
        'project_id': 'your_project_id',
        'ak': your_access_key,
        'sk': your_secret_key
    }
    dli_client = DliClient(**kwargs)

    try:
        # Step 1: Create a database and a table.
        queue_name = 'your_sql_queue_name'
        prepare(dli_client, queue_name)

        # Step 2: Import data to the table.
        # The overall process can be divided into the following three steps:
        # 1. Use the OBS API to upload data to obs_path_to_write_tmp_data. You can configure a lifecycle policy in OBS to periodically delete this temporary data.
        # 2. Submit the Load Data statement to DLI to import OBS data to DLI.
        #    For details about the Load Data syntax, see Importing Data.
        # 3. Cyclically check the job status until the job is complete.
        obs_path_to_write_tmp_data = f"obs://your_obs_bucket_name/your/path/{uuid.uuid4()}"
        load_data(dli_client, obs_path_to_write_tmp_data, queue_name)

        # Step 3: Submit the SQL statement, execute the query, and read the result.
        select_sql = "SELECT * FROM demo_db.demo_tbl"
        job_id = query_data(dli_client, select_sql, queue_name)

        # Step 4: If needed, you can also obtain the results by job ID.
        query_data_by_jobid(dli_client, job_id)

        # Query all jobs by page. You can use this API to query information of all SQL jobs within the current project.
        # Key SDK API: huaweicloudsdkdli.v1.dli_client.DliClient.list_sql_jobs
        list_sql_jobs(dli_client)

        # Other scenarios:
        # 1. To cancel a submitted SQL job, use the following API.
        # Key SDK API: huaweicloudsdkdli.v1.dli_client.DliClient.cancel_sql_job
        # Note: If a job has been completed or failed, it cannot be canceled.

        # 2. To verify the syntax of an SQL statement, use the following API.
        # Key SDK API: huaweicloudsdkdli.v1.dli_client.DliClient.check_sql
        # Note: This API can only be used to verify the syntax, not the semantics. Use the Explain statement and submit it to DLI for execution to perform semantic verification.

        # 3. To obtain a submitted SQL job based on the job ID and view job details, use the following API.
        # Key SDK API: huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_detail
        # 4. Obtain the job execution progress. If the job is being executed, you can obtain the sub-job information. If the job has just started or has been completed, you cannot obtain the sub-job information.
        # Key SDK API: huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_progress
    except DliException as e:
        # Handle the exception based on service requirements. The following is just an example.
        logger.error(e)

Creating a Database and Table

Reference links:

  • Creating a Database
  • Creating a Table
  • Key SDK: dli.dli_client.DliClient.execute_sql
  • Key API: huaweicloudsdkdli.v1.dli_client.DliClient.create_sql_job.
  • Key API: huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_status.

Sample code:

def prepare(dli_client, queue_name):
    """
    Create a database and a table.
    :param dli.dli_client.DliClient dli_client: DLI Client.
    :param str queue_name: Queue name for the SQL execute.
    :return: dli.job.SqlJob
    """
    # 1. Create a database.
    # default is the built-in database of DLI. You cannot create a database named default.
    create_db_sql = "CREATE DATABASE IF NOT EXISTS demo_db"
    dli_client.execute_sql(sql=create_db_sql, queue_name=queue_name)  # Submit a SQL job and cyclically check the job status until the job is complete.

    # 2. Create a table. Note: Adjust the table structure, table data directory, and OBS storage path based on site requirements.
    create_tbl_sql = "CREATE TABLE IF NOT EXISTS `demo_tbl` (" \
                     "  `bool_c` BOOLEAN," \
                     "  `byte_c` TINYINT," \
                     "  `short_c` SMALLINT," \
                     "  `int_c` INT," \
                     "  `long_c` BIGINT," \
                     "  `float_c` FLOAT," \
                     "  `double_c` DOUBLE," \
                     "  `decimal_c` DECIMAL(10,2)," \
                     "  `str_c` STRING," \
                     "  `date_c` DATE," \
                     "  `timestamp_c` TIMESTAMP," \
                     "  `binary_c` BINARY," \
                     "  `array_c` ARRAY<INT>," \
                     "  `map_c` MAP<STRING, INT>," \
                     "  `struct_c` STRUCT<`s_str_c`: STRING, `s_bool_c`: BOOLEAN>)" \
                     " LOCATION 'obs://demo_bucket/demo_db/demo_tbl'" \
                     " STORED AS TEXTFILE"
    dli_client.execute_sql(sql=create_tbl_sql, db_name='demo_db', queue_name=queue_name)

Importing Data

Reference links:

  • Importing Data
  • Native Data Types
  • Complex Data Types
  • Key SDK: dli.dli_client.DliClient.execute_sql
  • Key API: huaweicloudsdkdli.v1.dli_client.DliClient.create_sql_job.
  • Key API: huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_status.

Sample code:
    def load_data(dli_client, upload_data_path, queue_name):
        # 1. Write data to the OBS temporary directory. Modify the following information based on site requirements. The following is just an example.
        # Note: This step involves directly calling the OBS data writing API, entirely decoupled from DLI. This example only provides an implementation for writing data in JSON format, meaning that files are stored in JSON format on OBS.
        # You can customize the implementation based on service requirements. For example, you can save files as CSV files.
        write_tmp_data(get_schema(), upload_data_path, dli_client.dli_info, 1)
    
        # 2. Import the data written to OBS in step 1 to DLI.
        # Note: The data_type here needs to be determined based on the file type in step 1; in this example, it is JSON. If you use other formats, you should modify it to the corresponding data_type.
        loadSql = f"LOAD DATA INPATH '{upload_data_path}' INTO TABLE demo_db.demo_tbl OPTIONS(data_type 'json')"
        dli_client.execute_sql(sql=loadSql, queue_name=queue_name) # Submit a SQL job and cyclically check the job status until the job is complete.
    
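The data_type in the LOAD DATA statement must match the format of the files written in step 1. A small sketch of building the statement for other formats (the helper name build_load_sql is hypothetical):

```python
def build_load_sql(obs_path, table, data_type="json"):
    # data_type must match the format of the files at obs_path
    # (e.g. 'json' or 'csv'); table is given as db_name.table_name.
    return (f"LOAD DATA INPATH '{obs_path}' INTO TABLE {table} "
            f"OPTIONS(data_type '{data_type}')")
```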

Querying Job Results

  • Reference link:

    SELECT Query Statement

  • Key SDK: dli.dli_client.DliClient.execute_sql
  • Key SDK: dli.job.SqlJob.get_result. The feature of writing results to the job bucket must be enabled; otherwise, an exception is thrown.

    You can determine if the feature is enabled by checking the result_path parameter in the response body of the API for querying job status.

    After the job is completed, if result_path starts with obs://, the feature of writing job results to the job bucket is enabled; otherwise, it is not enabled.

  • Key API: huaweicloudsdkdli.v1.dli_client.DliClient.create_sql_job.
  • Key API: huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_status.
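The result_path check described above can be expressed as a small helper (the function name is hypothetical; result_path comes from the response body of the job status API):

```python
def results_written_to_job_bucket(result_path):
    # The feature of writing job results to the job bucket is enabled
    # only when result_path is an OBS path.
    return bool(result_path) and result_path.startswith("obs://")
```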
    def query_data(dli_client, select_sql, queue_name):
        """
        :param dli.dli_client.DliClient dli_client: DLI Client.
        :param str select_sql: SQL statement
        :param str queue_name: Queue name for the SQL execute
        :return: str
        """
        sql_job = dli_client.execute_sql(sql=select_sql, queue_name=queue_name)
        print_result(sql_job.get_result())
        return sql_job.job_id
    

Querying the Result of a Specified Job

  • Instructions
    • Key SDK: dli.job.SqlJob.get_result.

      This method can be used only when the feature of writing job results to the job bucket is enabled; otherwise, an exception is thrown while the job is running.

      You can determine if the feature is enabled by checking the result_path parameter in the response body of the API for querying job status. After the job is completed, if result_path starts with obs://, the feature of writing job results to the job bucket is enabled; otherwise, it is not enabled.

  • Sample code
    def query_data_by_jobid(dli_client, job_id):
        """
        :param dli.dli_client.DliClient dli_client: DLI Client.
        :param str job_id: job ID of a query job
        :return:
        """
        from dli.job import SqlJob
        sql_job = SqlJob(job_id=job_id, job_type="QUERY", client=dli_client)
        dli_client._cycle_check_sql_job(sql_job=sql_job)
        print_result(sql_job.get_result())
    

Querying the Job List

  • Instructions

    Query information about SQL jobs in the current project.

    If there are a large number of jobs, use the pagination method shown in this example to query them in batches; otherwise, only the jobs on the first page are returned.

    Key SDK: huaweicloudsdkdli.v1.dli_client.DliClient.list_sql_jobs

  • Sample code
    def list_sql_jobs(client):
        """
        Query information about SQL jobs in the current project. If there are a large number of jobs, use the pagination method below to query them in batches; otherwise, only the jobs on the first page are returned.
        Key SDK: huaweicloudsdkdli.v1.dli_client.DliClient.list_sql_jobs

        :param dli.dli_client.DliClient client: DLI Client.
        """
        # Note: The import path may differ depending on your SDK version.
        from huaweicloudsdkdli.v1 import ListSqlJobsRequest

        req = ListSqlJobsRequest()
        req.current_page = 1  # The default value is 1.
        req.page_size = 100  # The default value is 10.
    
        # Obtain the total number of jobs.
        job_count = client.inner_client.list_sql_jobs(req).job_count
        cur = 0
    
        # Query jobs by page.
        while cur < job_count:
            list_sql_jobs_response = client.inner_client.list_sql_jobs(req)
            jobs = list_sql_jobs_response.jobs
            for job in jobs:
    
                # Add the service logic here to process each job.
                print(job)
    
                cur += 1
                if cur >= job_count:
                    break
            req.current_page += 1
    
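The pagination loop above can be exercised locally against a stub client, assuming only the current_page/page_size request fields and the jobs/job_count response fields used in the sample (all stub names are hypothetical):

```python
class StubListResponse:
    def __init__(self, jobs, job_count):
        self.jobs = jobs
        self.job_count = job_count

class StubRequest:
    def __init__(self):
        self.current_page = 1
        self.page_size = 3

class StubInnerClient:
    # Stand-in for the SDK's inner client: serves `total` fake job IDs
    # split into pages of req.page_size.
    def __init__(self, total):
        self.total = total

    def list_sql_jobs(self, req):
        start = (req.current_page - 1) * req.page_size
        page = list(range(self.total))[start:start + req.page_size]
        return StubListResponse(page, self.total)

def collect_all_jobs(inner_client):
    # Same pagination pattern as list_sql_jobs above.
    req = StubRequest()
    job_count = inner_client.list_sql_jobs(req).job_count
    seen = []
    while len(seen) < job_count:
        for job in inner_client.list_sql_jobs(req).jobs:
            seen.append(job)
            if len(seen) >= job_count:
                break
        req.current_page += 1
    return seen
```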

Printing Job Results

  • Instructions

    Process each row of data based on service requirements. The following is just an example.

  • Sample code
    def print_result(obs_reader):
        """
        Process each row of data based on service requirements. The following is just an example.
        """
        count = 0
        for record in obs_reader:
            count += 1
            print(record)
        logger.info("total records: %d", count)
    

Writing Data to OBS by Running write_tmp_data

  • Instructions

    This method involves directly calling the OBS data writing API, entirely decoupled from DLI APIs. This example provides an implementation for writing data in JSON format, meaning that files are stored in JSON format on OBS.

    You can customize the implementation based on service requirements. For example, you can save files as CSV files.

  • Sample code
    def write_tmp_data(schema, upload_data_path, dli_info, total_records):
        """
        This method involves directly calling the OBS data writing API, entirely decoupled from DLI APIs. This example provides an implementation for writing data in JSON format, meaning that files are stored in JSON format on OBS.
        You can customize the implementation based on service requirements. For example, you can save files as CSV files.
    
        :param list schema: Data structure schema
        :param str upload_data_path: This OBS temporary directory is used to store user service data.
        :param dli.dli_client.DliClient.dli_info dli_info: Authentication information passed during the initialization of the DLI client.
        :param int total_records: Number of simulated data rows. You can configure the number of data rows to be inserted using this parameter. This is for display purposes only and you can modify it based on service requirements.
        :return:
        """
        # Note: Import paths may differ depending on your OBS/DLI SDK versions.
        from obs import ObsClient
        from dli.row import Row

        obs_client = ObsClient(access_key_id=dli_info.ak, secret_access_key=dli_info.sk, is_secure=True,
                               server=dli_info.obs)
        bucket_name = get_bucket_name(upload_data_path)
        object_prefix = get_object_prefix(upload_data_path)
    
        datas = ""
        try:
            for i in range(total_records):
                row = Row(schema=schema, columns=get_record())
                datas += to_json_string(row, schema)
            object_key = object_prefix + "/tmp_data.json"
            obs_client.putObject(bucket_name, object_key, datas)
            logger.info("Uploaded data to OBS bucket '%s' with object key '%s'", bucket_name, object_key)
        finally:
            obs_client.close()
    

Creating a Schema for a Table

  • Instructions

    Construct the schema based on the actual service. The following is just an example.

  • Sample code
    def get_schema():
        """
        Construct the schema based on the actual service. The following is just an example.
        """
        from dli.column import Column
        return [Column(name="bool_c", data_type="boolean", desc="boolean col"),
                Column(name="byte_c", data_type="tinyint", desc="tinyint col"),
                Column(name="short_c", data_type="smallint", desc="smallint col"),
                Column(name="int_c", data_type="int", desc="int col"),
                Column(name="long_c", data_type="bigint", desc="bigint col"),
                Column(name="float_c", data_type="float", desc="float col"),
                Column(name="double_c", data_type="double", desc="double col"),
                Column(name="decimal_c", data_type="decimal(10,2)", desc="decimal col"),
                Column(name="str_c", data_type="string", desc="string col"),
                Column(name="date_c", data_type="date", desc="date col"),
                Column(name="timestamp_c", data_type="timestamp", desc="timestamp col"),
                Column(name="binary_c", data_type="binary", desc="binary col"),
                Column(name="array_c", data_type="array<int>", desc="array col"),
                Column(name="map_c", data_type="map<string, int>", desc="map col"),
                Column(name="struct_c", data_type="struct<s_str_c:string,s_bool_c:boolean>", desc="struct col")]
    
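The schema above mirrors the columns of the CREATE TABLE statement. A hedged sketch of deriving the column DDL from (name, data_type) pairs (the helper is illustrative, not part of the SDK):

```python
def columns_to_ddl(columns):
    # columns: iterable of (name, data_type) pairs, e.g. taken from a schema
    # like the one returned by get_schema().
    return ", ".join(f"`{name}` {dtype.upper()}" for name, dtype in columns)
```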

Generating Test Data (get_record) on Demand

  • Instructions

    Construct each row of data based on service requirements. The following is just an example.

  • Sample code
    def get_record():
        record = [
            True,  # boolean
            1,  # byte
            123,  # short
            65535,  # int
            123456789012,  # long
            101.235,  # float
            256.012358,  # double
            33.05,  # decimal
            "abc_123&",  # string
            "2023-05-08",  # date
            "1716345295000",  # Timestamp, in milliseconds
            "hello".encode('utf-8'),  # binary; to_json_string base64-encodes binary columns, so pass raw bytes here
            [1, 2, 3],  # array
            {"k": 123},  # map
            {"s_str_c": "Abc", "s_bool_c": True}  # struct
        ]
        return record
    
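The timestamp column in the sample record is an epoch value in milliseconds. A small sketch of producing such a value from a datetime (the helper name is illustrative):

```python
from datetime import datetime, timezone

def to_epoch_millis(dt):
    # Convert an aware datetime to an epoch-milliseconds string, the format
    # used for the timestamp column in the sample record.
    return str(int(dt.timestamp() * 1000))

ms = to_epoch_millis(datetime(2024, 5, 22, 2, 34, 55, tzinfo=timezone.utc))
```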

to_json_string

  • Instructions

    Serialize each row of data to a JSON string based on service requirements. The following is just an example.

  • Sample code
    def to_json_string(row, schema):
        import base64
        import json

        json_obj = {}
        for i, column in enumerate(schema):
            if column.is_partition_column:
                continue
            if column.type == 'binary':
                json_obj[column.name] = base64.b64encode(row.columns[i]).decode('utf-8')
            elif column.type.startswith('decimal'):
                json_obj[column.name] = float(row.columns[i])
            else:
                json_obj[column.name] = row.columns[i]
        return json.dumps(json_obj) + "\n"
    

get_bucket_name

  • Instructions

    Parse the bucket name from an OBS path. The following is just an example.

  • Sample code
    def get_bucket_name(full_path):
        from urllib.parse import urlparse
        try:
            url = urlparse(full_path)
            return url.hostname
        except Exception as e:
            logger.error("Failed to get bucket name from full path: %s", e)
            return None
    

get_object_prefix

  • Instructions

    Parse the object key prefix from an OBS path. The following is just an example.

  • Sample code
    def get_object_prefix(full_path):
        from urllib.parse import urlparse
        try:
            url = urlparse(full_path)
            return url.path.lstrip('/')
        except Exception as e:
            logger.error("Failed to get object key from full path: %s", e)
            return None
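The two helpers above can be combined; for example, parsing obs://demo_bucket/demo_db/tmp yields bucket demo_bucket and prefix demo_db/tmp (split_obs_path is an illustrative name, not part of the SDK):

```python
from urllib.parse import urlparse

def split_obs_path(full_path):
    # Split an OBS URI into (bucket_name, object_prefix), mirroring
    # get_bucket_name and get_object_prefix above.
    url = urlparse(full_path)
    return url.hostname, url.path.lstrip('/')

bucket, prefix = split_obs_path("obs://demo_bucket/demo_db/tmp")
```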