Job-related SDKs
For details about the dependencies and complete sample code, see Instructions.
Importing Data
DLI provides an API for importing data. You can use this API to import OBS to a DLI table. Sample code is as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
def import_data(dli_client, db_name, tbl_name, queue_name): options = { "with_column_header": True, "delimiter": ",", "quote_char": "\"", "escape_char": "\\", "date_format": "yyyy/MM/dd", "timestamp_format": "yyyy/MM/dd hh:mm:ss" } try: job_id, status = \ dli_client.import_table(tbl_name, db_name, 'obs://bucket/obj/data.csv', 'csv', queue_name=queue_name, options=options) except DliException as e: print(e) return print(job_id) print(status) |
- Before submitting the importing job, you can specify the data_type parameter to set the type of the data to be imported. For example, set data_type to csv. Use the options parameter to set details about the CSV data format, such as the delimiter and escape character.
- If a folder and a file under an OBS bucket directory have the same name, data is preferentially loaded to the file, instead of the folder. It is recommended that the files and folders of the same level have different names when you create an OBS object.
Exporting Data
DLI provides an API for exporting data. You can use this API to export DLI table data to an OBS bucket. The example code is as follows:
1 2 3 4 5 6 7 8 9 10 11 |
def export_data(dli_client, db_name, tbl_name, queue_name): try: job_id, status = dli_client.export_table(tbl_name, db_name, 'obs://bucket/obj', queue_name=queue_name) except DliException as e: print(e) return print(job_id) print(status) |
- Before submitting the export job, you can set the data format, compression type, and export mode. The data can only be exported in the CSV format.
- If a folder and a file under an OBS bucket directory have the same name, data is preferentially loaded to the file, instead of the folder. It is recommended that the files and folders of the same level have different names when you create an OBS object.
Submitting a Job
DLI provides an API for querying jobs. The example code is as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
def run_sql(dli_client, db_name, queue_name): # execute SQL try: sql_job = dli_client.execute_sql('select * from tbl_dli_for_test', db_name, queue_name=queue_name) result_set = sql_job.get_result(queue_name=queue_name) except DliException as e: print(e) return if result_set.row_count == 0: return for row in result_set: print(row) # export the query result to obs try: status = sql_job.export_result('obs://bucket/obj', queue_name=queue_name) except DliException as e: print(e) return print(status) |
Canceling a Job
DLI provides an API for canceling jobs. You can use it to cancel a submitted job. A job that has been completed or failed cannot be canceled. The example code is as follows:
1 2 3 4 5 6 |
def cancel_sql(dli_client, job_id): try: dli_client.cancel_sql(job_id) except DliException as e: print(e) return |
Querying All Jobs
DLI provides an API for querying all jobs. You can use the API to query information about all jobs in the current project and obtain the query result. The example code is as follows:
1 2 3 4 5 6 7 8 |
def list_all_sql_jobs(dli_client): try: sql_jobs = dli_client.list_sql_jobs() except DliException as e: print(e) return for sql_job in sql_jobs: print(sql_job) |
APIs in this SDK do not support SQL patterns. You cannot match SQL patterns for job query.
To query DLI jobs, use the Querying All Jobs API.
Querying SQL Jobs
def list_sql_jobs(dli_client): try: sql_jobs = dli_client.list_sql_jobs() except DliException as e: print(e) return
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.