获取请求结果集
- 功能介绍:用户可以使用查询语句的Statement结构体获取结果集。
- 方法定义:getStatementResult(statement, page_num, from_obs, convert_type, workspaceID, timeout)
- 参数说明:
|
参数名称 |
参数类型 |
是否必选 |
描述 |
|---|---|---|---|
|
statement |
Statement |
必选 |
该语句的Statement结构体。 |
|
page_num |
int |
可选 |
查询结果集的指定页数,当指定页数超过该查询结果集总页数时会返回空结果集,默认值为1。 |
|
from_obs |
bool |
可选 |
指定结果集是否通过OBS SDK获取,默认值为True。 |
|
convert_type |
bool |
可选 |
指定结果集是否转换为对应Python数据类型,默认值为False。 |
|
workspaceID |
str |
可选 |
当指定从FabricSQL EP获取结果集时,必须提供。 |
|
timeout |
float |
可选 |
请求超时时间。 |
- 响应体说明:返回结果集响应体Result
|
参数名称 |
参数类型 |
描述 |
|---|---|---|
|
status |
str |
该条SQL的执行状态。 |
|
statement_id |
str |
语句ID。 |
|
num_rows |
int |
语句总行数。 |
|
row_count |
int |
当前页总行数。 |
|
page_count |
int |
总页数。 |
|
page_no |
int |
当前页数,从数字1开始。 |
|
err_code |
int |
错误码。 |
|
sql_state |
int |
sql状态码。 |
|
message |
str |
错误信息。 |
|
obs_result_format |
ObsFileFormat |
结果集文件在OBS存储的格式。 |
|
result_set |
ResultSet |
JSON格式结果集数据。 |
|
arrow_batch |
RecordBatch |
Arrow格式结果集数据。 |
|
参数名称 |
参数类型 |
描述 |
|---|---|---|
|
columns |
list[Column] |
列头数据。 |
|
rows |
list[list[str]] |
列数据。 |
- Result中result_set为JSON格式结果集,仅当OBS结果集文件为JSON时该字段被赋值,否则该字段为None。当OBS存储的结果集文件格式为Arrow时,arrow_batch被赋值为Arrow结果集数据,否则该字段为None。
- 当OBS结果集格式为Arrow时,可以使用Result结构体的方法to_arrow返回该Arrow的RecordBatch,请使用pyarrow库对RecordBatch进行解析和处理。
- 示例代码:
import os from fabricsql.SQLClient import FabricSQLClient from fabricsql.SQLRequests import SessionRequest, QueryRequest # AK/SK属于敏感信息,推荐通过环境变量方式获取,避免直接明文编码在程序中 endpoint = 'xxxxxxxxxxxxxx' accessKey = 'xxxxxxxxxxxxxx' secretKey = 'xxxxxxxxxxxxxxx' workspace_id = 'xxxxxxxxxxxxx' client = FabricSQLClient(ep,ak ,sk) # 创建一个Session请求,填入必要信息 s = SessionRequest() s.endpoint_id = endpoint_id s.instance_id = instance_id s.catalog = catalog #创建一个Session,获取session_id session = client.createSession(workspace_id, s) session_id = session.session_id # #创建一个SQL请求 q = QueryRequest() q.statement = "show all" q.session_id = session_id #下发请求,返回一个请求响应 query_resp = client.executeQuery(workspace_id, q) #根据该请求响应,轮询请求结果 while True: statement_set = client.getSummary(wp, query_resp, get_schema=False) if statement_set.status == "SUCCESSFUL" or statement_set.status == "FAIL": break #等待2s后轮询 time.sleep(2) #获取到请求结果后,从中遍历每个语句的结果集 for statement in statement_set.statements: # 如果语句状态为PGRES_FATAL_ERROR说明查询失败 if statement.status == 'PGRES_FATAL_ERROR': print(statement.message) continue page_count = statement.page_count if page_count == 0: continue for page_no in range(1, page_count + 1): result = client.getStatementResult(statement, page_no) # 打印该页的行数据,假设本次获取的JSON格式结果集 row_data = result.result_set.rows print(row_data)
