Runtime SDK
场景介绍
Runtime SDK是SDK的核心组成部分,提供Agent运行时所需的所有功能,包括请求处理、响应生成、上下文管理等。
原理优势
- 开箱即用,一键部署
- 零配置部署:一行命令即可将本地Agent部署到云端Runtime,无需手动构建镜像、配置网络。
- CLI工具完备:agentarts dev本地运行、agentarts launch一键部署、agentarts invoke本地运行。
- 模板丰富:内置多种Agent框架模板(Langchain/LangGraph、GoogleADK等),开箱即用。
- 框架无关,极致兼容
- 多框架支持:原生支持Langchain/LangGraph、GoogleADK等主流框架。
- 统一入口:无论使用何种框架,都可以通过统一的SDK接口进行管理和部署。
- 平滑迁移:现有Agent代码只需少量适配即可接入。
- 云端托管,Serverless体验
- 无需运维:云端自动托管运行环境,无需关心服务器、容器、网络配置。
- 弹性伸缩:根据流量自动扩缩容,从零到大规模并发自适应。
- 按量付费:仅在实际调用时计费,空闲时自动释放资源。
- 企业级安全隔离
- 会话隔离:每个用户/会话独立运行环境,防止数据串扰。
- 权限控制:与IAM深度集成,支持精细化的访问控制。
- 安全防护:基于云原生的安全机制,保障Agent和数据安全。
- 可观测性内置
- 实时监控:开箱即用的健康状态监控和指标展示。
- 日志追踪:自动收集和展示运行日志,快速定位问题。
- 调用洞察:完整的请求/响应日志和性能分析。
- 丰富的生态集成
- 多协议支持:原生支持HTTP、WebSocket、MCP等协议。
- 云服务集成:深度集成华为云SWR(镜像仓库)、IAM(身份认证)、APM(应用监控)等服务。
核心特性
- AgentArtsRuntimeApp类
- 初始化参数
app = AgentArtsRuntimeApp( debug=False, # 调试模式 lifespan=None, # 生命周期管理 middleware=None, # 中间件列表 protocol="http", # 协议类型: "http" 或 "https" ) - run() 方法
参数说明:
host: 绑定地址,默认在Docker环境中为 0.0.0.0,本地为 127.0.0.1
port: 绑定端口,默认 8080
**kwargs: 传递给uvicorn的其他参数
- 初始化参数
- 装饰器
- @app.entrypoint
注册Agent主入口函数,处理 /invocations端点的请求。
基本用法@app.entrypoint def handler(payload: dict) -> dict: """同步处理函数""" return {"result": payload["message"].upper()} @app.entrypoint async def async_handler(payload: dict) -> dict: """异步处理函数""" result = await some_async_operation(payload) return result使用请求上下文@app.entrypoint async def handler(payload: dict, context: RequestContext = None) -> dict: """带上下文的处理函数""" session_id = context.session_id if context else None request_id = context.request_id if context else None return { "response": "OK", "session_id": session_id, "request_id": request_id, }流式响应-返回生成器以实现流式输出(SSE格式)@app.entrypoint async def streaming_handler(payload: dict) -> AsyncGenerator: """流式响应处理函数""" message = payload.get("message", "") for char in message: await asyncio.sleep(0.1) yield {"chunk": char} @app.entrypoint def sync_streaming_handler(payload: dict) -> Generator: """同步流式响应""" for i in range(10): yield {"count": i} - @app.ping
注册健康检查处理函数,处理 /ping端点的请求。
from agentarts.sdk.runtime.model import PingStatus @app.ping def health_check() -> PingStatus: """自定义健康检查""" if is_healthy(): return PingStatus.HEALTHY else: return PingStatus.UNHEALTHY表1 PingStatus状态值 状态
说明
HEALTHY
服务健康,无正在执行的任务。
HEALTHY_BUSY
服务健康,有任务正在执行。
UNHEALTHY
服务不健康。
强制设置状态# 设置维护模式app.force_ping_status(PingStatus.UNHEALTHY) # 恢复正常app.force_ping_status(None)
- @app.websocket
注册WebSocket处理函数,处理 /ws端点的连接。
from starlette.websockets import WebSocket @app.websocket async def ws_handler(websocket: WebSocket, context: RequestContext = None): """WebSocket 处理函数""" await websocket.accept() try: while True: data = await websocket.receive_json() response = await process_message(data) await websocket.send_json(response) except WebSocketDisconnect: print("Client disconnected")WebSocket示例:聊天应用@app.websocket async def chat_handler(websocket: WebSocket, context: RequestContext = None): await websocket.accept() session_id = context.session_id if context else "default" try: while True: message = await websocket.receive_text() response = await agent.chat(session_id, message) await websocket.send_text(response) except WebSocketDisconnect: await agent.end_session(session_id) - @app.async_task
注册异步后台任务,任务执行状态会被自动追踪。
@app.async_task async def background_job(payload: dict): """后台异步任务""" await asyncio.sleep(10) result = await process_data(payload) return result @app.entrypoint async def handler(payload: dict, context: RequestContext = None): # 启动后台任务 asyncio.create_task(background_job(payload)) # 检查是否有运行中的任务 if app.has_running_tasks(): print("有后台任务正在执行") return {"status": "accepted"}
- @app.entrypoint
- RequestContext
- RequestContext是请求数据的上下文,包含请求的元信息。
表2 属性 属性
类型
说明
request_id
Optional[str]
请求唯一标识符
session_id
Optional[str]
会话标识符
request
Optional[Any]
原始请求对象
- 使用示例
@app.entrypoint async def handler(payload: dict, context: RequestContext = None): if context: print(f"Request ID: {context.request_id}") print(f"Session ID: {context.session_id}") # 访问原始请求对象 if context.request: headers = context.request.headers client_host = context.request.client.host return {"status": "ok"}
- RequestContext是请求数据的上下文,包含请求的元信息。
- AgentArtsRuntimeContext
- AgentArtsRuntimeContext是全局上下文管理器,基于Python的contextvars实现,支持异步安全的上下文访问。
- 核心特性
- 协程安全:每个异步任务都有独立的上下文视图。
- 全局访问:在调用栈的任何位置都可以访问上下文数据。
- 无需实例化:直接使用类方法访问。
- 可用方法
方法
说明
get_session_id()
获取会话ID
set_session_id(value)
设置会话ID
get_request_id()
获取请求ID
set_request_id(value)
设置请求ID
get_user_id()
获取用户ID
set_user_id(value)
设置用户ID
get_workload_access_token()
获取工作负载访问令牌
set_workload_access_token(value)
设置工作负载访问令牌
get_user_token()
获取用户令牌
set_user_token(value)
设置用户令牌
get_oauth2_callback_url()
获取OAuth2回调URL
set_oauth2_callback_url(value)
设置OAuth2回调URL
clear()
清除所有上下文变量
- 使用示例
from agentarts.sdk.runtime.context import AgentArtsRuntimeContext @app.entrypoint async def handler(payload: dict, context: RequestContext = None): # 方式一:通过 context 参数获取 session_id = context.session_id if context else None # 方式二:通过全局上下文获取 session_id = AgentArtsRuntimeContext.get_session_id() request_id = AgentArtsRuntimeContext.get_request_id() user_id = AgentArtsRuntimeContext.get_user_id() # 在深层调用中使用 result = await process_with_context() return {"session_id": session_id} async def process_with_context(): """在任意函数中访问上下文""" session_id = AgentArtsRuntimeContext.get_session_id() # 使用 session_id 进行业务处理 return {"processed": True, "session": session_id}