使用SDK提交SQL作业
本节操作介绍使用DLI SDK V2提交SQL作业的操作方法。

DLI SDK V2当前处于公测阶段,如需使用请提交工单申请开通。
- 2024年5月起,首次使用DLI的用户可以直接使用DLI SDK V2,无需申请。
- 对于2024年5月之前开通并使用DLI服务的用户,如需使用“DLI SDK V2”功能,必须提交工单申请开通试用权限。
前提条件
- 已参考Python开发环境配置配置Python SDK环境。
- 已参考初始化DLI客户端完成客户端DLIClient的初始化。
操作前准备
获取AK/SK,项目ID及对应的Region信息。
- 管理控制台。
- 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。
- 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。
- 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。
- 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。
样例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
def main(): your_access_key = os.getenv("HUAWEICLOUD_SDK_AK") your_secret_key = os.getenv("HUAWEICLOUD_SDK_SK") kwargs = { 'region': 'region_name', 'project_id': 'your_project_id', 'ak': your_access_key, 'sk': your_secret_key } from dli.dli_client import DliClient dli_client = DliClient(**kwargs) try: # 步骤一:创建数据库、表。 queue_name = 'your_sql_queue_name' prepare(dli_client, queue_name) # 步骤二:导入数据到表中。 # 整体实现过程/原理可分为以下3步: # 1. 用OBS的API把数据上传到 “obs_path_to_write_tmp_data”。可在OBS中配置生命周期策略,定期删除这些临时数据。 # 2. 向DLI提交执行Load Data语句,从而把OBS的数据导入到DLI。 # Load Data语法详见导入数据。 # 3. 循环检查作业运行状态,直到作业结束。 obs_path_to_write_tmp_data = f"obs://your_obs_bucket_name/your/path/{uuid.uuid4()}" load_data(dli_client, obs_path_to_write_tmp_data, queue_name) # 步骤三:提交SQL语句,执行查询并读取结果。 select_sql = "SELECT * FROM demo_db.demo_tbl" job_id = query_data(dli_client, select_sql, queue_name) # 步骤三': 如有需要,用户也可以通过作业ID来获取结果。 query_data_by_jobid(dli_client, job_id) # 分页查询所有作业,用户可以使用该接口查询当前工程下的所有SQL作业信息 # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.list_sql_jobs, # 详见 ListSqlJobs list_sql_jobs(dli_client) # 其它场景: # 1. 如果用户想取消已经提交的SQL作业,可使用以下接口。 # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.cancel_sql_job # 详见 CancelSqlJob # 注:若作业已经执行结束或失败则无法取消。 # 2. 如果用户想对SQL语句做语法校验,可使用以下接口。 # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.check_sql # 详见 CheckSql # 注:本接口只能做语法校验,无法做语义校验。请使用Explain语句,提交到DLI执行,进行语义校验。 # 3. 如果用户想根据jobId获取某个已提交的SQL作业,并查看作业详情,可使用以下接口。 # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_detail # 详见 ShowSqlJobDetail # 4. 获取作业执行进度信息,如果作业正在执行,可以获取到子作业的信息,如果作业刚开始或者已经结束,则无法获取到子作业信息 # 关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_progress # 详见 ShowSqlJobProgress except DliException as e: # 请根据业务实际情况处理异常信息,此处仅作样例。 logger.error(e) |
创建数据库和表
相关链接:
- 创建数据库语法参考
- 创建表语法参考
- 关键SDK: dli.dli_client.DliClient.execute_sql
- 关键API: huaweicloudsdkdli.v1.dli_client.DliClient.create_sql_job。详见 CreateSqlJob。
- 关键API: huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_status。详见 ShowSqlJobStatus。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
def prepare(dli_client, queue_name): """ 创建数据库、表。 :param dli.dli_client.DliClient dli_client: DLI Client. :param str queue_name: Queue name for the SQL execute. :return: dli.job.SqlJob """ # 1. 创建数据库。 # “default”为内置数据库,不能创建名为“default”的数据库。 createDbSql = "CREATE DATABASE IF NOT EXISTS demo_db" dli_client.execute_sql(sql=createDbSql, queue_name=queue_name) # 提交SQL作业,并循环检查作业状态直到作业结束 # 2. 创建表。注:根据实际情况调整表结构和表数据目录,以及OBS存储路径!!! createTblSql = "CREATE TABLE IF NOT EXISTS `demo_tbl` (" \ " `bool_c` BOOLEAN," \ " `byte_c` TINYINT," \ " `short_c` SMALLINT," \ " `int_c` INT," \ " `long_c` BIGINT," \ " `float_c` FLOAT," \ " `double_c` DOUBLE," \ " `decimal_c` DECIMAL(10,2)," \ " `str_c` STRING," \ " `date_c` DATE," \ " `timestamp_c` TIMESTAMP," \ " `binary_c` BINARY," \ " `array_c` ARRAY<INT>," \ " `map_c` MAP<STRING, INT>," \ " `struct_c` STRUCT<`s_str_c`: STRING, `s_bool_c`: BOOLEAN>)" \ " LOCATION 'obs://demo_bucket/demo_db/demo_tbl'" \ " STORED as TEXTFILE" dli_client.execute_sql(sql=createTblSql, db_name='demo_db', queue_name=queue_name) |
导入数据
- 相关链接:
- 导入数据
- 原生数据类型
- 复杂数据类型
- 关键SDK: dli.dli_client.DliClient.execute_sql
- 关键API: huaweicloudsdkdli.v1.dli_client.DliClient.create_sql_job。详见 CreateSqlJob。
- 关键API: huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_status。详见 ShowSqlJobStatus
- 示例代码:
1 2 3 4 5 6 7 8 9 10
def load_data(dli_client, upload_data_path, queue_name): # 1. 写入数据到OBS临时目录。请根据业务实际情况做调整,此处仅作样例。 # 注:此步骤纯粹为直接调用OBS写数据相关API,与DLI完全解耦,本示例仅提供了一个写Json的实现,即文件在OBS上的保存格式为Json。 # 用户可依据业务需求自定义实现,比如,用户可将文件保存为csv。 write_tmp_data(get_schema(), upload_data_path, dli_client.dli_info, 1) # 2. 将第一步中写到OBS上的数据导入到DLI。 # 注:此处的data_type需要根据第一步中的文件类型来确定,本示例中是JSON。如用户使用其它格式,则需要修改成相应的 data_type loadSql = f"LOAD DATA INPATH '{upload_data_path}' INTO TABLE demo_db.demo_tbl OPTIONS(data_type 'json')" dli_client.execute_sql(sql=loadSql, queue_name=queue_name) # 提交SQL作业,并循环检查作业状态直到作业结束
查询作业结果
- 相关链接:
- 关键SDK:dli.dli_client.DliClient.execute_sql
- 关键SDK:dli.job.SqlJob.get_result. 需要开启结果写作业桶特性,否则会抛出异常。
可通过查询作业状态API响应体中的 result_path 来判断是否已开启作业结果写作业桶特性。
待作业运行结束后,如果result_path 以 obs:// 开头,则已开启作业结果写作业桶特性,否则未开启。
- 关键API: huaweicloudsdkdli.v1.dli_client.DliClient.create_sql_job。详见 CreateSqlJob。
- 关键API: huaweicloudsdkdli.v1.dli_client.DliClient.show_sql_job_status。详见 ShowSqlJobStatus
1 2 3 4 5 6 7 8 9 10
def query_data(dli_client, select_sql, queue_name): """ :param dli.dli_client.DliClient dli_client: DLI Client. :param str select_sql: SQL statement :param str queue_name: Queue name for the SQL execute :return: str """ sql_job = dli_client.execute_sql(sql=select_sql, queue_name=queue_name) print_result(sql_job.get_result()) return sql_job.job_id
查询指定作业的结果
- 使用说明
- 示例代码
1 2 3 4 5 6 7 8 9 10
def query_data_by_jobid(dli_client, job_id): """ :param dli.dli_client.DliClient dli_client: DLI Client. :param str job_id: Query类型作业的job id :return: """ from dli.job import SqlJob sql_job = SqlJob(job_id=job_id, job_type="QUERY", client=dli_client) dli_client._cycle_check_sql_job(sql_job=sql_job) print_result(sql_job.get_result())
查询作业
- 使用说明
查询当前工程下的SQL作业信息。
如果作业较多,必须使用本示例中的分页查询的方式分批查询,否则只能返回第一页的内容,无法返回全部作业。
关键SDK: huaweicloudsdkdli.v1.dli_client.DliClient.list_sql_jobs
- 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
def list_sql_jobs(client): """ 查询当前工程下的SQL作业信息。如果作业较多,必须使用本示例中的分页查询的方式分批查询,否则只能返回第一页的内容,无法返回全部作业。 关键SDK: huaweicloudsdkdli.v1.dli_client.DliClient.list_sql_jobs :param dli.dli_client.DliClient client: DLI Client. """ req = ListSqlJobsRequest() req.current_page = 1 # 默认值 1 req.page_size = 100 # 默认值 10 # 获取作业总数 job_count = client.inner_client.list_sql_jobs(req).job_count cur = 0 # 分页查询 while cur < job_count: list_sql_jobs_response = client.inner_client.list_sql_jobs(req) jobs = list_sql_jobs_response.jobs for job in jobs: # 在此处添加业务逻辑,对每个作业进行处理 print(job) cur += 1 if cur >= job_count: break req.current_page += 1
打印作业结果
- 使用说明
请根据业务实际情况处理相应的每一行数据,此处仅作样例。
- 示例代码
1 2 3 4 5 6 7 8 9
def print_result(obs_reader): """ 请根据业务实际情况处理相应的每一行数据,此处仅作样例。 """ count = 0 for record in obs_reader: count += 1 print(record) logger.info("total records: %d", count)
写入数据到OBS writeTmpData
- 使用说明
此方法纯粹为直接调用OBS写数据相关API,与DLI API完全解耦,本示例提供了一个写Json的实现,即文件在OBS上的保存格式为Json。
用户可依据业务需求自定义实现,比如,用户可将文件保存为csv。
- 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
def write_tmp_data(schema, upload_data_path, dli_info, total_records): """ 此方法纯粹为直接调用OBS写数据相关API,与DLI API完全解耦,本示例提供了一个写Json的实现,即文件在OBS上的保存格式为Json。 用户可依据业务需求自定义实现,比如,用户可将文件保存为csv。 :param list schema: 数据的结构 schema :param str upload_data_path: 该OBS临时目录用于保存用户的业务数据 :param dli.dli_client.DliClient.dli_info dli_info: 初始化DliClient时传入的认证信息。 :param int total_records: 模拟数据行数。通过该参数配置插入的数据行数。此处仅做展示,用户可根据实际业务需求修改。 :return: """ obs_client = ObsClient(access_key_id=dli_info.ak, secret_access_key=dli_info.sk, is_secure=True, server=dli_info.obs) bucket_name = get_bucket_name(upload_data_path) object_prefix = get_object_prefix(upload_data_path) datas = "" try: for i in range(total_records): row = Row(schema=schema, columns=get_record()) datas += to_json_string(row, schema) object_key = object_prefix + "/tmp_data.json" obs_client.putObject(bucket_name, object_key, datas) logger.info("Uploaded data to OBS bucket '%s' with object key '%s'", bucket_name, object_key) finally: obs_client.close()
构建表的Schema
- 使用说明
请根据业务实际情况构造相应的Schema,此处仅作样例。
- 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
def get_schema(): """ 请根据业务实际情况构造相应的Schema,此处仅作样例。 """ from dli.column import Column return [Column(name="bool_c", data_type="boolean", desc="boolean col"), Column(name="byte_c", data_type="tinyint", desc="tinyint col"), Column(name="short_c", data_type="smallint", desc="smallint col"), Column(name="int_c", data_type="int", desc="int col"), Column(name="long_c", data_type="bigint", desc="bigint col"), Column(name="float_c", data_type="float", desc="float col"), Column(name="double_c", data_type="double", desc="double col"), Column(name="decimal_c", data_type="decimal(10,2)", desc="decimal col"), Column(name="str_c", data_type="string", desc="string col"), Column(name="date_c", data_type="date", desc="date col"), Column(name="timestamp_c", data_type="timestamp", desc="timestamp col"), Column(name="binary_c", data_type="binary", desc="binary col"), Column(name="array_c", data_type="array<int>", desc="array col"), Column(name="map_c", data_type="map<string, int>", desc="map col"), Column(name="struct_c", data_type="struct<s_str_c:string,s_bool_c:boolean>", desc="struct col")]
按需生成测试数据 List<Object> genRecord
- 使用说明
请根据业务实际情况构造相应的每一行数据,此处仅作样例。
- 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
def get_record(): record = [ True, # boolean 1, # byte 123, # short 65535, # int 123456789012, # long 101.235, # float 256.012358, # double 33.05, # decimal "abc_123&", # string "2023-05-08", # date "1716345295000", # timestamp,毫秒 base64.b64encode("hello".encode('utf-8')), # binary [1, 2, 3], # array {"k": 123}, # map {"s_str_c": "Abc", "s_bool_c": True} # struct ] return record
to_json_string
- 使用说明
请根据业务实际情况构造相应的每一行数据,此处仅作样例。
- 示例代码
1 2 3 4 5 6 7 8 9 10 11 12
def to_json_string(row, schema): json_obj = {} for i, column in enumerate(schema): if column.is_partition_column: continue if column.type == 'binary': json_obj[column.name] = base64.b64encode(row.columns[i]).decode('utf-8') elif column.type.startswith('decimal'): json_obj[column.name] = float(row.columns[i]) else: json_obj[column.name] = row.columns[i] return json.dumps(json_obj) + "\n"
get_bucket_name
- 使用说明
请根据业务实际情况构造相应的每一行数据,此处仅作样例。
- 示例代码
1 2 3 4 5 6 7
def get_bucket_name(full_path): try: url = urlparse(full_path) return url.hostname except Exception as e: logger.error("Failed to get bucket name from full path", e) return None
get_object_prefix
- 使用说明
请根据业务实际情况构造相应的每一行数据,此处仅作样例。
- 示例代码
1 2 3 4 5 6 7
def get_object_prefix(full_path): try: url = urlparse(full_path) return url.path.lstrip('/') except Exception as e: logger.error("Failed to get object key from full path", e) return None