更新时间:2024-03-06 GMT+08:00

作业相关

完整样例代码和依赖包说明请参考:Python SDK概述

导入数据

DLI提供导入数据的接口。您可以使用该接口将存储在OBS中的数据导入到已创建的DLI表中。示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def import_data(dli_client, db_name, tbl_name, queue_name):
    options = {
        "with_column_header": True,
        "delimiter": ",",
        "quote_char": "\"",
        "escape_char": "\\",
        "date_format": "yyyy/MM/dd",
        "timestamp_format": "yyyy/MM/dd hh:mm:ss"
        }

    try:
        job_id, status = \
            dli_client.import_table(tbl_name, db_name,
                                    'obs://bucket/obj/data.csv',
                                    'csv', 
                                    queue_name=queue_name,
                                    options=options)
    except DliException as e:
        print(e)
        return

    print(job_id)
    print(status)
  • 在提交导入作业前,可选择通过data_type参数设置导入数据的类型,例如将data_type设置为csv。csv数据的具体格式通可过options参数设置,例如:csv的分隔符,转义符等。
  • 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。

导出数据

DLI提供导出数据的接口。您可以使用该接口将DLI表中的数据导出到OBS中。示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def export_data(dli_client, db_name, tbl_name, queue_name):
    try:
        job_id, status = dli_client.export_table(tbl_name, db_name,
                                                 'obs://bucket/obj',
                                                 queue_name=queue_name)
    except DliException as e:
        print(e)
        return

    print(job_id)
    print(status)
  • 在提交导出作业前,可选设置数据格式、压缩类型、导出模式等,导出格式只支持csv格式。
  • 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。

提交作业

DLI提供查询作业的接口。您可以使用该接口执行查询并获取查询结果。示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def run_sql(dli_client, db_name, queue_name):
    # execute SQL
    try:
        sql_job = dli_client.execute_sql('select * from tbl_dli_for_test', db_name, queue_name=queue_name)
        result_set = sql_job.get_result(queue_name=queue_name)
    except DliException as e:
        print(e)
        return

    if result_set.row_count == 0:
        return

    for row in result_set:
        print(row)

    # export the query result to obs
    try:
        status = sql_job.export_result('obs://bucket/obj',
                                       queue_name=queue_name)
    except DliException as e:
        print(e)
        return

    print(status)

取消作业

DLI提供取消作业的接口。您可以使用该接口取消已经提交的作业,若作业已经执行结束或失败则无法取消。示例代码如下:

1
2
3
4
5
6
def cancel_sql(dli_client, job_id): 
    try: 
        dli_client.cancel_sql(job_id) 
    except DliException as e: 
        print(e) 
        return

查询所有作业

DLI提供查询所有作业的接口。您可以使用该接口执行查询当前工程下的所有作业的信息并获取查询结果。示例代码如下:

1
2
3
4
5
6
7
8
def list_all_sql_jobs(dli_client): 
    try: 
        sql_jobs = dli_client.list_sql_jobs() 
    except DliException as e: 
        print(e) 
        return 
    for sql_job in sql_jobs: 
        print(sql_job)

该SDK接口不支持sql_pattern,即通过指定sql片段作为作业过滤条件进行查询。

如果需要则可以通过查询所有作业API接口指定该参数进行查询。

查询SQL类型作业

您可以使用该接口查询当前工程下的所有SQL类型作业的信息并获取查询结果。示例代码如下:
def list_sql_jobs(dli_client):
    try:
        sql_jobs = dli_client.list_sql_jobs()
    except DliException as e:
        print(e)
        return