Updated on 2024-03-06 GMT+08:00

Job-related SDKs

For details about the dependencies and complete sample code, see Overview.

Importing Data

DLI provides an API for importing data. You can use this API to import data stored in OBS into a DLI table. Sample code is as follows:

def import_data(dli_client, db_name, tbl_name, queue_name):
    options = {
        "with_column_header": True,
        "delimiter": ",",
        "quote_char": "\"",
        "escape_char": "\\",
        "date_format": "yyyy/MM/dd",
        "timestamp_format": "yyyy/MM/dd hh:mm:ss"
        }

    try:
        job_id, status = \
            dli_client.import_table(tbl_name, db_name,
                                    'obs://bucket/obj/data.csv',
                                    'csv', 
                                    queue_name=queue_name,
                                    options=options)
    except DliException as e:
        print(e)
        return

    print(job_id)
    print(status)
  • Before submitting the import job, you can specify the data_type parameter to set the type of the data to be imported, for example csv. Use the options parameter to describe the CSV data format in detail, such as the delimiter and escape character.
  • If a file and a folder in the same OBS bucket directory have the same name, the path preferentially points to the file rather than the folder. Give files and folders at the same level different names when you create OBS objects.
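
The option keys used in the import sample can be gathered in a small helper so that defaults live in one place. The following is only a sketch: build_csv_options is a hypothetical helper, not part of the SDK, and the check that the delimiter differs from the quote character is an illustrative sanity check rather than a documented DLI rule. It uses only the option keys that appear in the sample above.

```python
def build_csv_options(delimiter=",", with_column_header=True,
                      quote_char='"', escape_char="\\",
                      date_format="yyyy/MM/dd",
                      timestamp_format="yyyy/MM/dd hh:mm:ss"):
    """Return an options dict for a CSV import (hypothetical helper)."""
    # Illustrative sanity check (an assumption, not an SDK requirement):
    # a delimiter identical to the quote character makes the file ambiguous.
    if delimiter == quote_char:
        raise ValueError("delimiter and quote_char must differ")
    return {
        "with_column_header": with_column_header,
        "delimiter": delimiter,
        "quote_char": quote_char,
        "escape_char": escape_char,
        "date_format": date_format,
        "timestamp_format": timestamp_format,
    }


# Example: a pipe-separated file without a header row.
options = build_csv_options(delimiter="|", with_column_header=False)
print(options["delimiter"])
```

The resulting dict can be passed as the options argument in the same way as the hand-written dict in the sample.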

Exporting Data

DLI provides an API for exporting data. You can use this API to export DLI table data to an OBS bucket. The example code is as follows:

def export_data(dli_client, db_name, tbl_name, queue_name):
    try:
        job_id, status = dli_client.export_table(tbl_name, db_name,
                                                 'obs://bucket/obj',
                                                 queue_name=queue_name)
    except DliException as e:
        print(e)
        return

    print(job_id)
    print(status)
  • Before submitting the export job, you can set the data format, compression type, and export mode. Data can be exported only in CSV format.
  • If a file and a folder in the same OBS bucket directory have the same name, the path preferentially points to the file rather than the folder. Give files and folders at the same level different names when you create OBS objects.
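
The naming note above can be checked before a job is submitted. The sketch below assumes you already have a list of object keys for the bucket (how you obtain it is outside this example) and reports every name that exists both as a file and as a folder. find_name_collisions is a hypothetical helper, and treating a key that ends with "/" as a folder marker is an assumption about how the keys are listed.

```python
def find_name_collisions(object_keys):
    """Return names that exist both as a file and as a folder."""
    files, folders = set(), set()
    for key in object_keys:
        is_folder_marker = key.endswith("/")
        if not is_folder_marker:
            files.add(key)
        parts = key.rstrip("/").split("/")
        # Every proper prefix of a key is a folder; a key ending in "/"
        # is itself a folder as well.
        last = len(parts) if is_folder_marker else len(parts) - 1
        for i in range(1, last + 1):
            folders.add("/".join(parts[:i]))
    return sorted(files & folders)


keys = ["obj/data", "obj/data/part-0.csv", "obj/other.csv"]
print(find_name_collisions(keys))  # ['obj/data']
```

Here obj/data is reported because it is both an object and the parent folder of obj/data/part-0.csv.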

Submitting a Job

DLI provides APIs for submitting a SQL job and obtaining its result. The example code is as follows:

def run_sql(dli_client, db_name, queue_name):
    # execute SQL
    try:
        sql_job = dli_client.execute_sql('select * from tbl_dli_for_test', db_name, queue_name=queue_name)
        result_set = sql_job.get_result(queue_name=queue_name)
    except DliException as e:
        print(e)
        return

    if result_set.row_count == 0:
        return

    for row in result_set:
        print(row)

    # export the query result to obs
    try:
        status = sql_job.export_result('obs://bucket/obj',
                                       queue_name=queue_name)
    except DliException as e:
        print(e)
        return

    print(status)

Canceling a Job

DLI provides an API for canceling jobs. You can use it to cancel a submitted job. A job that has been completed or failed cannot be canceled. The example code is as follows:

def cancel_sql(dli_client, job_id): 
    try: 
        dli_client.cancel_sql(job_id) 
    except DliException as e: 
        print(e) 
        return
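
Because a completed or failed job cannot be canceled, a caller may want to skip the request when the job is already in a terminal state. The sketch below illustrates that guard. The status names in TERMINAL_STATES are assumptions and should be checked against the values your SDK version returns, and RecordingClient is a stand-in used only to make the example runnable without a DLI connection. Error handling with DliException is omitted here and would wrap the cancel_sql call as in the sample above.

```python
# Assumption: these status names are illustrative, not taken from this page.
TERMINAL_STATES = {"FINISHED", "FAILED", "CANCELLED"}


def cancel_if_active(dli_client, job_id, status):
    """Send a cancel request unless the job is already in a terminal state.

    Returns True if cancel_sql was called, False otherwise.
    """
    if status.upper() in TERMINAL_STATES:
        return False
    dli_client.cancel_sql(job_id)
    return True


class RecordingClient:
    """Stand-in for a DLI client; it only records the job IDs it receives."""

    def __init__(self):
        self.cancelled = []

    def cancel_sql(self, job_id):
        self.cancelled.append(job_id)


client = RecordingClient()
print(cancel_if_active(client, "job-1", "RUNNING"))   # True
print(cancel_if_active(client, "job-2", "FINISHED"))  # False
```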

Querying All Jobs

DLI provides an API for querying all jobs. You can use the API to query information about all jobs in the current project and obtain the query result. The example code is as follows:

def list_all_sql_jobs(dli_client): 
    try: 
        sql_jobs = dli_client.list_sql_jobs() 
    except DliException as e: 
        print(e) 
        return 
    for sql_job in sql_jobs: 
        print(sql_job)

The APIs in this SDK do not support SQL patterns, so you cannot query jobs by matching a pattern against their SQL statements.

To query DLI jobs, use the Querying All Jobs API.
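
Since pattern matching is not available in the query itself, one workaround is to list all jobs and filter them on the client side. The following is a sketch under stated assumptions: filter_jobs is a hypothetical helper, it uses shell-style wildcards from the Python standard library rather than SQL LIKE syntax, and the caller supplies get_statement because the attribute that holds the SQL text of a job object is not described on this page. The demo uses plain dicts in place of real job objects.

```python
from fnmatch import fnmatchcase


def filter_jobs(jobs, pattern, get_statement):
    """Return the jobs whose SQL text matches a shell-style wildcard pattern.

    Matching is case-insensitive; get_statement extracts the SQL text
    from one job.
    """
    lowered = pattern.lower()
    return [job for job in jobs
            if fnmatchcase(get_statement(job).lower(), lowered)]


# Plain dicts stand in for job objects; the "statement" key is hypothetical.
jobs = [
    {"id": "1", "statement": "select * from tbl_dli_for_test"},
    {"id": "2", "statement": "INSERT INTO tbl_dli_for_test VALUES (1)"},
]
selects = filter_jobs(jobs, "select*", lambda job: job["statement"])
print([job["id"] for job in selects])  # ['1']
```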

Querying SQL Jobs

You can call an API to query information about all SQL jobs in the current project and obtain the query result. The sample code is as follows:
def list_sql_jobs(dli_client):
    try:
        sql_jobs = dli_client.list_sql_jobs()
    except DliException as e:
        print(e)
        return