Job-related SDKs
Importing Data
DLI provides an API for importing data. Sample code is as follows:
def import_data(dli_client, db_name, tbl_name, queue_name):
    # Options describing the format of the CSV file to be imported
    options = {
        "with_column_header": True,
        "delimiter": ",",
        "quote_char": "\"",
        "escape_char": "\\",
        "date_format": "yyyy/MM/dd",
        "timestamp_format": "yyyy/MM/dd hh:mm:ss"
    }

    try:
        # Import the CSV object stored in OBS into the specified table
        job_id, status = \
            dli_client.import_table(tbl_name, db_name,
                                    'obs://bucket/obj/data.csv',
                                    'csv',
                                    queue_name=queue_name,
                                    options=options)
    except DliException as e:
        print(e)
        return

    print(job_id)
    print(status)
- Before submitting the import job, you can specify the data_type parameter to set the type of the data to be imported, for example csv. Use the options parameter to describe the CSV format in detail, such as the delimiter, quote character, and escape character (see the sketch after this list).
- If a folder and a file in the same OBS bucket directory have the same name, data is preferentially loaded from the file rather than the folder. To avoid this ambiguity, give files and folders at the same level different names when you create OBS objects.
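The following is a minimal sketch of the same call for a different data type. It assumes the OBS object and the target table hold JSON data; the 'json' value and the file path are illustrative placeholders, and the format options are omitted because the CSV-specific settings above do not apply.

def import_json_data(dli_client, db_name, tbl_name, queue_name):
    try:
        # Import a JSON object from OBS; no CSV-specific options are needed
        job_id, status = dli_client.import_table(tbl_name, db_name,
                                                 'obs://bucket/obj/data.json',
                                                 'json',
                                                 queue_name=queue_name)
    except DliException as e:
        print(e)
        return

    print(job_id)
    print(status)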
Exporting Data
DLI provides an API for exporting data. The example code is as follows:
def export_data(dli_client, db_name, tbl_name, queue_name):
    try:
        # Export the table content to the specified OBS path
        job_id, status = dli_client.export_table(tbl_name, db_name,
                                                 'obs://bucket/obj',
                                                 queue_name=queue_name)
    except DliException as e:
        print(e)
        return

    print(job_id)
    print(status)
- Before submitting the export job, you can set the data format, compression type, and export mode. Data can be exported only in CSV format (a hedged sketch of these settings follows this list).
- If a folder and a file in the same OBS bucket directory have the same name, the export path is preferentially resolved to the file rather than the folder. To avoid this ambiguity, give files and folders at the same level different names when you create OBS objects.
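The sketch below shows how such settings might be passed to export_table. The keyword arguments compress and export_mode, and their values, are assumptions based on the corresponding job parameters; verify the exact argument names against your SDK version before using them.

def export_compressed_data(dli_client, db_name, tbl_name, queue_name):
    try:
        # 'compress' and 'export_mode' are assumed keyword arguments; check
        # the export_table signature in your SDK version
        job_id, status = dli_client.export_table(tbl_name, db_name,
                                                 'obs://bucket/obj',
                                                 queue_name=queue_name,
                                                 compress='gzip',
                                                 export_mode='ErrorIfExists')
    except DliException as e:
        print(e)
        return

    print(job_id)
    print(status)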
Submitting a Job
DLI provides an API for submitting SQL jobs and obtaining their results. The example code is as follows:
def run_sql(dli_client, db_name, queue_name):
    # Execute the SQL statement and fetch the result set
    try:
        sql_job = dli_client.execute_sql('select * from tbl_dli_for_test', db_name)
        result_set = sql_job.get_result()
    except DliException as e:
        print(e)
        return

    if result_set.row_count == 0:
        return
    for row in result_set:
        print(row)

    # Export the query result to OBS
    try:
        status = sql_job.export_result('obs://bucket/obj',
                                       queue_name=queue_name)
    except DliException as e:
        print(e)
        return

    print(status)
Canceling a Job
DLI provides an API for canceling jobs. You can use it to cancel a submitted job; a job that has already completed or failed cannot be canceled. The example code is as follows:
def cancel_sql(dli_client, job_id):
    try:
        # Cancel the job identified by job_id
        dli_client.cancel_sql(job_id)
    except DliException as e:
        print(e)
        return
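A minimal usage sketch follows. The job ID would normally come from a previously submitted job, for example the job_id returned by import_table or export_table above; the literal value here is only a placeholder.

# Hypothetical wiring: cancel a job that is still running.
job_id = '6b29eb77-xxxx-xxxx-xxxx-xxxxxxxxxxxx'   # placeholder job ID
cancel_sql(dli_client, job_id)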
Querying All Jobs
DLI provides an API for querying all jobs. You can use the API to query information about all jobs in the current project and obtain the query result. The example code is as follows:
def list_all_sql_jobs(dli_client):
    try:
        # Retrieve all SQL jobs in the current project
        sql_jobs = dli_client.list_sql_jobs()
    except DliException as e:
        print(e)
        return

    for sql_job in sql_jobs:
        print(sql_job)
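If you only need a subset of the jobs, you can filter the returned objects on the client side. The sketch below assumes each returned job object exposes attributes such as status and job_id; those attribute names are not confirmed here and may differ between SDK versions, so inspect a job object in your environment first.

def list_running_sql_jobs(dli_client):
    try:
        sql_jobs = dli_client.list_sql_jobs()
    except DliException as e:
        print(e)
        return

    for sql_job in sql_jobs:
        # 'status' and 'job_id' are assumed attribute names; adjust them to
        # match the job objects returned by your SDK version
        if getattr(sql_job, 'status', None) == 'RUNNING':
            print(getattr(sql_job, 'job_id', sql_job))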