更新时间:2025-09-09 GMT+08:00
分享

使用SDK提交SQL作业

本节操作介绍使用DLI SDK V2提交SQL作业的操作方法。

DLI SDK V2当前处于公测阶段,如需使用请提交工单申请开通。

  • 2024年5月起,首次使用DLI的用户可以直接使用DLI SDK V2,无需申请。
  • 对于2024年5月之前开通并使用DLI服务的用户,如需使用“DLI SDK V2”功能,必须提交工单申请开通试用权限。

前提条件

操作前准备

获取AK/SK,项目ID及对应的Region信息。

  1. 管理控制台。
  2. 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。
  3. 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。
  4. 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。
  5. 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。

样例代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def main():
     your_access_key = os.getenv("HUAWEICLOUD_SDK_AK")
    your_secret_key = os.getenv("HUAWEICLOUD_SDK_SK")
    kwargs = {
        'region': 'region_name', 
        'project_id': 'your_project_id',
        'ak': your_access_key,
        'sk': your_secret_key
    }
    from dli.dli_client import DliClient
    dli_client = DliClient(**kwargs)

    try:
        # 步骤一:创建数据库、表。
        queue_name = 'your_sql_queue_name'
        prepare(dli_client, queue_name)

        # 步骤二:导入数据到表中。
        # 整体实现过程/原理可分为以下3步:
        # 1. 用OBS的API把数据上传到 “obs_path_to_write_tmp_data”。可在OBS中配置生命周期策略,定期删除这些临时数据。
        # 2. 向DLI提交执行Load Data语句,从而把OBS的数据导入到DLI。
        #    Load Data语法详见导入数据。
        # 3. 循环检查作业运行状态,直到作业结束。
        obs_path_to_write_tmp_data = f"obs://your_obs_bucket_name/your/path/{uuid.uuid4()}"
        load_data(dli_client, obs_path_to_write_tmp_data, queue_name)

        # 步骤三:提交SQL语句,执行查询并读取结果。
        select_sql = "SELECT * FROM demo_db.demo_tbl"
        job_id = query_data(dli_client, select_sql, queue_name)

        # 步骤三': 如有需要,用户也可以通过作业ID来获取结果。
        query_data_by_jobid(dli_client, job_id)

        # 分页查询所有作业,用户可以使用该接口查询当前工程下的所有SQL作业信息
        # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.list_sql_jobs,
        # 详见 ListSqlJobs 
        list_sql_jobs(dli_client)

        # 其它场景:
        # 1. 如果用户想取消已经提交的SQL作业,可使用以下接口。
        # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.cancel_sql_job
        # 详见 CancelSqlJob
        # 注:若作业已经执行结束或失败则无法取消。

        # 2. 如果用户想对SQL语句做语法校验,可使用以下接口。
        # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.check_sql
        # 详见 CheckSql
        # 注:本接口只能做语法校验,无法做语义校验。请使用Explain语句,提交到DLI执行,进行语义校验。

        # 3. 如果用户想根据jobId获取某个已提交的SQL作业,并查看作业详情,可使用以下接口。
        # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_detail
        # 详见 ShowSqlJobDetail
        # 4. 获取作业执行进度信息,如果作业正在执行,可以获取到子作业的信息,如果作业刚开始或者已经结束,则无法获取到子作业信息
        # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_progress
        # 详见 ShowSqlJobProgress
    except DliException as e:
        # 请根据业务实际情况处理异常信息,此处仅作样例。
        logger.error(e)

创建数据库和表

相关链接:

示例代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def prepare(dli_client, queue_name):
    """
    创建数据库、表。
    :param dli.dli_client.DliClient dli_client: DLI Client.
    :param str queue_name: Queue name for the SQL execute.
    :return: dli.job.SqlJob
    """
    # 1. 创建数据库。
    # “default”为内置数据库,不能创建名为“default”的数据库。
    createDbSql = "CREATE DATABASE IF NOT EXISTS demo_db"
    dli_client.execute_sql(sql=createDbSql, queue_name=queue_name)  # 提交SQL作业,并循环检查作业状态直到作业结束

    # 2. 创建表。注:根据实际情况调整表结构和表数据目录,以及OBS存储路径!!!
    createTblSql = "CREATE TABLE IF NOT EXISTS `demo_tbl` (" \
                   "  `bool_c` BOOLEAN," \
                   "  `byte_c` TINYINT," \
                   "  `short_c` SMALLINT," \
                   "  `int_c` INT," \
                   "  `long_c` BIGINT," \
                   "  `float_c` FLOAT," \
                   "  `double_c` DOUBLE," \
                   "  `decimal_c` DECIMAL(10,2)," \
                   "  `str_c` STRING," \
                   "  `date_c` DATE," \
                   "  `timestamp_c` TIMESTAMP," \
                   "  `binary_c` BINARY," \
                   "  `array_c` ARRAY<INT>," \
                   "  `map_c` MAP<STRING, INT>," \
                   "  `struct_c` STRUCT<`s_str_c`: STRING, `s_bool_c`: BOOLEAN>)" \
                   " LOCATION 'obs://demo_bucket/demo_db/demo_tbl'" \
                   " STORED as TEXTFILE"
    dli_client.execute_sql(sql=createTblSql, db_name='demo_db', queue_name=queue_name)

导入数据

  • 相关链接
  • 导入数据
  • 原生数据类型
  • 复杂数据类型
  • 关键SDK: dli.dli_client.DliClient.execute_sql
  • 关键API: huaweicloudsdkdli.v1.dli_client.DliClient.create_sql_job。详见 CreateSqlJob
  • 关键API: huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_status。详见 ShowSqlJobStatus
  • 示例代码:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    def load_data(dli_client, upload_data_path, queue_name):
       # 1. 写入数据到OBS临时目录。请根据业务实际情况做调整,此处仅作样例。
        # 注:此步骤纯粹为直接调用OBS写数据相关API,与DLI完全解耦,本示例仅提供了一个写Json的实现,即文件在OBS上的保存格式为Json。
        # 用户可依据业务需求自定义实现,比如,用户可将文件保存为csv。
        write_tmp_data(get_schema(), upload_data_path, dli_client.dli_info, 1)
    
        # 2. 将第一步中写到OBS上的数据导入到DLI。
        # 注:此处的data_type需要根据第一步中的文件类型来确定,本示例中是JSON。如用户使用其它格式,则需要修改成相应的 data_type
        loadSql = f"LOAD DATA INPATH '{upload_data_path}' INTO TABLE demo_db.demo_tbl OPTIONS(data_type 'json')"
        dli_client.execute_sql(sql=loadSql, queue_name=queue_name)  # 提交SQL作业,并循环检查作业状态直到作业结束
    

查询作业结果

  • 相关链接

    SELECT查询语句

  • 关键SDK:dli.dli_client.DliClient.execute_sql
  • 关键SDK:dli.job.SqlJob.get_result. 需要开启结果写作业桶特性,否则会抛出异常。

    可通过查询作业状态API响应体中的 result_path 来判断是否已开启作业结果写作业桶特性。

    待作业运行结束后,如果result_path 以 obs:// 开头,则已开启作业结果写作业桶特性,否则未开启。

  • 关键API: huaweicloudsdkdli.v1.dli_client.DliClient.create_sql_job。详见 CreateSqlJob
  • 关键API: huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_status。详见 ShowSqlJobStatus
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    def query_data(dli_client, select_sql, queue_name):
        """
        :param dli.dli_client.DliClient dli_client: DLI Client.
        :param str select_sql: SQL statement
        :param str queue_name: Queue name for the SQL execute
        :return: str
        """
        sql_job = dli_client.execute_sql(sql=select_sql, queue_name=queue_name)
        print_result(sql_job.get_result())
        return sql_job.job_id
    

查询指定作业的结果

  • 使用说明
    • 关键API: 查询作业状态API
    • 关键SDK:dli.job.SqlJob.get_result.

      使用本方法的前提是需要开启结果写作业桶特性,否则作业运行时会抛出异常。

      可通过查询作业状态API响应体中的 result_path 来判断是否已开启作业结果写作业桶特性。待作业运行结束后,如果 result_path 以 obs:// 开头,则已开启作业结果写作业桶特性,否则未开启。

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    def query_data_by_jobid(dli_client, job_id):
        """
        :param dli.dli_client.DliClient dli_client: DLI Client.
        :param str job_id: Query类型作业的job id
        :return:
        """
        from dli.job import SqlJob
        sql_job = SqlJob(job_id=job_id, job_type="QUERY", client=dli_client)
        dli_client._cycle_check_sql_job(sql_job=sql_job)
        print_result(sql_job.get_result())
    

查询作业

  • 使用说明

    查询当前工程下的SQL作业信息。

    如果作业较多,必须使用本示例中的分页查询的方式分批查询,否则只能返回第一页的内容,无法返回全部作业。

    关键SDK: huaweicloudsdkdli.v1.dli_client.DliClient.list_sql_jobs

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    def list_sql_jobs(client):
        """
        查询当前工程下的SQL作业信息。如果作业较多,必须使用本示例中的分页查询的方式分批查询,否则只能返回第一页的内容,无法返回全部作业。
        关键SDK: huaweicloudsdkdli.v1.dli_client.DliClient.list_sql_jobs
    
        :param dli.dli_client.DliClient client: DLI Client.
        """
        req = ListSqlJobsRequest()
        req.current_page = 1  # 默认值 1
        req.page_size = 100  # 默认值 10
    
        # 获取作业总数
        job_count = client.inner_client.list_sql_jobs(req).job_count
        cur = 0
    
        # 分页查询
        while cur < job_count:
            list_sql_jobs_response = client.inner_client.list_sql_jobs(req)
            jobs = list_sql_jobs_response.jobs
            for job in jobs:
    
                # 在此处添加业务逻辑,对每个作业进行处理
                print(job)
    
                cur += 1
                if cur >= job_count:
                    break
            req.current_page += 1
    

打印作业结果

  • 使用说明

    请根据业务实际情况处理相应的每一行数据,此处仅作样例。

  • 示例代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    def print_result(obs_reader):
        """
        请根据业务实际情况处理相应的每一行数据,此处仅作样例。
        """
        count = 0
        for record in obs_reader:
            count += 1
            print(record)
        logger.info("total records: %d", count)
    

写入数据到OBS writeTmpData

  • 使用说明

    此方法纯粹为直接调用OBS写数据相关API,与DLI API完全解耦,本示例提供了一个写Json的实现,即文件在OBS上的保存格式为Json。

    用户可依据业务需求自定义实现,比如,用户可将文件保存为csv。

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    def write_tmp_data(schema, upload_data_path, dli_info, total_records):
        """
        此方法纯粹为直接调用OBS写数据相关API,与DLI API完全解耦,本示例提供了一个写Json的实现,即文件在OBS上的保存格式为Json。
        用户可依据业务需求自定义实现,比如,用户可将文件保存为csv。
    
        :param list schema: 数据的结构 schema
        :param str upload_data_path: 该OBS临时目录用于保存用户的业务数据
        :param dli.dli_client.DliClient.dli_info dli_info: 初始化DliClient时传入的认证信息。
        :param int total_records: 模拟数据行数。通过该参数配置插入的数据行数。此处仅做展示,用户可根据实际业务需求修改。
        :return:
        """
        obs_client = ObsClient(access_key_id=dli_info.ak, secret_access_key=dli_info.sk, is_secure=True,
                               server=dli_info.obs)
        bucket_name = get_bucket_name(upload_data_path)
        object_prefix = get_object_prefix(upload_data_path)
    
        datas = ""
        try:
            for i in range(total_records):
                row = Row(schema=schema, columns=get_record())
                datas += to_json_string(row, schema)
            object_key = object_prefix + "/tmp_data.json"
            obs_client.putObject(bucket_name, object_key, datas)
            logger.info("Uploaded data to OBS bucket '%s' with object key '%s'", bucket_name, object_key)
        finally:
            obs_client.close()
    

构建表的Schema

  • 使用说明

    请根据业务实际情况构造相应的Schema,此处仅作样例。

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    def get_schema():
        """
        请根据业务实际情况构造相应的Schema,此处仅作样例。
        """
        from dli.column import Column
        return [Column(name="bool_c", data_type="boolean", desc="boolean col"),
                Column(name="byte_c", data_type="tinyint", desc="tinyint col"),
                Column(name="short_c", data_type="smallint", desc="smallint col"),
                Column(name="int_c", data_type="int", desc="int col"),
                Column(name="long_c", data_type="bigint", desc="bigint col"),
                Column(name="float_c", data_type="float", desc="float col"),
                Column(name="double_c", data_type="double", desc="double col"),
                Column(name="decimal_c", data_type="decimal(10,2)", desc="decimal col"),
                Column(name="str_c", data_type="string", desc="string col"),
                Column(name="date_c", data_type="date", desc="date col"),
                Column(name="timestamp_c", data_type="timestamp", desc="timestamp col"),
                Column(name="binary_c", data_type="binary", desc="binary col"),
                Column(name="array_c", data_type="array<int>", desc="array col"),
                Column(name="map_c", data_type="map<string, int>", desc="map col"),
                Column(name="struct_c", data_type="struct<s_str_c:string,s_bool_c:boolean>", desc="struct col")]
    

按需生成测试数据 List<Object> genRecord

  • 使用说明

    请根据业务实际情况构造相应的每一行数据,此处仅作样例。

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    def get_record():
        record = [
            True,  # boolean
            1,  # byte
            123,  # short
            65535,  # int
            123456789012,  # long
            101.235,  # float
            256.012358,  # double
            33.05,  # decimal
            "abc_123&",  # string
            "2023-05-08",  # date
            "1716345295000",  # timestamp,毫秒
            base64.b64encode("hello".encode('utf-8')),  # binary
            [1, 2, 3],  # array
            {"k": 123},  # map
            {"s_str_c": "Abc", "s_bool_c": True}  # struct
        ]
        return record
    

to_json_string

  • 使用说明

    请根据业务实际情况构造相应的每一行数据,此处仅作样例。

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    def to_json_string(row, schema):
        json_obj = {}
        for i, column in enumerate(schema):
            if column.is_partition_column:
                continue
            if column.type == 'binary':
                json_obj[column.name] = base64.b64encode(row.columns[i]).decode('utf-8')
            elif column.type.startswith('decimal'):
                json_obj[column.name] = float(row.columns[i])
            else:
                json_obj[column.name] = row.columns[i]
        return json.dumps(json_obj) + "\n"
    

get_bucket_name

  • 使用说明

    请根据业务实际情况构造相应的每一行数据,此处仅作样例。

  • 示例代码
    1
    2
    3
    4
    5
    6
    7
    def get_bucket_name(full_path):
        try:
            url = urlparse(full_path)
            return url.hostname
        except Exception as e:
            logger.error("Failed to get bucket name from full path", e)
            return None
    

get_object_prefix

  • 使用说明

    请根据业务实际情况构造相应的每一行数据,此处仅作样例。

  • 示例代码
    1
    2
    3
    4
    5
    6
    7
    def get_object_prefix(full_path):
        try:
            url = urlparse(full_path)
            return url.path.lstrip('/')
        except Exception as e:
            logger.error("Failed to get object key from full path", e)
            return None
    

相关文档