获取请求结果集
- 功能介绍:用户可以使用查询语句的Statement结构体获取结果集。
- 方法定义:getStatementResult(statement, page_num, from_obs, convert_type, workspaceID, timeout)
- 参数说明:
参数名称 | 参数类型 | 是否必选 | 描述 |
|---|---|---|---|
statement | Statement | 必选 | 该语句的Statement结构体。 |
page_num | int | 可选 | 查询结果集的指定页数,当指定页数超过该查询结果集总页数时会返回空结果集,默认值为1。 |
from_obs | bool | 可选 | 指定结果集是否通过OBS SDK获取,默认值为True。 |
convert_type | bool | 可选 | 指定结果集是否转换为对应Python数据类型,默认值为False。 |
workspaceID | str | 可选 | 当指定从FabricSQL EP获取结果集时,必须提供。 |
timeout | float | 可选 | 请求超时时间。 |

- 一个Result结构体代表该SQL语句查询结果集的一页。
- 获取请求结果集调用的是OBS SDK,如果出现OBS SDK的报错信息请参考HTTP状态码(Python SDK)。
- 响应体说明:返回结果集响应体Result
参数名称 | 参数类型 | 描述 |
|---|---|---|
status | str | 该条SQL的执行状态。 |
statement_id | str | 语句ID。 |
num_rows | int | 语句总行数。 |
row_count | int | 当前页总行数。 |
page_count | int | 总页数。 |
page_no | int | 当前页数,从数字1开始。 |
err_code | int | 错误码。 |
sql_state | int | sql状态码。 |
message | str | 错误信息。 |
obs_result_format | ObsFileFormat | 结果集文件在OBS存储的格式。 |
result_set | ResultSet | JSON格式结果集数据。 |
arrow_batch | RecordBatch | Arrow格式结果集数据。 |
参数名称 | 参数类型 | 描述 |
|---|---|---|
columns | list[Column] | 列头数据。 |
rows | list[list[str]] | 列数据。 |

- Result中result_set为JSON格式结果集,仅当OBS结果集文件为JSON时该字段被赋值,否则该字段为None。当OBS存储的结果集文件格式为Arrow时,arrow_batch被赋值为Arrow结果集数据,否则该字段为None。
- 当OBS结果集格式为Arrow时,可以使用Result结构体的方法to_arrow返回该Arrow的RecordBatch,请使用pyarrow库对RecordBatch进行解析和处理。
- 示例代码:
import os from fabricsql.SQLClient import FabricSQLClient from fabricsql.SQLRequests import SessionRequest, QueryRequest # AK/SK属于敏感信息,推荐通过环境变量方式获取,避免直接明文编码在程序中 endpoint = 'xxxxxxxxxxxxxx' accessKey = 'xxxxxxxxxxxxxx' secretKey = 'xxxxxxxxxxxxxxx' workspace_id = 'xxxxxxxxxxxxx' client = FabricSQLClient(ep,ak ,sk) # 创建一个Session请求,填入必要信息 s = SessionRequest() s.endpoint_id = endpoint_id s.instance_id = instance_id s.catalog = catalog #创建一个Session,获取session_id session = client.createSession(workspace_id, s) session_id = session.session_id # #创建一个SQL请求 q = QueryRequest() q.statement = "show all" q.session_id = session_id #下发请求,返回一个请求响应 query_resp = client.executeQuery(workspace_id, q) #根据该请求响应,轮询请求结果 while True: statement_set = client.getSummary(wp, query_resp, get_schema=False) if statement_set.status == "SUCCESSFUL" or statement_set.status == "FAIL": break #等待2s后轮询 time.sleep(2) #获取到请求结果后,从中遍历每个语句的结果集 for statement in statement_set.statements: # 如果语句状态为PGRES_FATAL_ERROR说明查询失败 if statement.status == 'PGRES_FATAL_ERROR': print(statement.message) continue page_count = statement.page_count if page_count == 0: continue for page_no in range(1, page_count + 1): result = client.getStatementResult(statement, page_no) # 打印该页的行数据,假设本次获取的JSON格式结果集 row_data = result.result_set.rows print(row_data)

