示例:构建你的首个高码智能体
本示例展示了一个基于AgentArts平台的完整Agent实现,充分利用平台提供的企业级能力来构建生产级别的AI Agent。
- 本地搭建智能体。
- 创建agent.py文件,编辑代码如下:
示例代码具备基础的模型访问能力、本地记忆以及简单文件处理工具。
import os import uuid import json import requests import urllib3 from typing import List, Optional, Dict, Any from pathlib import Path from dotenv import load_dotenv # 禁用 SSL 证书警告 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # ================================================================================ # 基础依赖:LangGraph + LangChain # ================================================================================ # LangGraph: AI Agent 工作流编排框架 from langgraph.graph import StateGraph, END from langgraph.prebuilt import ToolNode # 注:InMemorySaver 作为内存型记忆 from langgraph.checkpoint.memory import InMemorySaver # ================================================================================ # 第一部分:配置模块 # ================================================================================ # 从 .env 文件加载环境变量 load_dotenv() class ModelConfig: """ 模型配置类 从环境变量读取模型配置: - MODEL_NAME: 模型名称(默认 gpt-4) - MODEL_URL: 模型API地址(默认 OpenAI 地址) - MODEL_API_KEY: 模型API密钥 - MODEL_TYPE: 模型类型(默认 openai) """ def __init__(self): self.name = os.getenv("MODEL_NAME", "gpt-4") self.url = os.getenv("MODEL_URL", "https://api.openai.com/v1") self.api_key = os.getenv("MODEL_API_KEY") self.model_type = os.getenv("MODEL_TYPE", "openai") class AgentConfig: """ Agent全局配置类 整合所有配置项: - model: 模型配置 - max_iterations: Agent最大迭代次数(防止无限循环) - memory_space: AgentArts 记忆空间ID """ def __init__(self): self.model = ModelConfig() self.max_iterations = int(os.getenv("MAX_ITERATIONS", "50")) self.memory_space = os.getenv("MEMORY_SPACE_ID", "default") self.region = os.getenv("HUAWEICLOUD_SDK_REGION", "cn-north-4") # 全局配置实例(单例模式) config = AgentConfig() _global_checkpointer: Optional[InMemorySaver] = None def get_checkpointer() -> InMemorySaver: global _global_checkpointer if _global_checkpointer is None: _global_checkpointer = InMemorySaver() return _global_checkpointer def get_thread_config(session_id: str) -> Dict[str, Any]: """ 获取 LangGraph 运行时配置 LangGraph 通过 config 中的 thread_id 来区分不同的会话 Args: session_id: 会话ID(对应 LangGraph 的 thread_id) Returns: LangGraph 运行时配置字典 """ return {"configurable": {"thread_id": session_id}} # ================================================================================ # 第三部分:LLM 客户端 # ================================================================================ class LLMClient: """ LLM 客户端封装类 封装 LangChain 的 ChatOpenAI,提供: - 模型调用能力 - 工具绑定能力(让模型能够调用工具) """ def __init__(self): self._client = self._create_client() def _create_client(self): """创建 OpenAI 兼容的 Chat 客户端""" model_cfg = config.model return ChatOpenAI( model=model_cfg.name, base_url=model_cfg.url, api_key=model_cfg.api_key, temperature=0, # 设为0以获得更确定性的输出 streaming=False, ) def invoke(self, messages: List[BaseMessage]) -> BaseMessage: """调用 LLM 生成回复""" return self._client.invoke(messages) def bind_tools(self, tools: List[Any]): """绑定工具到 LLM,使模型能够调用这些工具""" return self._client.bind_tools(tools) # 全局 LLM 客户端实例 llm_client = LLMClient() # ================================================================================ # 第四部分:工具定义 # ================================================================================ # 工具说明: # - 使用 @tool 装饰器将函数定义为 LangChain 工具 # - 工具函数必须有清晰的文档字符串,模型会根据描述决定是否调用 # - 工具返回值必须是字符串(会被返回给模型作为上下文) @tool def read_file(file_path: str) -> str: """ 读取文件内容 Args: file_path: 文件路径(绝对路径或相对路径) Returns: 文件内容(最大1MB) """ try: path = Path(file_path) if not path.exists(): return f"错误: 文件不存在 - {file_path}" if path.stat().st_size > 1024 * 1024: return f"错误: 文件过大 (最大支持1MB) - {file_path}" return path.read_text(encoding='utf-8') except Exception as e: return f"错误: 读取文件失败 - {str(e)}" @tool def write_file(file_path: str, content: str) -> str: """ 写入内容到文件(覆盖模式) Args: file_path: 文件路径 content: 要写入的内容 Returns: 操作结果 """ try: path = Path(file_path) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding='utf-8') return f"成功写入文件: {file_path}" except Exception as e: return f"错误: 写入文件失败 - {str(e)}" @tool def append_file(file_path: str, content: str) -> str: """ 追加内容到文件末尾 Args: file_path: 文件路径 content: 要追加的内容 Returns: 操作结果 """ try: path = Path(file_path) path.parent.mkdir(parents=True, exist_ok=True) with open(path, 'a', encoding='utf-8') as f: f.write(content) return f"成功追加内容到文件: {file_path}" except Exception as e: return f"错误: 追加文件失败 - {str(e)}" @tool def list_directory(dir_path: str = ".") -> str: """ 列出目录内容 Args: dir_path: 目录路径(默认当前目录) Returns: 目录内容列表 """ try: path = Path(dir_path) if not path.exists(): return f"错误: 目录不存在 - {dir_path}" if not path.is_dir(): return f"错误: 不是目录 - {dir_path}" items = [] for item in sorted(path.iterdir()): item_type = "DIR" if item.is_dir() else "FILE" size = item.stat().st_size if item.is_file() else 0 items.append(f"{item_type:6} | {size:>10} | {item.name}") return "类型 | 大小 | 名称\n" + "\n".join(items) except Exception as e: return f"错误: 列出目录失败 - {str(e)}" # ================================================================================ # 第六部分:Agent 主类 # ================================================================================ class AgentState(TypedDict): """Agent 状态类型定义""" messages: List iteration: int class LangGraphAgent: """ LangGraph Agent 主类 对外提供的核心类,封装了: - 工作流图的创建和执行 - 会话状态管理 - 对话历史的存取 使用示例: agent = LangGraphAgent() response = agent.run("你好", session_id="user-001") history = agent.get_history(session_id="user-001") """ def __init__(self, system_prompt: Optional[str] = None, checkpointer=None): """ 初始化 Agent Args: system_prompt: 系统提示词(可选,默认值见 _default_system_prompt) checkpointer: 自定义 Checkpointer(可选,默认使用全局单例) """ self._checkpointer = checkpointer or get_checkpointer() self.graph = create_agent_graph(checkpointer=self._checkpointer) self.system_prompt = system_prompt or self._default_system_prompt() self._last_session_id: Optional[str] = None def _default_system_prompt(self) -> str: """默认系统提示词""" return """你是一个智能助手,可以通过调用工具来帮助用户完成任务。 可用的工具: - read_file: 读取本地文件内容 - write_file: 写入本地文件 - append_file: 追加内容到本地文件 - list_directory: 列出本地目录内容 """ def _get_config(self, session_id: str) -> Dict[str, Any]: """获取运行时配置""" return get_thread_config(session_id) def run(self, user_input: str, session_id: Optional[str] = None) -> str: """ 运行 Agent 处理用户输入 这是主要入口方法,每次调用都会: 1. 从 Checkpoint 恢复会话状态(如有) 2. 添加用户消息 3. 执行工作流 4. 自动保存状态到 Checkpoint Args: user_input: 用户输入的文本 session_id: 会话ID(可选) - 不传:使用上次会话ID或自动生成新ID - 传值:使用指定会话ID,实现多会话隔离 Returns: Agent 的回复文本 """ # 确定会话ID if session_id is None: session_id = self._last_session_id or str(uuid.uuid4()) # 创建会话(如果已存在则忽略,继续执行) memory_client = MemoryClient() try: memory_client.create_memory_session(space_id=config.memory_space, id=session_id) except Exception as e: # Session already exists, continue with existing session if "Session id already exists" in str(e): pass else: raise self._last_session_id = session_id thread_config = self._get_config(session_id) # 从 checkpointer 获取历史消息 checkpoint = self._checkpointer.get(thread_config) if checkpoint and "channel_values" in checkpoint: history_messages = list(checkpoint["channel_values"].get("messages", [])) else: history_messages = [] # 如果有历史消息,添加到现有消息后面(避免重复添加用户消息) if history_messages: # 检查最后一条是否是相同的用户消息 if not (history_messages and isinstance(history_messages[-1], HumanMessage) and history_messages[-1].content == user_input): messages = history_messages + [HumanMessage(content=user_input)] else: messages = history_messages else: # 首次对话,添加系统消息 messages = [ SystemMessage(content=self.system_prompt), HumanMessage(content=user_input) ] # 执行工作流 result = self.graph.invoke({"messages": messages}, config=thread_config) # 返回最后一条消息的内容 last_message = result["messages"][-1] return last_message.content if hasattr(last_message, "content") else str(last_message) @property def session_id(self) -> Optional[str]: """获取当前会话ID""" return self._last_session_id - 创建main.py文件简单测试agent响应。
""工具使用示例 - 展示如何让Agent使用工具""" from agent import LangGraphAgent def main(): # 创建Agent agent = LangGraphAgent() print("=" * 50) print("示例1: 读取文件") print("=" * 50) # 让Agent读取当前目录 response = agent.run("请列出当前目录的文件名,有哪几个文件") print(f"Agent: {response}") print("=" * 50) print("示例2: 查看历史记录") print("=" * 50) # 让Agent读取当前目录 response = agent.run("请列上一个问题") print(f"Agent: {response}") if __name__ == "__main__": main() - 创建.env文件配置如下模型相关环境变量
# 模型配置 MODEL_NAME=your_model_name MODEL_URL=your_model_url MODEL_API_KEY=your_model_api_key # Agent配置 MAX_ITERATIONS=50 SESSION_TTL=3600
- 运行命令python -m main.py执行。
- 创建agent.py文件,编辑代码如下:
- 集成组件库增强能力。
- 对接AgentArts runtime,使用SDK封装成http server。创建app.py文件,代码参考如下:
""" ================================================================================ AgentArts 平台部署入口 - 快速将 Agent 部署为 HTTP 服务 ================================================================================ 本文件展示了如何使用 AgentArts 平台的 @entrypoint 注解, 只需编写业务逻辑,即可快速将 Agent 部署为可调用的 HTTP 服务。 平台部署能力: 1. 零配置部署 - 只需编写业务逻辑,自动生成 HTTP 接口 2. 自动请求解析 - 平台自动将 JSON 请求转换为 payload 3. 内置会话管理 - 自动处理 session_id,支持多用户隔离 4. 全平台能力集成 - 自动继承记忆、身份认证、工具等平台能力 5. 水平扩展 - 支持多副本部署,自动负载均衡 6. 多协议支持 - HTTP REST + WebSocket 双通道 ================================================================================ """ from agentarts.sdk import AgentArtsRuntimeApp, RequestContext # 导入我们编写的 LangGraph Agent from agent import LangGraphAgent # ================================================================================ # 第一步:创建平台应用实例 # ================================================================================ # AgentArtsRuntimeApp: AgentArts 平台核心应用类 # - 自动启动 HTTP 服务 # - 自动处理请求路由 # - 自动集成平台能力(记忆、认证、工具等) app = AgentArtsRuntimeApp() # 创建 Agent 实例(所有请求共享此实例) # 平台会自动处理并发和会话隔离 myagent = LangGraphAgent() # ================================================================================ # 第二步:定义入口函数(核心业务逻辑) # ================================================================================ # @app.entrypoint: 声明式入口点装饰器 # # 工作原理: # 1. 平台接收 HTTP 请求 # 2. 自动解析请求体为 payload 字典 # 3. 自动提取 session_id 用于会话隔离 # 4. 调用被装饰的函数,传入 payload 和 context # 5. 函数的返回值自动序列化为 JSON 响应 # # 参数说明: # - payload: 请求体解析后的字典,包含用户传入的参数 # - context: 请求上下文,包含 session_id、用户信息等 # # 平台自动处理: # HTTP 请求解析 # JSON 序列化/反序列化 # 会话 ID 提取与管理 # 异常捕获与错误返回 # 请求日志与监控 @app.entrypoint def my_agent(payload, context: RequestContext): """ Agent 入口处理函数 只需编写业务逻辑,平台负责其余一切: - 请求解析 - 会话管理 - 响应封装 - 错误处理 - 监控告警 Args: payload: 请求参数字典 - prompt: 用户输入(支持多种参数名) - message: 用户输入(备选参数名) - session_id: 会话ID(可选,平台自动管理) - 其他自定义参数... context: 请求上下文(平台注入) - context.session_id: 当前会话ID - context.request_id: 请求追踪ID - ... Returns: dict: 响应内容(自动序列化为 JSON) - response: Agent 回复内容 - status: 执行状态 (success/error) - 其他自定义字段... """ # ================================================================================ # 第三步:编写业务逻辑 # ================================================================================ # 参数获取(平台已自动解析) prompt = payload.get("prompt", "") message = payload.get("message", "") # session_id 由平台自动管理,无需手动处理 session_id = context.session_id # 统一用户输入(支持多种参数名) user_input = prompt or message # 调用 Agent 处理(平台能力自动生效) # - 会话记忆:自动恢复历史上下文 # - 工具调用:自动注入访问令牌 # - 代码执行:自动使用安全沙箱 result = myagent.run(user_input, session_id=session_id) # ================================================================================ # 第四步:返回响应(平台自动封装) # ================================================================================ # 只需返回字典,平台自动处理: # - JSON 序列化 # - HTTP 响应头 # - 跨域处理 # - 错误码映射 return { "response": result, "status": "success" } # ================================================================================ # 第五步:启动服务 # ================================================================================ # 本地开发模式 # 运行: python app.py # 服务启动后,访问 http://localhost:8080 查看 API 文档 if __name__ == "__main__": # 平台自动启动 HTTP 服务器 # 默认端口: 8080 # 可通过环境变量配置: AGENT_RUN_PORT app.run(port=8080)执行python app.py启动http serve,执行以下命令调用:
curl --location --request POST 'http://localhost:8080/invocations' \ --header 'Content-Type: application/json'\ --data-raw '{"message": "请列出当前目录的文件名,有哪几个文件"}'
- 对接AgentArts memory组件,参考代码如下:
# - AgentArtsMemorySessionSaver: 企业级会话状态持久化 # - 替代标准 InMemorySaver,提供生产级能力 from agentarts.sdk.integration.langgraph import AgentArtsMemorySessionSaver # ================================================================================ # 第二部分:平台级记忆系统 (AgentArts Checkpoint) # ================================================================================ # AgentArts 平台提供的企业级会话记忆能力 # # 与标准 LangGraph InMemorySaver 的区别: # ┌─────────────────────┬──────────────────────────────┬──────────────────────────────┐ # │ 特性 │ InMemorySaver (标准) │ AgentArtsMemorySessionSaver │ # ├─────────────────────┼──────────────────────────────┼──────────────────────────────┤ # │ 存储位置 │ 进程内存 │ 平台持久化存储 │ # │ 多实例共享 │ 不支持 │ 支持(跨实例共享) │ # │ 会话隔离 │ 基础隔离 │ 企业级租户隔离 │ # │ 状态恢复 │ 仅当前进程 │ 跨应用、跨会话恢复 │ # │ 数据安全 │ 重启丢失 │ 企业级数据保护 │ # │ 扩展性 │ 单机 │ 分布式集群支持 │ # └─────────────────────┴──────────────────────────────┴──────────────────────────────┘ # # 核心能力: # - space_id: 记忆空间标识,支持多租户隔离 # - region: 区域配置,支持跨区域部署 # - 自动状态持久化:每次 Agent 执行后自动保存状态 # - 快速状态恢复:从持久化存储中恢复会话上下文 _global_checkpointer: Optional[AgentArtsMemorySessionSaver] = None def get_checkpointer() -> AgentArtsMemorySessionSaver: """ 获取平台级 Checkpointer 实例(单例模式) 使用 AgentArtsMemorySessionSaver 替代标准的 InMemorySaver, 实现企业级的会话状态持久化能力。 平台优势: - 多实例共享:支持 Agent 部署多副本时共享会话状态 - 租户隔离:通过 space_id 实现企业级数据隔离 - 状态恢复:服务重启后自动恢复会话上下文 - 零运维:无需自行搭建 Redis 等存储服务 Returns: AgentArtsMemorySessionSaver: AgentArts 平台级记忆存储 """ global _global_checkpointer if _global_checkpointer is None: _global_checkpointer = AgentArtsMemorySessionSaver(space_id="your_space_id", region="your_space_region") return _global_checkpointer - 使用云上内置代码解释器工具,参考代码如下:
@tool def execute_python(code: str, description: str = "") -> str | None: """ 在平台沙箱环境中安全执行 Python 代码 本工具使用 AgentArts 平台提供的 code_session 企业级代码执行能力, 实现了完全隔离的安全沙箱环境,可以安全地执行 Agent 生成的代码。 平台沙箱能力: ┌─────────────────────────────────────┐ │ 安全隔离 │ ├──────────────────────────────────────┤ │ 完全隔离的执行环境,代码无法访问宿主机器资源 │ │ 网络隔离:仅允许特定域名访问(可选配置) │ │ 文件系统隔离:仅能访问临时工作目录 │ │ 禁止危险操作:禁止 subprocess/threading/文件直接访问等 │ ├──────────────────────────────────────┤ │ 资源控制 │ ├──────────────────────────────────────┤ │ CPU 限制:防止无限循环占用资源 │ │ 内存限制:防止内存泄漏导致系统崩溃 │ │ 执行超时:自动终止长时间运行的代码 │ │ 磁盘配额:防止恶意写入大量数据 │ ├──────────────────────────────────────┤ │ 企业级特性 │ ├──────────────────────────────────────┤ │ 执行日志:完整记录代码执行过程 │ │ 审计追溯:记录谁在什么时候执行了什么代码 │ │ 异常捕获:自动捕获并安全处理执行中的异常 │ │ 多区域支持:可选择不同区域的执行环境 │ └──────────────────────────────────────┘ 对比自建沙箱: 自建方案 vs AgentArts 平台方案 ┌─────────────────────────────────┬─────┐ │ 需要自行处理: │ 平台全托管: │ │ Docker 容器管理 │ 开箱即用的容器化执行环境 │ │ 资源限制配置 │ 自动 CPU/内存/超时控制 │ │ 安全漏洞修补 │ 平台持续安全更新 │ │ 执行监控告警 │ 统一监控与告警 │ │ 日志收集分析 │ 结构化日志与审计 │ └─────────────────────────────────┴─────┘ 使用示例: # Agent 可以直接生成并执行代码 result = execute_python.invoke("print([x**2 for x in range(10)])") # 返回: "[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]" Args: code: 要执行的 Python 代码 description: 代码的简要描述(可选,用于日志和调试) Returns: 执行结果的 JSON 字符串 """ from agentarts.sdk.tools import code_session if description: code = f"# {description}\n{code}" print(f"\n Generated Code: {code}") # 使用平台提供的 code_session 沙箱执行代码 with code_session("your_interpreter_region", "your_interpre_name") as code_client: response = code_client.invoke( operate_type="execute_code", arguments={ "code": code, "language": "python", "clearContext": False # 保持执行上下文,允许跨调用共享变量 } ) print(response) return json.dumps(response["result"]) - 使用云上identity获取出站认证凭据,参考如下代码示例:
@tool @require_access_token( provider_name="your_provider_name", # Agent Identity 平台注册的 GitHub 身份提供商 scopes=["user:email"], # 请求的 OAuth2 权限范围 auth_flow="USER_FEDERATION", # 用户联邦认证流程 ignore_ssl_verification=True, ) def github_get(url: str, headers: Optional[Dict] = None, access_token: Optional[str] = None) -> str: """ 访问 GitHub API(平台级身份认证) 本函数使用 AgentArts 平台的 require_access_token 装饰器实现自动身份认证, 无需手动管理 GitHub 访问令牌。 平台能力 vs 传统方式对比: ┌─────────────────────┬────────────────────┐ │ 特性 │ 传统方式 │ AgentArts + require_access_ │ │ │ │ token │ ├─────────────────────┼────────────────────┤ │ 令牌管理 │ 手动创建、存储、轮换 │ 平台自动管理 │ │ 令牌获取 │ 用户自行配置 env 或配置 │ 装饰器自动注入 │ │ 令牌刷新 │ 需要编写刷新逻辑 │ 平台自动处理 │ │ 安全性 │ 令牌暴露在代码或环境变量 │ 令牌存储在平台安全存储 │ │ 用户体验 │ 配置复杂,容易出错 │ 声明式配置,开箱即用 │ └─────────────────────┴──────────────────────┘ 认证流程说明: 1. 用户首次调用时,平台自动发起 OAuth2 授权流程 2. 用户在 GitHub 授权页面完成授权 3. 平台获取并安全存储访问令牌 4. 后续调用时,access_token 自动注入到函数参数 5. 令牌过期前,平台自动刷新 Args: url: GitHub API 路径(如 /repos/owner/repo/issues) headers: 自定义请求头 access_token: [平台自动注入] OAuth2 访问令牌 Returns: API 响应内容 """ # 记录 token 状态 has_token = bool(access_token) print(f"[GITHUB] access_token: {'已注入' if has_token else '未注入'}") base_url = "https://api.github.com" default_headers = { "User-Agent": "LangGraph-Agent", "Accept": "application/vnd.github.v3+json", } if access_token: default_headers["Authorization"] = f"token {access_token}" if headers: default_headers.update(headers) # 获取代理配置 proxies = {} http_proxy = os.environ.get("HTTP_PROXY") or os.environ.get("http_proxy") https_proxy = os.environ.get("HTTPS_PROXY") or os.environ.get("https_proxy") if http_proxy: proxies["http"] = http_proxy if https_proxy: proxies["https"] = https_proxy try: full_url = base_url + url if url.startswith("/") else url # 禁用 SSL 证书验证(仅用于测试) response = requests.get(full_url, headers=default_headers, timeout=30, proxies=proxies if proxies else None, verify=False) # 记录响应状态 print(f"[GITHUB] 响应状态: {response.status_code}") if len(response.content) > 1024 * 1024: return f"响应内容过大,已截断\n\n{response.text[:10000]}" return f"状态码: {response.status_code}\n\n{response.text}" except Exception as e: print(f"[GITHUB] 请求失败: {str(e)}") return f"错误: GitHub API请求失败 - {str(e)}"
- 对接AgentArts runtime,使用SDK封装成http server。创建app.py文件,代码参考如下:
- 部署智能体运行时。
按上述步骤完成后,基本代码开发已经完成,接下来准备部署到平台。
首先准备依赖文件requirements.txt内容可参考如下:
# ================================================================================ # AgentArts LangGraph Agent Demo - 依赖清单 # ================================================================================ # # 本项目基于 AgentArts 平台,使用 LangGraph 框架构建 AI Agent # 依赖分为两部分:基础框架依赖 + 平台 SDK 依赖 # # 安装方式: # pip install -r requirements.txt # # ================================================================================ # ================================================================================ # 第一部分:LangGraph & LangChain 核心框架 # ================================================================================ # LangGraph: AI Agent 工作流编排框架 # - 状态图定义与执行 # - Checkpoint 持久化机制 # - 条件边与节点路由 langgraph>=0.2.0 # LangChain: LLM 应用开发工具链 # - 消息类型定义 (HumanMessage, AIMessage, SystemMessage) # - 工具系统 (@tool 装饰器) # - LLM 客户端封装 langchain>=0.3.0 langchain-core>=0.3.0 langchain-community>=0.3.0 # ================================================================================ # 第二部分:LLM Provider 支持 # ================================================================================ # OpenAI 兼容接口 openai>=1.0.0 langchain-openai>=0.1.0 # Anthropic (Claude) 支持 anthropic>=0.18.0 langchain-anthropic>=0.1.0 # ================================================================================ # 第三部分:HTTP & 网络 # ================================================================================ requests>=2.31.0 httpx>=0.27.0 # ================================================================================ # 第四部分:工具与配置 # ================================================================================ # 环境变量管理 python-dotenv>=1.0.0 # 异步支持 aiofiles>=23.0.0 # JSON/YAML 支持 pyyaml>=6.0 # ================================================================================ # 第五部分:AgentArts 平台 SDK(核心依赖) todo 待定 # ================================================================================ # # # ================================================================================
执行命令配置智能体
agentarts configure --entrypoint app:app
执行命令部署智能体
agentarts launch
完整agent.py代码样例可参考如下:
核心平台能力:
1. 记忆系统 (AgentArtsMemorySessionSaver)
- 基于 LangGraph Checkpoint 的会话状态持久化
- 支持跨会话、跨应用的状态恢复
- 企业级数据安全与隔离
2. 身份认证 (AgentArtsIdentity + require_access_token)
- 自动 OAuth2 凭据获取与管理
- 支持多种第三方平台集成(GitHub/GitLab等)
- 无需手动管理令牌,平台自动处理刷新
3. 安全代码执行 (AgentArts Code Sandbox)
- 企业级沙箱环境,完全隔离
- 自动资源控制(CPU/内存/超时)
- 危险操作自动拦截
- 完整执行日志与审计追溯
4. 工具生态
- 文件操作、URL访问
- GitHub API 深度集成(自动获取访问令牌)
- 平台级代码执行沙箱
- 可扩展的工具注册机制
平台优势:
- 开箱即用的企业级记忆能力
- 简化的第三方服务认证流程
- 安全的代码执行沙箱
- 完善的会话隔离与安全机制
================================================================================
"""
import os
import uuid
import json
import requests
import urllib3
from typing import List, Optional, Dict, Any
from pathlib import Path
from dotenv import load_dotenv
from langchain.agents import AgentState
# 禁用 SSL 证书警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ================================================================================
# 基础依赖:LangGraph + LangChain
# ================================================================================
# LangGraph: AI Agent 工作流编排框架
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
# 注:InMemorySaver 仅保留作为参考,生产环境使用 AgentArtsMemorySessionSaver
# from langgraph.checkpoint.memory import InMemorySaver
# LangChain: LLM 应用开发工具链
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
# ================================================================================
# AgentArts 平台能力依赖
# ================================================================================
# AgentArtsIdentity: 平台级身份认证服务
# - require_access_token: 声明式 OAuth2 凭据注入装饰器
# - 自动处理令牌获取、刷新、存储
from agentarts.sdk.identity.auth import require_access_token
# AgentArtsMemory:
# - AgentArtsMemorySessionSaver: 企业级会话状态持久化,集成 LangGraph 框架
# - 替代标准 InMemorySaver,提供生产级能力
from agentarts.sdk.integration.langgraph import AgentArtsMemorySessionSaver
from agentarts.sdk.memory import MemoryClient
# AgentArts Runtiem: 核心启动类
from agentarts.sdk import AgentArtsRuntimeApp, RequestContext
# ================================================================================
# 第一部分:配置模块
# ================================================================================
# 从 .env 文件加载环境变量
load_dotenv()
class ModelConfig:
"""
模型配置类
从环境变量读取模型配置:
- MODEL_NAME: 模型名称(默认 gpt-4)
- MODEL_URL: 模型API地址(默认 OpenAI 地址)
- MODEL_API_KEY: 模型API密钥
- MODEL_TYPE: 模型类型(默认 openai)
"""
def __init__(self):
self.name = os.getenv("MODEL_NAME", "gpt-4")
self.url = os.getenv("MODEL_URL", "https://api.openai.com/v1")
self.api_key = os.getenv("MODEL_API_KEY")
self.model_type = os.getenv("MODEL_TYPE", "openai")
class AgentConfig:
"""
Agent全局配置类
整合所有配置项:
- model: 模型配置
- max_iterations: Agent最大迭代次数(防止无限循环)
- memory_space: AgentArts 记忆空间ID
"""
def __init__(self):
self.model = ModelConfig()
self.max_iterations = int(os.getenv("MAX_ITERATIONS", "50"))
self.memory_space = os.getenv("MEMORY_SPACE_ID", "default")
self.region = os.getenv("HUAWEICLOUD_SDK_REGION", "cn-north-4")
# 全局配置实例(单例模式)
config = AgentConfig()
# ================================================================================
# 第二部分:平台级记忆系统 (AgentArts Checkpoint)
# ================================================================================
# AgentArts 平台提供的企业级会话记忆能力
#
# 与标准 LangGraph InMemorySaver 的区别:
# ┌─────────────────────┬──────────────────────────────┬──────────────────────────────┐
# │ 特性 │ InMemorySaver (标准) │ AgentArtsMemorySessionSaver │
# ├─────────────────────┼──────────────────────────────┼──────────────────────────────┤
# │ 存储位置 │ 进程内存 │ 平台持久化存储 │
# │ 多实例共享 │ 不支持 │ 支持(跨实例共享) │
# │ 会话隔离 │ 基础隔离 │ 企业级租户隔离 │
# │ 状态恢复 │ 仅当前进程 │ 跨应用、跨会话恢复 │
# │ 数据安全 │ 重启丢失 │ 企业级数据保护 │
# │ 扩展性 │ 单机 │ 分布式集群支持 │
# └─────────────────────┴──────────────────────────────┴──────────────────────────────┘
#
# 核心能力:
# - space_id: 记忆空间标识,支持多租户隔离
# - region: 区域配置,支持跨区域部署
# - 自动状态持久化:每次 Agent 执行后自动保存状态
# - 快速状态恢复:从持久化存储中恢复会话上下文
_global_checkpointer: Optional[AgentArtsMemorySessionSaver] = None
def get_checkpointer() -> AgentArtsMemorySessionSaver:
"""
获取平台级 Checkpointer 实例(单例模式)
使用 AgentArtsMemorySessionSaver 替代标准的 InMemorySaver,
实现企业级的会话状态持久化能力。
平台优势:
- 多实例共享:支持 Agent 部署多副本时共享会话状态
- 租户隔离:通过 space_id 实现企业级数据隔离
- 状态恢复:服务重启后自动恢复会话上下文
- 零运维:无需自行搭建 Redis 等存储服务
Returns:
AgentArtsMemorySessionSaver: AgentArts 平台级记忆存储
"""
global _global_checkpointer
if _global_checkpointer is None:
_global_checkpointer = AgentArtsMemorySessionSaver(space_id=config.memory_space, region=config.region)
return _global_checkpointer
def get_thread_config(session_id: str) -> Dict[str, Any]:
"""
获取 LangGraph 运行时配置
LangGraph 通过 config 中的 thread_id 来区分不同的会话
Args:
session_id: 会话ID(对应 LangGraph 的 thread_id)
Returns:
LangGraph 运行时配置字典
"""
return {"configurable": {"thread_id": session_id}}
# ================================================================================
# 第三部分:LLM 客户端
# ================================================================================
class LLMClient:
"""
LLM 客户端封装类
封装 LangChain 的 ChatOpenAI,提供:
- 模型调用能力
- 工具绑定能力(让模型能够调用工具)
"""
def __init__(self):
self._client = self._create_client()
def _create_client(self):
"""创建 OpenAI 兼容的 Chat 客户端"""
model_cfg = config.model
return ChatOpenAI(
model=model_cfg.name,
base_url=model_cfg.url,
api_key=model_cfg.api_key,
temperature=0, # 设为0以获得更确定性的输出
streaming=False,
)
def invoke(self, messages: List[BaseMessage]) -> BaseMessage:
"""调用 LLM 生成回复"""
return self._client.invoke(messages)
def bind_tools(self, tools: List[Any]):
"""绑定工具到 LLM,使模型能够调用这些工具"""
return self._client.bind_tools(tools)
# 全局 LLM 客户端实例
llm_client = LLMClient()
# ================================================================================
# 第四部分:工具定义
# ================================================================================
# 工具说明:
# - 使用 @tool 装饰器将函数定义为 LangChain 工具
# - 工具函数必须有清晰的文档字符串,模型会根据描述决定是否调用
# - 工具返回值必须是字符串(会被返回给模型作为上下文)
#
# 平台工具 vs 自建工具:
# ┌─────────────────────┬──────────────────────────────┬──────────────────────────────┐
# │ 特性 │ 自建工具 │ AgentArts 平台工具 │
# ├─────────────────────┼──────────────────────────────┼──────────────────────────────┤
# │ 代码执行 │ 自行搭建沙箱 │ 企业级安全沙箱 │
# │ 安全性 │ 需要自行处理恶意代码 │ 自动隔离危险操作 │
# │ 资源限制 │ 自行实现 │ 自动 CPU/内存/超时控制 │
# │ 状态管理 │ 自行处理会话状态 │ 跨调用状态持久化 │
# │ 监控告警 │ 自行搭建 │ 平台统一监控 │
# └─────────────────────┴──────────────────────────────┴──────────────────────────────┘
@tool
def execute_python(code: str, description: str = "") -> str | None:
"""
在平台沙箱环境中安全执行 Python 代码
本工具使用 AgentArts 平台提供的 code_session 企业级代码执行能力,
实现了完全隔离的安全沙箱环境,可以安全地执行 Agent 生成的代码。
平台沙箱能力:
┌──────────────────────────────────────────────────────────────────────────────┐
│ 安全隔离 │
├──────────────────────────────────────────────────────────────────────────────┤
│ 完全隔离的执行环境,代码无法访问宿主机器资源 │
│ 网络隔离:仅允许特定域名访问(可选配置) │
│ 文件系统隔离:仅能访问临时工作目录 │
│ 禁止危险操作:禁止 subprocess/threading/文件直接访问等 │
├──────────────────────────────────────────────────────────────────────────────┤
│ 资源控制 │
├──────────────────────────────────────────────────────────────────────────────┤
│ CPU 限制:防止无限循环占用资源 │
│ 内存限制:防止内存泄漏导致系统崩溃 │
│ 执行超时:自动终止长时间运行的代码 │
│ 磁盘配额:防止恶意写入大量数据 │
├──────────────────────────────────────────────────────────────────────────────┤
│ 企业级特性 │
├──────────────────────────────────────────────────────────────────────────────┤
│ 执行日志:完整记录代码执行过程 │
│ 审计追溯:记录谁在什么时候执行了什么代码 │
│ 异常捕获:自动捕获并安全处理执行中的异常 │
│ 多区域支持:可选择不同区域的执行环境 │
└──────────────────────────────────────────────────────────────────────────────┘
对比自建沙箱:
自建方案 vs AgentArts 平台方案
┌─────────────────────────────────┬─────────────────────────────────────────┐
│ 需要自行处理: │ 平台全托管: │
│ Docker 容器管理 │ 开箱即用的容器化执行环境 │
│ 资源限制配置 │ 自动 CPU/内存/超时控制 │
│ 安全漏洞修补 │ 平台持续安全更新 │
│ 执行监控告警 │ 统一监控与告警 │
│ 日志收集分析 │ 结构化日志与审计 │
└─────────────────────────────────┴─────────────────────────────────────────┘
使用示例:
# Agent 可以直接生成并执行代码
result = execute_python.invoke("print([x**2 for x in range(10)])")
# 返回: "[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]"
Args:
code: 要执行的 Python 代码
description: 代码的简要描述(可选,用于日志和调试)
Returns:
执行结果的 JSON 字符串
"""
from agentarts.sdk.tools import code_session
if description:
code = f"# {description}\n{code}"
print(f"\n Generated Code: {code}")
# 使用平台提供的 code_session 沙箱执行代码
with code_session(config.region, "your-interpreter-name") as code_client:
response = code_client.invoke(
operate_type="execute_code",
arguments={
"code": code,
"language": "python",
"clearContext": False # 保持执行上下文,允许跨调用共享变量
}
)
print(response)
return json.dumps(response["result"])
@tool
def read_file(file_path: str) -> str:
"""
读取文件内容
Args:
file_path: 文件路径(绝对路径或相对路径)
Returns:
文件内容(最大1MB)
"""
try:
path = Path(file_path)
if not path.exists():
return f"错误: 文件不存在 - {file_path}"
if path.stat().st_size > 1024 * 1024:
return f"错误: 文件过大 (最大支持1MB) - {file_path}"
return path.read_text(encoding='utf-8')
except Exception as e:
return f"错误: 读取文件失败 - {str(e)}"
@tool
def write_file(file_path: str, content: str) -> str:
"""
写入内容到文件(覆盖模式)
Args:
file_path: 文件路径
content: 要写入的内容
Returns:
操作结果
"""
try:
path = Path(file_path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding='utf-8')
return f"成功写入文件: {file_path}"
except Exception as e:
return f"错误: 写入文件失败 - {str(e)}"
@tool
def append_file(file_path: str, content: str) -> str:
"""
追加内容到文件末尾
Args:
file_path: 文件路径
content: 要追加的内容
Returns:
操作结果
"""
try:
path = Path(file_path)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, 'a', encoding='utf-8') as f:
f.write(content)
return f"成功追加内容到文件: {file_path}"
except Exception as e:
return f"错误: 追加文件失败 - {str(e)}"
@tool
def list_directory(dir_path: str = ".") -> str:
"""
列出目录内容
Args:
dir_path: 目录路径(默认当前目录)
Returns:
目录内容列表
"""
try:
path = Path(dir_path)
if not path.exists():
return f"错误: 目录不存在 - {dir_path}"
if not path.is_dir():
return f"错误: 不是目录 - {dir_path}"
items = []
for item in sorted(path.iterdir()):
item_type = "DIR" if item.is_dir() else "FILE"
size = item.stat().st_size if item.is_file() else 0
items.append(f"{item_type:6} | {size:>10} | {item.name}")
return "类型 | 大小 | 名称\n" + "\n".join(items)
except Exception as e:
return f"错误: 列出目录失败 - {str(e)}"
@tool
def fetch_url(url: str, method: str = "GET", headers: Optional[Dict] = None, body: Optional[str] = None) -> str:
"""
访问 URL 获取内容
Args:
url: 目标URL
method: HTTP方法(GET/POST)
headers: 自定义请求头
body: 请求体(用于POST)
Returns:
响应内容
"""
try:
default_headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
if headers:
default_headers.update(headers)
request_kwargs = {"url": url, "headers": default_headers, "timeout": 30}
if method.upper() == "POST":
request_kwargs["data"] = body.encode() if body else None
elif method.upper() != "GET":
return f"错误: 不支持的HTTP方法 - {method}"
response = requests.request(method, **request_kwargs)
if len(response.content) > 1024 * 1024:
return f"响应内容过大 ({len(response.content)} bytes),已截断\n\n" + response.text[:10000]
return f"状态码: {response.status_code}\n\n{response.text}"
except Exception as e:
return f"错误: 访问URL失败 - {str(e)}"
@tool
@require_access_token(
provider_name="your_provider", # AgentArts 平台注册的 GitHub 身份提供商
scopes=["user:email"], # 请求的 OAuth2 权限范围
auth_flow="USER_FEDERATION", # 用户联邦认证流程
ignore_ssl_verification=True,
)
def github_get(url: str, headers: Optional[Dict] = None, access_token: Optional[str] = None) -> str:
"""
访问 GitHub API(平台级身份认证)
本函数使用 AgentArts 平台的 require_access_token 装饰器实现自动身份认证,
无需手动管理 GitHub 访问令牌。
平台能力 vs 传统方式对比:
┌─────────────────────┬──────────────────────────────┬──────────────────────────────┐
│ 特性 │ 传统方式 │ AgentArts + require_access_ │
│ │ │ token │
├─────────────────────┼──────────────────────────────┼──────────────────────────────┤
│ 令牌管理 │ 手动创建、存储、轮换 │ 平台自动管理 │
│ 令牌获取 │ 用户自行配置 env 或配置 │ 装饰器自动注入 │
│ 令牌刷新 │ 需要编写刷新逻辑 │ 平台自动处理 │
│ 安全性 │ 令牌暴露在代码或环境变量 │ 令牌存储在平台安全存储 │
│ 用户体验 │ 配置复杂,容易出错 │ 声明式配置,开箱即用 │
└─────────────────────┴──────────────────────────────┴──────────────────────────────┘
认证流程说明:
1. 用户首次调用时,平台自动发起 OAuth2 授权流程
2. 用户在 GitHub 授权页面完成授权
3. 平台获取并安全存储访问令牌
4. 后续调用时,access_token 自动注入到函数参数
5. 令牌过期前,平台自动刷新
Args:
url: GitHub API 路径(如 /repos/owner/repo/issues)
headers: 自定义请求头
access_token: [平台自动注入] OAuth2 访问令牌
Returns:
API 响应内容
"""
# 记录 token 状态
has_token = bool(access_token)
print(f"[GITHUB] access_token: {'已注入' if has_token else '未注入'}")
base_url = "https://api.github.com"
default_headers = {
"User-Agent": "LangGraph-Agent",
"Accept": "application/vnd.github.v3+json",
}
if access_token:
default_headers["Authorization"] = f"token {access_token}"
if headers:
default_headers.update(headers)
# 获取代理配置
proxies = {}
http_proxy = os.environ.get("HTTP_PROXY") or os.environ.get("http_proxy")
https_proxy = os.environ.get("HTTPS_PROXY") or os.environ.get("https_proxy")
if http_proxy:
proxies["http"] = http_proxy
if https_proxy:
proxies["https"] = https_proxy
try:
full_url = base_url + url if url.startswith("/") else url
# 禁用 SSL 证书验证(仅用于测试)
response = requests.get(full_url, headers=default_headers, timeout=30, proxies=proxies if proxies else None, verify=False)
# 记录响应状态
print(f"[GITHUB] 响应状态: {response.status_code}")
if len(response.content) > 1024 * 1024:
return f"响应内容过大,已截断\n\n{response.text[:10000]}"
return f"状态码: {response.status_code}\n\n{response.text}"
except Exception as e:
print(f"[GITHUB] 请求失败: {str(e)}")
return f"错误: GitHub API请求失败 - {str(e)}"
@tool
def github_get_file(repo: str, file_path: str, ref: str = "main") -> str:
"""
获取 GitHub 仓库中的文件内容
Args:
repo: 仓库名称(如 "owner/repo")
file_path: 文件路径(如 "src/main.py")
ref: 分支或标签名(默认 "main")
Returns:
文件内容
"""
import base64
url = f"/repos/{repo}/contents/{file_path}?ref={ref}"
result = github_get.invoke(url)
if "状态码: 200" in result:
try:
json_start = result.find("{")
json_end = result.rfind("}") + 1
if json_start >= 0 and json_end > json_start:
data = json.loads(result[json_start:json_end])
content = data.get("content", "")
if content:
decoded = base64.b64decode(content).decode("utf-8")
return f"文件: {file_path}\n分支: {ref}\n\n{decoded}"
except Exception as e:
return f"错误: 解析文件内容失败 - {str(e)}\n\n原始响应:\n{result}"
return result
@tool
def github_search(query: str, search_type: str = "repositories") -> str:
"""
搜索 GitHub
Args:
query: 搜索关键词
search_type: 搜索类型(repositories/code/issues/commits)
Returns:
搜索结果
"""
url = f"/search/{search_type}?q={requests.utils.quote(query)}"
return github_get.invoke(url)
@tool
def github_get_issues(repo: str, state: str = "open", per_page: int = 30) -> str:
"""
获取 GitHub 仓库的 Issue 列表
Args:
repo: 仓库名称(如 "langchain-ai/langgraph")
state: Issue 状态(open/closed/all)
per_page: 返回数量(默认30)
Returns:
Issue 列表(JSON 格式)
"""
url = f"/repos/{repo}/issues?state={state}&per_page={per_page}"
return github_get.invoke(url)
@tool
def github_get_issue_comments(repo: str, issue_number: int) -> str:
"""
获取 GitHub 仓库某个 Issue 的评论
Args:
repo: 仓库名称(如 "langchain-ai/langgraph")
issue_number: Issue 编号
Returns:
评论列表(JSON 格式)
"""
url = f"/repos/{repo}/issues/{issue_number}/comments"
return github_get.invoke(url)
@tool
def github_get_pull_requests(repo: str, state: str = "open", per_page: int = 30) -> str:
"""
获取 GitHub 仓库的 Pull Request 列表
Args:
repo: 仓库名称(如 "langchain-ai/langgraph")
state: PR 状态(open/closed/merged/all)
per_page: 返回数量(默认30)
Returns:
PR 列表(JSON 格式)
"""
url = f"/repos/{repo}/pulls?state={state}&per_page={per_page}"
return github_get.invoke(url)
def get_all_tools() -> List:
"""
获取所有可用工具
Returns:
工具列表
"""
tools = [
execute_python,
read_file,
write_file,
append_file,
list_directory,
fetch_url,
github_get,
github_get_file,
github_search,
github_get_issues,
github_get_issue_comments,
github_get_pull_requests,
]
return tools
class AgentState(TypedDict):
"""Agent 状态类型定义"""
messages: List
iteration: int
def should_continue(state: AgentState) -> str:
"""
判断是否继续执行(条件边函数)
如果 LLM 返回了工具调用,则继续执行工具节点
Args:
state: 当前 Agent 状态
Returns:
"continue": 继续执行工具
"end": 结束执行
"""
last_message = state["messages"][-1]
has_tool_calls = hasattr(last_message, "tool_calls") and last_message.tool_calls
return "continue" if has_tool_calls else "end"
def model_node(state: AgentState) -> AgentState:
"""
模型节点:调用 LLM 生成回复
将工具绑定到 LLM,让模型能够决定是否需要调用工具
Args:
state: 当前 Agent 状态
Returns:
更新后的状态(包含 LLM 回复)
"""
tools = get_all_tools()
llm_with_tools = llm_client.bind_tools(tools)
response = llm_with_tools.invoke(state["messages"])
# ====== 工具调用日志 ======
has_tool_calls = hasattr(response, 'tool_calls') and response.tool_calls
if has_tool_calls:
for tc in response.tool_calls:
print(f"[MODEL] 调用工具: {tc.get('name', 'unknown')}")
# ====== 日志结束 ======
new_messages = state["messages"] + [response]
iteration = state.get("iteration", 0) + 1
return {
"messages": new_messages,
"iteration": iteration,
}
def create_agent_graph(checkpointer=None):
"""
创建 Agent 工作流图
使用 LangGraph 的 StateGraph 构建工作流:
1. 添加 agent 节点(调用 LLM)
2. 添加 tools 节点(执行工具)
3. 设置条件边(根据是否有工具调用决定走向)
4. 编译图并绑定 checkpointer
Args:
checkpointer: 状态持久化器
Returns:
编译后的 LangGraph 图
"""
tools = get_all_tools()
# 自定义 ToolNode - 合并工具返回的消息和原有消息
def tool_node_with_log(state: AgentState) -> AgentState:
"""工具节点:执行工具并合并消息"""
original_messages = list(state["messages"])
result = ToolNode(tools).invoke(state)
# 合并原有消息和工具返回的 ToolMessage
tool_messages = result["messages"]
merged_messages = original_messages + tool_messages
return {
"messages": merged_messages,
"iteration": state.get("iteration", 0),
}
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("agent", model_node)
workflow.add_node("tools", tool_node_with_log)
# 设置入口点
workflow.set_entry_point("agent")
# 添加条件边:检查是否需要调用工具
workflow.add_conditional_edges(
"agent",
should_continue,
{"continue": "tools", "end": END}
)
# 工具执行后返回 agent 节点
workflow.add_edge("tools", "agent")
# 编译图(传入checkpointer)
return workflow.compile(checkpointer=checkpointer)
# ================================================================================
# 第六部分:Agent 主类
# ================================================================================
class LangGraphAgent:
"""
LangGraph Agent 主类
对外提供的核心类,封装了:
- 工作流图的创建和执行
- 会话状态管理
- 对话历史的存取
使用示例:
agent = LangGraphAgent()
response = agent.run("你好", session_id="user-001")
history = agent.get_history(session_id="user-001")
"""
def __init__(self, system_prompt: Optional[str] = None, checkpointer=None):
"""
初始化 Agent
Args:
system_prompt: 系统提示词(可选,默认值见 _default_system_prompt)
checkpointer: 自定义 Checkpointer(可选,默认使用全局单例)
"""
self._checkpointer = checkpointer or get_checkpointer()
self.graph = create_agent_graph(checkpointer=self._checkpointer)
self.system_prompt = system_prompt or self._default_system_prompt()
self._last_session_id: Optional[str] = None
def _default_system_prompt(self) -> str:
"""默认系统提示词"""
return """你是一个智能助手,可以通过调用工具来帮助用户完成任务。
可用的工具:
- execute_python: 执行Python代码
- read_file: 读取本地文件内容
- write_file: 写入本地文件
- append_file: 追加内容到本地文件
- list_directory: 列出本地目录内容
- fetch_url: 访问URL获取网页内容
- github_get: 访问GitHub API
- github_search: 搜索GitHub仓库/代码/issues
- github_get_issues: 获取仓库的Issue列表
- github_get_issue_comments: 获取Issue的评论
- github_get_pull_requests: 获取仓库的PR列表
- github_get_file: 获取仓库中的文件内容
当用户询问GitHub相关问题时(如查看仓库、Issue、PR等),请使用GitHub工具来获取信息。
GitHub仓库是线上的(如 langchain-ai/langgraph),不是本地目录。
"""
def _get_config(self, session_id: str) -> Dict[str, Any]:
"""获取运行时配置"""
return get_thread_config(session_id)
def run(self, user_input: str, session_id: Optional[str] = None) -> str:
"""
运行 Agent 处理用户输入
这是主要入口方法,每次调用都会:
1. 从 Checkpoint 恢复会话状态(如有)
2. 添加用户消息
3. 执行工作流
4. 自动保存状态到 Checkpoint
Args:
user_input: 用户输入的文本
session_id: 会话ID(可选)
- 不传:使用上次会话ID或自动生成新ID
- 传值:使用指定会话ID,实现多会话隔离
Returns:
Agent 的回复文本
"""
# 确定会话ID
if session_id is None:
session_id = self._last_session_id or str(uuid.uuid4())
# 创建会话(如果已存在则忽略,继续执行)
memory_client = MemoryClient()
try:
memory_client.create_memory_session(space_id=config.memory_space, id=session_id)
except Exception as e:
# Session already exists, continue with existing session
if "Session id already exists" in str(e):
pass
else:
raise
self._last_session_id = session_id
thread_config = self._get_config(session_id)
# 从 checkpointer 获取历史消息
checkpoint = self._checkpointer.get(thread_config)
if checkpoint and "channel_values" in checkpoint:
history_messages = list(checkpoint["channel_values"].get("messages", []))
else:
history_messages = []
# 如果有历史消息,添加到现有消息后面(避免重复添加用户消息)
if history_messages:
# 检查最后一条是否是相同的用户消息
if not (history_messages and isinstance(history_messages[-1], HumanMessage) and
history_messages[-1].content == user_input):
messages = history_messages + [HumanMessage(content=user_input)]
else:
messages = history_messages
else:
# 首次对话,添加系统消息
messages = [
SystemMessage(content=self.system_prompt),
HumanMessage(content=user_input)
]
# 执行工作流
result = self.graph.invoke({"messages": messages}, config=thread_config)
# 返回最后一条消息的内容
last_message = result["messages"][-1]
return last_message.content if hasattr(last_message, "content") else str(last_message)
@property
def session_id(self) -> Optional[str]:
"""获取当前会话ID"""
return self._last_session_id