更新时间:2026-01-06 GMT+08:00
分享

获取请求结果集

  • 功能介绍:用户可以使用查询语句的Statement结构体获取结果集。
  • 方法定义:getStatementResult(statement, page_num, from_obs, convert_type, workspaceID, timeout)
  • 参数说明:
表1 getStatementResult参数说明

参数名称

参数类型

是否必选

描述

statement

Statement

必选

该语句的Statement结构体。

page_num

int

可选

查询结果集的指定页数,当指定页数超过该查询结果集总页数时会返回空结果集,默认值为1。

from_obs

bool

可选

指定结果集是否通过OBS SDK获取,默认值为True。

convert_type

bool

可选

指定结果集是否转换为对应Python数据类型,默认值为False。

workspaceID

str

可选

当指定从FabricSQL EP获取结果集时,必须提供。

timeout

float

可选

请求超时时间。

  • 一个Result结构体代表该SQL语句查询结果集的一页。
  • 获取请求结果集调用的是OBS SDK,如果出现OBS SDK的报错信息请参考HTTP状态码(Python SDK)
  • 响应体说明:返回结果集响应体Result
表2 Result

参数名称

参数类型

描述

status

str

该条SQL的执行状态。

statement_id

str

语句ID。

num_rows

int

语句总行数。

row_count

int

当前页总行数。

page_count

int

总页数。

page_no

int

当前页数,从数字1开始。

err_code

int

错误码。

sql_state

int

sql状态码。

message

str

错误信息。

obs_result_format

ObsFileFormat

结果集文件在OBS存储的格式。

result_set

ResultSet

JSON格式结果集数据。

arrow_batch

RecordBatch

Arrow格式结果集数据。

表3 ResultSet

参数名称

参数类型

描述

columns

list[Column]

列头数据。

rows

list[list[str]]

列数据。

  • Result中result_set为JSON格式结果集,仅当OBS结果集文件为JSON时该字段被赋值,否则该字段为None。当OBS存储的结果集文件格式为Arrow时,arrow_batch被赋值为Arrow结果集数据,否则该字段为None。
  • 当OBS结果集格式为Arrow时,可以使用Result结构体的方法to_arrow返回该Arrow的RecordBatch,请使用pyarrow库对RecordBatch进行解析和处理。
  • 示例代码:
    import os
    from fabricsql.SQLClient import FabricSQLClient
    from fabricsql.SQLRequests import SessionRequest, QueryRequest
    # AK/SK属于敏感信息,推荐通过环境变量方式获取,避免直接明文编码在程序中
    endpoint = 'xxxxxxxxxxxxxx'
    accessKey = 'xxxxxxxxxxxxxx'
    secretKey = 'xxxxxxxxxxxxxxx'
    workspace_id = 'xxxxxxxxxxxxx'  
    client = FabricSQLClient(ep,ak ,sk)
    # 创建一个Session请求,填入必要信息
    s = SessionRequest()
    s.endpoint_id = endpoint_id
    s.instance_id = instance_id
    s.catalog = catalog
    #创建一个Session,获取session_id
    session = client.createSession(workspace_id, s)
    session_id = session.session_id
    # #创建一个SQL请求
    q = QueryRequest()
    q.statement = "show all"
    q.session_id = session_id
    #下发请求,返回一个请求响应
    query_resp = client.executeQuery(workspace_id, q)
    #根据该请求响应,轮询请求结果
    while True:
        statement_set = client.getSummary(wp, query_resp, get_schema=False)
        if statement_set.status == "SUCCESSFUL" or statement_set.status == "FAIL":
            break
        #等待2s后轮询
        time.sleep(2)
    #获取到请求结果后,从中遍历每个语句的结果集
    for statement in statement_set.statements:
        # 如果语句状态为PGRES_FATAL_ERROR说明查询失败
        if statement.status == 'PGRES_FATAL_ERROR':
            print(statement.message)
            continue
        page_count = statement.page_count
        if page_count == 0:
            continue
        for page_no in range(1, page_count + 1):
            result = client.getStatementResult(statement, page_no)
            # 打印该页的行数据,假设本次获取的JSON格式结果集
            row_data = result.result_set.rows
            print(row_data)
           

相关文档