文档首页/ 智能体开发平台 AgentArts/ 高代码开发/ 示例:构建你的首个高码智能体
更新时间:2026-04-15 GMT+08:00
分享

示例:构建你的首个高码智能体

本示例展示了一个基于AgentArts平台的完整Agent实现,充分利用平台提供的企业级能力来构建生产级别的AI Agent。

  1. 本地搭建智能体。

    1. 创建agent.py文件,编辑代码如下:
      示例代码具备基础的模型访问能力、本地记忆以及简单文件处理工具。
      import os
      import uuid
      import json
      import requests
      import urllib3
      from typing import List, Optional, Dict, Any
      from pathlib import Path
      from dotenv import load_dotenv
      # 禁用 SSL 证书警告
      urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
      # ================================================================================
      # 基础依赖:LangGraph + LangChain
      # ================================================================================
      # LangGraph: AI Agent 工作流编排框架
      from langgraph.graph import StateGraph, END
      from langgraph.prebuilt import ToolNode
      # 注:InMemorySaver 作为内存型记忆
      from langgraph.checkpoint.memory import InMemorySaver
      
      # ================================================================================
      # 第一部分:配置模块
      # ================================================================================
      # 从 .env 文件加载环境变量
      load_dotenv()
      class ModelConfig:
          """
          模型配置类
      
          从环境变量读取模型配置:
          - MODEL_NAME: 模型名称(默认 gpt-4)
          - MODEL_URL: 模型API地址(默认 OpenAI 地址)
          - MODEL_API_KEY: 模型API密钥
          - MODEL_TYPE: 模型类型(默认 openai)
          """
      
          def __init__(self):
              self.name = os.getenv("MODEL_NAME", "gpt-4")
              self.url = os.getenv("MODEL_URL", "https://api.openai.com/v1")
              self.api_key = os.getenv("MODEL_API_KEY")
              self.model_type = os.getenv("MODEL_TYPE", "openai")
      class AgentConfig:
          """
          Agent全局配置类
      
          整合所有配置项:
          - model: 模型配置
          - max_iterations: Agent最大迭代次数(防止无限循环)
          - memory_space: AgentArts 记忆空间ID
          """
      
          def __init__(self):
              self.model = ModelConfig()
              self.max_iterations = int(os.getenv("MAX_ITERATIONS", "50"))
              self.memory_space = os.getenv("MEMORY_SPACE_ID", "default")
              self.region = os.getenv("HUAWEICLOUD_SDK_REGION", "cn-north-4")
      # 全局配置实例(单例模式)
      config = AgentConfig()
      
      _global_checkpointer: Optional[InMemorySaver] = None
      
      
      def get_checkpointer() -> InMemorySaver:
        
          global _global_checkpointer
          if _global_checkpointer is None:
              _global_checkpointer = InMemorySaver()
          return _global_checkpointer
      def get_thread_config(session_id: str) -> Dict[str, Any]:
          """
          获取 LangGraph 运行时配置
      
          LangGraph 通过 config 中的 thread_id 来区分不同的会话
      
          Args:
              session_id: 会话ID(对应 LangGraph 的 thread_id)
      
          Returns:
              LangGraph 运行时配置字典
          """
          return {"configurable": {"thread_id": session_id}}
      # ================================================================================
      # 第三部分:LLM 客户端
      # ================================================================================
      
      class LLMClient:
          """
          LLM 客户端封装类
      
          封装 LangChain 的 ChatOpenAI,提供:
          - 模型调用能力
          - 工具绑定能力(让模型能够调用工具)
          """
      
          def __init__(self):
              self._client = self._create_client()
          def _create_client(self):
              """创建 OpenAI 兼容的 Chat 客户端"""
              model_cfg = config.model
              return ChatOpenAI(
                  model=model_cfg.name,
                  base_url=model_cfg.url,
                  api_key=model_cfg.api_key,
                  temperature=0,  # 设为0以获得更确定性的输出
                  streaming=False,
              )
          def invoke(self, messages: List[BaseMessage]) -> BaseMessage:
              """调用 LLM 生成回复"""
              return self._client.invoke(messages)
          def bind_tools(self, tools: List[Any]):
              """绑定工具到 LLM,使模型能够调用这些工具"""
              return self._client.bind_tools(tools)
      # 全局 LLM 客户端实例
      llm_client = LLMClient()
      
      # ================================================================================
      # 第四部分:工具定义
      # ================================================================================
      # 工具说明:
      # - 使用 @tool 装饰器将函数定义为 LangChain 工具
      # - 工具函数必须有清晰的文档字符串,模型会根据描述决定是否调用
      # - 工具返回值必须是字符串(会被返回给模型作为上下文)
      @tool
      def read_file(file_path: str) -> str:
          """
          读取文件内容
      
          Args:
              file_path: 文件路径(绝对路径或相对路径)
      
          Returns:
              文件内容(最大1MB)
          """
          try:
              path = Path(file_path)
              if not path.exists():
                  return f"错误: 文件不存在 - {file_path}"
              if path.stat().st_size > 1024 * 1024:
                  return f"错误: 文件过大 (最大支持1MB) - {file_path}"
              return path.read_text(encoding='utf-8')
          except Exception as e:
              return f"错误: 读取文件失败 - {str(e)}"
      
      
      @tool
      def write_file(file_path: str, content: str) -> str:
          """
          写入内容到文件(覆盖模式)
      
          Args:
              file_path: 文件路径
              content: 要写入的内容
      
          Returns:
              操作结果
          """
          try:
              path = Path(file_path)
              path.parent.mkdir(parents=True, exist_ok=True)
              path.write_text(content, encoding='utf-8')
              return f"成功写入文件: {file_path}"
          except Exception as e:
              return f"错误: 写入文件失败 - {str(e)}"
      
      
      @tool
      def append_file(file_path: str, content: str) -> str:
          """
          追加内容到文件末尾
      
          Args:
              file_path: 文件路径
              content: 要追加的内容
      
          Returns:
              操作结果
          """
          try:
              path = Path(file_path)
              path.parent.mkdir(parents=True, exist_ok=True)
              with open(path, 'a', encoding='utf-8') as f:
                  f.write(content)
              return f"成功追加内容到文件: {file_path}"
          except Exception as e:
              return f"错误: 追加文件失败 - {str(e)}"
      
      
      @tool
      def list_directory(dir_path: str = ".") -> str:
          """
          列出目录内容
      
          Args:
              dir_path: 目录路径(默认当前目录)
      
          Returns:
              目录内容列表
          """
          try:
              path = Path(dir_path)
              if not path.exists():
                  return f"错误: 目录不存在 - {dir_path}"
              if not path.is_dir():
                  return f"错误: 不是目录 - {dir_path}"
      
              items = []
              for item in sorted(path.iterdir()):
                  item_type = "DIR" if item.is_dir() else "FILE"
                  size = item.stat().st_size if item.is_file() else 0
                  items.append(f"{item_type:6} | {size:>10} | {item.name}")
              return "类型      | 大小      | 名称\n" + "\n".join(items)
          except Exception as e:
              return f"错误: 列出目录失败 - {str(e)}"
      
      # ================================================================================
      # 第六部分:Agent 主类
      # ================================================================================
      class AgentState(TypedDict):
          """Agent 状态类型定义"""
          messages: List
          iteration: int
      
      class LangGraphAgent:
          """
          LangGraph Agent 主类
      
          对外提供的核心类,封装了:
          - 工作流图的创建和执行
          - 会话状态管理
          - 对话历史的存取
      
          使用示例:
              agent = LangGraphAgent()
              response = agent.run("你好", session_id="user-001")
              history = agent.get_history(session_id="user-001")
          """
      
          def __init__(self, system_prompt: Optional[str] = None, checkpointer=None):
              """
              初始化 Agent
      
              Args:
                  system_prompt: 系统提示词(可选,默认值见 _default_system_prompt)
                  checkpointer: 自定义 Checkpointer(可选,默认使用全局单例)
              """
              self._checkpointer = checkpointer or get_checkpointer()
              self.graph = create_agent_graph(checkpointer=self._checkpointer)
              self.system_prompt = system_prompt or self._default_system_prompt()
              self._last_session_id: Optional[str] = None
      
          def _default_system_prompt(self) -> str:
              """默认系统提示词"""
              return """你是一个智能助手,可以通过调用工具来帮助用户完成任务。
      
      可用的工具:
      - read_file: 读取本地文件内容
      - write_file: 写入本地文件
      - append_file: 追加内容到本地文件
      - list_directory: 列出本地目录内容
      """
      
          def _get_config(self, session_id: str) -> Dict[str, Any]:
              """获取运行时配置"""
              return get_thread_config(session_id)
          def run(self, user_input: str, session_id: Optional[str] = None) -> str:
              """
              运行 Agent 处理用户输入
      
              这是主要入口方法,每次调用都会:
              1. 从 Checkpoint 恢复会话状态(如有)
              2. 添加用户消息
              3. 执行工作流
              4. 自动保存状态到 Checkpoint
      
              Args:
                  user_input: 用户输入的文本
                  session_id: 会话ID(可选)
                      - 不传:使用上次会话ID或自动生成新ID
                      - 传值:使用指定会话ID,实现多会话隔离
      
              Returns:
                  Agent 的回复文本
              """
              # 确定会话ID
              if session_id is None:
                  session_id = self._last_session_id or str(uuid.uuid4())
              # 创建会话(如果已存在则忽略,继续执行)
              memory_client = MemoryClient()
              try:
                  memory_client.create_memory_session(space_id=config.memory_space, id=session_id)
              except Exception as e:
                  # Session already exists, continue with existing session
                  if "Session id already exists" in str(e):
                      pass
                  else:
                      raise
      
              self._last_session_id = session_id
              thread_config = self._get_config(session_id)
              # 从 checkpointer 获取历史消息
              checkpoint = self._checkpointer.get(thread_config)
              if checkpoint and "channel_values" in checkpoint:
                  history_messages = list(checkpoint["channel_values"].get("messages", []))
              else:
                  history_messages = []
              # 如果有历史消息,添加到现有消息后面(避免重复添加用户消息)
              if history_messages:
                  # 检查最后一条是否是相同的用户消息
                  if not (history_messages and isinstance(history_messages[-1], HumanMessage) and
                          history_messages[-1].content == user_input):
                      messages = history_messages + [HumanMessage(content=user_input)]
                  else:
                      messages = history_messages
              else:
                  # 首次对话,添加系统消息
                  messages = [
                      SystemMessage(content=self.system_prompt),
                      HumanMessage(content=user_input)
                  ]
              # 执行工作流
              result = self.graph.invoke({"messages": messages}, config=thread_config)
              # 返回最后一条消息的内容
              last_message = result["messages"][-1]
              return last_message.content if hasattr(last_message, "content") else str(last_message)
          @property
          def session_id(self) -> Optional[str]:
              """获取当前会话ID"""
              return self._last_session_id
      
    2. 创建main.py文件简单测试agent响应。
      ""工具使用示例 - 展示如何让Agent使用工具"""
      from agent import LangGraphAgent
      def main():
          # 创建Agent
          agent = LangGraphAgent()
          print("=" * 50)
          print("示例1: 读取文件")
          print("=" * 50)
          # 让Agent读取当前目录
          response = agent.run("请列出当前目录的文件名,有哪几个文件")
          print(f"Agent: {response}")
          print("=" * 50)
          print("示例2: 查看历史记录")
          print("=" * 50)
          # 让Agent读取当前目录
          response = agent.run("请列上一个问题")
          print(f"Agent: {response}")
      
      if __name__ == "__main__":
          main()
    3. 创建.env文件配置如下模型相关环境变量
      # 模型配置
      MODEL_NAME=your_model_name
      MODEL_URL=your_model_url
      MODEL_API_KEY=your_model_api_key
      
      # Agent配置
      MAX_ITERATIONS=50
      SESSION_TTL=3600
    4. 运行命令python -m main.py执行。

  2. 集成组件库增强能力。

    1. 对接AgentArts runtime,使用SDK封装成http server。创建app.py文件,代码参考如下:
      """
      ================================================================================
      AgentArts 平台部署入口 - 快速将 Agent 部署为 HTTP 服务
      ================================================================================
      
      本文件展示了如何使用 AgentArts 平台的 @entrypoint 注解,
      只需编写业务逻辑,即可快速将 Agent 部署为可调用的 HTTP 服务。
      
      平台部署能力:
      1. 零配置部署 - 只需编写业务逻辑,自动生成 HTTP 接口
      2. 自动请求解析 - 平台自动将 JSON 请求转换为 payload
      3. 内置会话管理 - 自动处理 session_id,支持多用户隔离
      4. 全平台能力集成 - 自动继承记忆、身份认证、工具等平台能力
      5. 水平扩展 - 支持多副本部署,自动负载均衡
      6. 多协议支持 - HTTP REST + WebSocket 双通道
      ================================================================================
      """
      from agentarts.sdk import AgentArtsRuntimeApp, RequestContext
      # 导入我们编写的 LangGraph Agent
      from agent import LangGraphAgent
      # ================================================================================
      # 第一步:创建平台应用实例
      # ================================================================================
      # AgentArtsRuntimeApp: AgentArts 平台核心应用类
      # - 自动启动 HTTP 服务
      # - 自动处理请求路由
      # - 自动集成平台能力(记忆、认证、工具等)
      app = AgentArtsRuntimeApp()
      # 创建 Agent 实例(所有请求共享此实例)
      # 平台会自动处理并发和会话隔离
      myagent = LangGraphAgent()
      # ================================================================================
      # 第二步:定义入口函数(核心业务逻辑)
      # ================================================================================
      # @app.entrypoint: 声明式入口点装饰器
      #
      # 工作原理:
      # 1. 平台接收 HTTP 请求
      # 2. 自动解析请求体为 payload 字典
      # 3. 自动提取 session_id 用于会话隔离
      # 4. 调用被装饰的函数,传入 payload 和 context
      # 5. 函数的返回值自动序列化为 JSON 响应
      #
      # 参数说明:
      # - payload: 请求体解析后的字典,包含用户传入的参数
      # - context: 请求上下文,包含 session_id、用户信息等
      #
      # 平台自动处理:
      #  HTTP 请求解析
      #  JSON 序列化/反序列化
      #  会话 ID 提取与管理
      #  异常捕获与错误返回
      #  请求日志与监控
      
      @app.entrypoint
      def my_agent(payload, context: RequestContext):
          """
          Agent 入口处理函数
      
          只需编写业务逻辑,平台负责其余一切:
          - 请求解析
          - 会话管理
          - 响应封装
          - 错误处理
          - 监控告警
      
          Args:
              payload: 请求参数字典
                  - prompt: 用户输入(支持多种参数名)
                  - message: 用户输入(备选参数名)
                  - session_id: 会话ID(可选,平台自动管理)
                  - 其他自定义参数...
      
              context: 请求上下文(平台注入)
                  - context.session_id: 当前会话ID
                  - context.request_id: 请求追踪ID
                  - ...
      
          Returns:
              dict: 响应内容(自动序列化为 JSON)
                  - response: Agent 回复内容
                  - status: 执行状态 (success/error)
                  - 其他自定义字段...
          """
          # ================================================================================
          # 第三步:编写业务逻辑
          # ================================================================================
          # 参数获取(平台已自动解析)
          prompt = payload.get("prompt", "")
          message = payload.get("message", "")
          # session_id 由平台自动管理,无需手动处理
          session_id = context.session_id
          # 统一用户输入(支持多种参数名)
          user_input = prompt or message
          # 调用 Agent 处理(平台能力自动生效)
          # - 会话记忆:自动恢复历史上下文
          # - 工具调用:自动注入访问令牌
          # - 代码执行:自动使用安全沙箱
          result = myagent.run(user_input, session_id=session_id)
          # ================================================================================
          # 第四步:返回响应(平台自动封装)
          # ================================================================================
          # 只需返回字典,平台自动处理:
          # - JSON 序列化
          # - HTTP 响应头
          # - 跨域处理
          # - 错误码映射
          return {
              "response": result,
              "status": "success"
          }
      # ================================================================================
      # 第五步:启动服务
      # ================================================================================
      # 本地开发模式
      # 运行: python app.py
      # 服务启动后,访问 http://localhost:8080 查看 API 文档
      if __name__ == "__main__":
          # 平台自动启动 HTTP 服务器
          # 默认端口: 8080
          # 可通过环境变量配置: AGENT_RUN_PORT
          app.run(port=8080)

      执行python app.py启动http serve,执行以下命令调用:

      curl --location --request POST 'http://localhost:8080/invocations' \
      --header 'Content-Type: application/json'\
      --data-raw '{"message": "请列出当前目录的文件名,有哪几个文件"}'
    2. 对接AgentArts memory组件,参考代码如下:
      # - AgentArtsMemorySessionSaver: 企业级会话状态持久化
      # - 替代标准 InMemorySaver,提供生产级能力
      from agentarts.sdk.integration.langgraph import AgentArtsMemorySessionSaver
      
      # ================================================================================
      # 第二部分:平台级记忆系统 (AgentArts Checkpoint)
      # ================================================================================
      # AgentArts 平台提供的企业级会话记忆能力
      #
      # 与标准 LangGraph InMemorySaver 的区别:
      # ┌─────────────────────┬──────────────────────────────┬──────────────────────────────┐
      # │ 特性                │ InMemorySaver (标准)         │ AgentArtsMemorySessionSaver  │
      # ├─────────────────────┼──────────────────────────────┼──────────────────────────────┤
      # │ 存储位置            │ 进程内存                      │ 平台持久化存储               │
      # │ 多实例共享          │ 不支持                        │ 支持(跨实例共享)           │
      # │ 会话隔离            │ 基础隔离                      │ 企业级租户隔离               │
      # │ 状态恢复            │ 仅当前进程                    │ 跨应用、跨会话恢复           │
      # │ 数据安全            │ 重启丢失                      │ 企业级数据保护               │
      # │ 扩展性              │ 单机                          │ 分布式集群支持               │
      # └─────────────────────┴──────────────────────────────┴──────────────────────────────┘
      #
      # 核心能力:
      # - space_id: 记忆空间标识,支持多租户隔离
      # - region: 区域配置,支持跨区域部署
      # - 自动状态持久化:每次 Agent 执行后自动保存状态
      # - 快速状态恢复:从持久化存储中恢复会话上下文
      
      _global_checkpointer: Optional[AgentArtsMemorySessionSaver] = None
      
      
      def get_checkpointer() -> AgentArtsMemorySessionSaver:
          """
          获取平台级 Checkpointer 实例(单例模式)
      
          使用 AgentArtsMemorySessionSaver 替代标准的 InMemorySaver,
          实现企业级的会话状态持久化能力。
      
          平台优势:
              - 多实例共享:支持 Agent 部署多副本时共享会话状态
              - 租户隔离:通过 space_id 实现企业级数据隔离
              - 状态恢复:服务重启后自动恢复会话上下文
              - 零运维:无需自行搭建 Redis 等存储服务
      
          Returns:
              AgentArtsMemorySessionSaver: AgentArts 平台级记忆存储
          """
          global _global_checkpointer
          if _global_checkpointer is None:
              _global_checkpointer = AgentArtsMemorySessionSaver(space_id="your_space_id", region="your_space_region")
          return _global_checkpointer
    3. 使用云上内置代码解释器工具,参考代码如下:
      @tool
      def execute_python(code: str, description: str = "") -> str | None:
          """
          在平台沙箱环境中安全执行 Python 代码
      
          本工具使用 AgentArts 平台提供的 code_session 企业级代码执行能力,
          实现了完全隔离的安全沙箱环境,可以安全地执行 Agent 生成的代码。
      
          平台沙箱能力:
          ┌─────────────────────────────────────┐
          │ 安全隔离                                                                  │
          ├──────────────────────────────────────┤
          │   完全隔离的执行环境,代码无法访问宿主机器资源                              │
          │   网络隔离:仅允许特定域名访问(可选配置)                                 │
          │   文件系统隔离:仅能访问临时工作目录                                       │
          │   禁止危险操作:禁止 subprocess/threading/文件直接访问等                   │
          ├──────────────────────────────────────┤
          │ 资源控制                                                                  │
          ├──────────────────────────────────────┤
          │   CPU 限制:防止无限循环占用资源                                           │
          │   内存限制:防止内存泄漏导致系统崩溃                                       │
          │   执行超时:自动终止长时间运行的代码                                       │
          │   磁盘配额:防止恶意写入大量数据                                           │
          ├──────────────────────────────────────┤
          │ 企业级特性                                                                │
          ├──────────────────────────────────────┤
          │   执行日志:完整记录代码执行过程                                           │
          │   审计追溯:记录谁在什么时候执行了什么代码                                 │
          │   异常捕获:自动捕获并安全处理执行中的异常                                 │
          │   多区域支持:可选择不同区域的执行环境                                     │
          └──────────────────────────────────────┘
      
          对比自建沙箱:
                          自建方案                    vs    AgentArts 平台方案
          ┌─────────────────────────────────┬─────┐
          │ 需要自行处理:                   │ 平台全托管:                            │
          │   Docker 容器管理                │   开箱即用的容器化执行环境              │
          │   资源限制配置                   │   自动 CPU/内存/超时控制                │
          │   安全漏洞修补                   │   平台持续安全更新                      │
          │   执行监控告警                   │   统一监控与告警                        │
          │   日志收集分析                   │   结构化日志与审计                      │
          └─────────────────────────────────┴─────┘
      
          使用示例:
              # Agent 可以直接生成并执行代码
              result = execute_python.invoke("print([x**2 for x in range(10)])")
              # 返回: "[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]"
      
          Args:
              code: 要执行的 Python 代码
              description: 代码的简要描述(可选,用于日志和调试)
      
          Returns:
              执行结果的 JSON 字符串
          """
          from agentarts.sdk.tools import code_session
          if description:
              code = f"# {description}\n{code}"
      
          print(f"\n Generated Code: {code}")
          # 使用平台提供的 code_session 沙箱执行代码
          with code_session("your_interpreter_region", "your_interpre_name") as code_client:
              response = code_client.invoke(
                  operate_type="execute_code",
                  arguments={
                      "code": code,
                      "language": "python",
                      "clearContext": False  # 保持执行上下文,允许跨调用共享变量
                  }
              )
              print(response)
          return json.dumps(response["result"])
    4. 使用云上identity获取出站认证凭据,参考如下代码示例:
      @tool
      @require_access_token(
          provider_name="your_provider_name",  # Agent Identity 平台注册的 GitHub 身份提供商
          scopes=["user:email"],              # 请求的 OAuth2 权限范围
          auth_flow="USER_FEDERATION",         # 用户联邦认证流程
      ignore_ssl_verification=True,
      )
      def github_get(url: str, headers: Optional[Dict] = None, access_token: Optional[str] = None) -> str:
          """
          访问 GitHub API(平台级身份认证)
      
          本函数使用 AgentArts 平台的 require_access_token 装饰器实现自动身份认证,
          无需手动管理 GitHub 访问令牌。
      
          平台能力 vs 传统方式对比:
          ┌─────────────────────┬────────────────────┐
          │ 特性                │ 传统方式                      │ AgentArts + require_access_ │
          │                     │                              │ token                       │
          ├─────────────────────┼────────────────────┤
          │ 令牌管理            │ 手动创建、存储、轮换          │ 平台自动管理                 │
          │ 令牌获取            │ 用户自行配置 env 或配置       │ 装饰器自动注入               │
          │ 令牌刷新            │ 需要编写刷新逻辑              │ 平台自动处理                 │
          │ 安全性              │ 令牌暴露在代码或环境变量      │ 令牌存储在平台安全存储       │
          │ 用户体验            │ 配置复杂,容易出错            │ 声明式配置,开箱即用         │
          └─────────────────────┴──────────────────────┘
      
          认证流程说明:
              1. 用户首次调用时,平台自动发起 OAuth2 授权流程
              2. 用户在 GitHub 授权页面完成授权
              3. 平台获取并安全存储访问令牌
              4. 后续调用时,access_token 自动注入到函数参数
              5. 令牌过期前,平台自动刷新
      
          Args:
              url: GitHub API 路径(如 /repos/owner/repo/issues)
              headers: 自定义请求头
              access_token: [平台自动注入] OAuth2 访问令牌
      
          Returns:
              API 响应内容
          """
          # 记录 token 状态
          has_token = bool(access_token)
          print(f"[GITHUB] access_token: {'已注入' if has_token else '未注入'}")
          base_url = "https://api.github.com"
      
          default_headers = {
              "User-Agent": "LangGraph-Agent",
              "Accept": "application/vnd.github.v3+json",
          }
          if access_token:
              default_headers["Authorization"] = f"token {access_token}"
          if headers:
              default_headers.update(headers)
          # 获取代理配置
          proxies = {}
          http_proxy = os.environ.get("HTTP_PROXY") or os.environ.get("http_proxy")
          https_proxy = os.environ.get("HTTPS_PROXY") or os.environ.get("https_proxy")
          if http_proxy:
              proxies["http"] = http_proxy
          if https_proxy:
              proxies["https"] = https_proxy
          try:
              full_url = base_url + url if url.startswith("/") else url
              # 禁用 SSL 证书验证(仅用于测试)
              response = requests.get(full_url, headers=default_headers, timeout=30, proxies=proxies if proxies else None, verify=False)
              # 记录响应状态
              print(f"[GITHUB] 响应状态: {response.status_code}")
              if len(response.content) > 1024 * 1024:
                  return f"响应内容过大,已截断\n\n{response.text[:10000]}"
      
              return f"状态码: {response.status_code}\n\n{response.text}"
          except Exception as e:
              print(f"[GITHUB] 请求失败: {str(e)}")
              return f"错误: GitHub API请求失败 - {str(e)}"

  3. 部署智能体运行时。

    按上述步骤完成后,基本代码开发已经完成,接下来准备部署到平台。

    首先准备依赖文件requirements.txt内容可参考如下:

    # ================================================================================
    # AgentArts LangGraph Agent Demo - 依赖清单
    # ================================================================================
    #
    # 本项目基于 AgentArts 平台,使用 LangGraph 框架构建 AI Agent
    # 依赖分为两部分:基础框架依赖 + 平台 SDK 依赖
    #
    # 安装方式:
    #   pip install -r requirements.txt
    #
    # ================================================================================
    
    
    # ================================================================================
    # 第一部分:LangGraph & LangChain 核心框架
    # ================================================================================
    # LangGraph: AI Agent 工作流编排框架
    # - 状态图定义与执行
    # - Checkpoint 持久化机制
    # - 条件边与节点路由
    langgraph>=0.2.0
    
    
    # LangChain: LLM 应用开发工具链
    # - 消息类型定义 (HumanMessage, AIMessage, SystemMessage)
    # - 工具系统 (@tool 装饰器)
    # - LLM 客户端封装
    langchain>=0.3.0
    langchain-core>=0.3.0
    langchain-community>=0.3.0
    
    
    # ================================================================================
    # 第二部分:LLM Provider 支持
    # ================================================================================
    # OpenAI 兼容接口
    openai>=1.0.0
    langchain-openai>=0.1.0
    
    
    # Anthropic (Claude) 支持
    anthropic>=0.18.0
    langchain-anthropic>=0.1.0
    
    
    # ================================================================================
    # 第三部分:HTTP & 网络
    # ================================================================================
    requests>=2.31.0
    httpx>=0.27.0
    
    
    # ================================================================================
    # 第四部分:工具与配置
    # ================================================================================
    # 环境变量管理
    python-dotenv>=1.0.0
    
    
    # 异步支持
    aiofiles>=23.0.0
    
    
    # JSON/YAML 支持
    pyyaml>=6.0
    
    
    # ================================================================================
    # 第五部分:AgentArts 平台 SDK(核心依赖) todo 待定
    # ================================================================================
    # 
    #
    # ================================================================================

    执行命令配置智能体

    agentarts configure --entrypoint app:app

    执行命令部署智能体

    agentarts launch

完整agent.py代码样例可参考如下:

核心平台能力:
1. 记忆系统 (AgentArtsMemorySessionSaver)
   - 基于 LangGraph Checkpoint 的会话状态持久化
   - 支持跨会话、跨应用的状态恢复
   - 企业级数据安全与隔离

2. 身份认证 (AgentArtsIdentity + require_access_token)
   - 自动 OAuth2 凭据获取与管理
   - 支持多种第三方平台集成(GitHub/GitLab等)
   - 无需手动管理令牌,平台自动处理刷新

3. 安全代码执行 (AgentArts Code Sandbox)
   - 企业级沙箱环境,完全隔离
   - 自动资源控制(CPU/内存/超时)
   - 危险操作自动拦截
   - 完整执行日志与审计追溯

4. 工具生态
   - 文件操作、URL访问
   - GitHub API 深度集成(自动获取访问令牌)
   - 平台级代码执行沙箱
   - 可扩展的工具注册机制

平台优势:
    - 开箱即用的企业级记忆能力
    - 简化的第三方服务认证流程
    - 安全的代码执行沙箱
    - 完善的会话隔离与安全机制
================================================================================
"""

import os
import uuid
import json
import requests
import urllib3
from typing import List, Optional, Dict, Any
from pathlib import Path
from dotenv import load_dotenv
from langchain.agents import AgentState
# 禁用 SSL 证书警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# ================================================================================
# 基础依赖:LangGraph + LangChain
# ================================================================================
# LangGraph: AI Agent 工作流编排框架
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
# 注:InMemorySaver 仅保留作为参考,生产环境使用 AgentArtsMemorySessionSaver
# from langgraph.checkpoint.memory import InMemorySaver

# LangChain: LLM 应用开发工具链
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
# ================================================================================
# AgentArts 平台能力依赖
# ================================================================================
# AgentArtsIdentity: 平台级身份认证服务
# - require_access_token: 声明式 OAuth2 凭据注入装饰器
# - 自动处理令牌获取、刷新、存储
from agentarts.sdk.identity.auth import require_access_token
# AgentArtsMemory: 
# - AgentArtsMemorySessionSaver: 企业级会话状态持久化,集成 LangGraph 框架
# - 替代标准 InMemorySaver,提供生产级能力
from agentarts.sdk.integration.langgraph import AgentArtsMemorySessionSaver
from agentarts.sdk.memory import MemoryClient
# AgentArts Runtiem: 核心启动类
from agentarts.sdk import AgentArtsRuntimeApp, RequestContext
# ================================================================================
# 第一部分:配置模块
# ================================================================================
# 从 .env 文件加载环境变量
load_dotenv()
class ModelConfig:
    """
    模型配置类

    从环境变量读取模型配置:
    - MODEL_NAME: 模型名称(默认 gpt-4)
    - MODEL_URL: 模型API地址(默认 OpenAI 地址)
    - MODEL_API_KEY: 模型API密钥
    - MODEL_TYPE: 模型类型(默认 openai)
    """

    def __init__(self):
        self.name = os.getenv("MODEL_NAME", "gpt-4")
        self.url = os.getenv("MODEL_URL", "https://api.openai.com/v1")
        self.api_key = os.getenv("MODEL_API_KEY")
        self.model_type = os.getenv("MODEL_TYPE", "openai")
class AgentConfig:
    """
    Agent全局配置类

    整合所有配置项:
    - model: 模型配置
    - max_iterations: Agent最大迭代次数(防止无限循环)
    - memory_space: AgentArts 记忆空间ID
    """

    def __init__(self):
        self.model = ModelConfig()
        self.max_iterations = int(os.getenv("MAX_ITERATIONS", "50"))
        self.memory_space = os.getenv("MEMORY_SPACE_ID", "default")
        self.region = os.getenv("HUAWEICLOUD_SDK_REGION", "cn-north-4")
# 全局配置实例(单例模式)
config = AgentConfig()
# ================================================================================
# 第二部分:平台级记忆系统 (AgentArts Checkpoint)
# ================================================================================
# AgentArts 平台提供的企业级会话记忆能力
#
# 与标准 LangGraph InMemorySaver 的区别:
# ┌─────────────────────┬──────────────────────────────┬──────────────────────────────┐
# │ 特性                │ InMemorySaver (标准)         │ AgentArtsMemorySessionSaver  │
# ├─────────────────────┼──────────────────────────────┼──────────────────────────────┤
# │ 存储位置            │ 进程内存                      │ 平台持久化存储               │
# │ 多实例共享          │ 不支持                        │ 支持(跨实例共享)           │
# │ 会话隔离            │ 基础隔离                      │ 企业级租户隔离               │
# │ 状态恢复            │ 仅当前进程                    │ 跨应用、跨会话恢复           │
# │ 数据安全            │ 重启丢失                      │ 企业级数据保护               │
# │ 扩展性              │ 单机                          │ 分布式集群支持               │
# └─────────────────────┴──────────────────────────────┴──────────────────────────────┘
#
# 核心能力:
# - space_id: 记忆空间标识,支持多租户隔离
# - region: 区域配置,支持跨区域部署
# - 自动状态持久化:每次 Agent 执行后自动保存状态
# - 快速状态恢复:从持久化存储中恢复会话上下文

_global_checkpointer: Optional[AgentArtsMemorySessionSaver] = None


def get_checkpointer() -> AgentArtsMemorySessionSaver:
    """
    获取平台级 Checkpointer 实例(单例模式)

    使用 AgentArtsMemorySessionSaver 替代标准的 InMemorySaver,
    实现企业级的会话状态持久化能力。

    平台优势:
        - 多实例共享:支持 Agent 部署多副本时共享会话状态
        - 租户隔离:通过 space_id 实现企业级数据隔离
        - 状态恢复:服务重启后自动恢复会话上下文
        - 零运维:无需自行搭建 Redis 等存储服务

    Returns:
        AgentArtsMemorySessionSaver: AgentArts 平台级记忆存储
    """
    global _global_checkpointer
    if _global_checkpointer is None:
        _global_checkpointer = AgentArtsMemorySessionSaver(space_id=config.memory_space, region=config.region)
    return _global_checkpointer
def get_thread_config(session_id: str) -> Dict[str, Any]:
    """
    获取 LangGraph 运行时配置

    LangGraph 通过 config 中的 thread_id 来区分不同的会话

    Args:
        session_id: 会话ID(对应 LangGraph 的 thread_id)

    Returns:
        LangGraph 运行时配置字典
    """
    return {"configurable": {"thread_id": session_id}}
# ================================================================================
# 第三部分:LLM 客户端
# ================================================================================

class LLMClient:
    """
    LLM 客户端封装类

    封装 LangChain 的 ChatOpenAI,提供:
    - 模型调用能力
    - 工具绑定能力(让模型能够调用工具)
    """

    def __init__(self):
        self._client = self._create_client()
    def _create_client(self):
        """创建 OpenAI 兼容的 Chat 客户端"""
        model_cfg = config.model
        return ChatOpenAI(
            model=model_cfg.name,
            base_url=model_cfg.url,
            api_key=model_cfg.api_key,
            temperature=0,  # 设为0以获得更确定性的输出
            streaming=False,
        )
    def invoke(self, messages: List[BaseMessage]) -> BaseMessage:
        """调用 LLM 生成回复"""
        return self._client.invoke(messages)
    def bind_tools(self, tools: List[Any]):
        """绑定工具到 LLM,使模型能够调用这些工具"""
        return self._client.bind_tools(tools)
# 全局 LLM 客户端实例
llm_client = LLMClient()
# ================================================================================
# 第四部分:工具定义
# ================================================================================
# 工具说明:
# - 使用 @tool 装饰器将函数定义为 LangChain 工具
# - 工具函数必须有清晰的文档字符串,模型会根据描述决定是否调用
# - 工具返回值必须是字符串(会被返回给模型作为上下文)
#
# 平台工具 vs 自建工具:
# ┌─────────────────────┬──────────────────────────────┬──────────────────────────────┐
# │ 特性                │ 自建工具                      │ AgentArts 平台工具           │
# ├─────────────────────┼──────────────────────────────┼──────────────────────────────┤
# │ 代码执行            │ 自行搭建沙箱                  │ 企业级安全沙箱               │
# │ 安全性              │ 需要自行处理恶意代码          │ 自动隔离危险操作             │
# │ 资源限制            │ 自行实现                      │ 自动 CPU/内存/超时控制       │
# │ 状态管理            │ 自行处理会话状态              │ 跨调用状态持久化             │
# │ 监控告警            │ 自行搭建                      │ 平台统一监控                 │
# └─────────────────────┴──────────────────────────────┴──────────────────────────────┘


@tool
def execute_python(code: str, description: str = "") -> str | None:
    """
    在平台沙箱环境中安全执行 Python 代码

    本工具使用 AgentArts 平台提供的 code_session 企业级代码执行能力,
    实现了完全隔离的安全沙箱环境,可以安全地执行 Agent 生成的代码。

    平台沙箱能力:
    ┌──────────────────────────────────────────────────────────────────────────────┐
    │ 安全隔离                                                                  │
    ├──────────────────────────────────────────────────────────────────────────────┤
    │   完全隔离的执行环境,代码无法访问宿主机器资源                              │
    │   网络隔离:仅允许特定域名访问(可选配置)                                 │
    │   文件系统隔离:仅能访问临时工作目录                                       │
    │   禁止危险操作:禁止 subprocess/threading/文件直接访问等                   │
    ├──────────────────────────────────────────────────────────────────────────────┤
    │ 资源控制                                                                  │
    ├──────────────────────────────────────────────────────────────────────────────┤
    │   CPU 限制:防止无限循环占用资源                                           │
    │   内存限制:防止内存泄漏导致系统崩溃                                       │
    │   执行超时:自动终止长时间运行的代码                                       │
    │   磁盘配额:防止恶意写入大量数据                                           │
    ├──────────────────────────────────────────────────────────────────────────────┤
    │ 企业级特性                                                                │
    ├──────────────────────────────────────────────────────────────────────────────┤
    │   执行日志:完整记录代码执行过程                                           │
    │   审计追溯:记录谁在什么时候执行了什么代码                                 │
    │   异常捕获:自动捕获并安全处理执行中的异常                                 │
    │   多区域支持:可选择不同区域的执行环境                                     │
    └──────────────────────────────────────────────────────────────────────────────┘

    对比自建沙箱:
                    自建方案                    vs    AgentArts 平台方案
    ┌─────────────────────────────────┬─────────────────────────────────────────┐
    │ 需要自行处理:                   │ 平台全托管:                            │
    │   Docker 容器管理                │   开箱即用的容器化执行环境              │
    │   资源限制配置                   │   自动 CPU/内存/超时控制                │
    │   安全漏洞修补                   │   平台持续安全更新                      │
    │   执行监控告警                   │   统一监控与告警                        │
    │   日志收集分析                   │   结构化日志与审计                      │
    └─────────────────────────────────┴─────────────────────────────────────────┘

    使用示例:
        # Agent 可以直接生成并执行代码
        result = execute_python.invoke("print([x**2 for x in range(10)])")
        # 返回: "[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]"

    Args:
        code: 要执行的 Python 代码
        description: 代码的简要描述(可选,用于日志和调试)

    Returns:
        执行结果的 JSON 字符串
    """
    from agentarts.sdk.tools import code_session
    if description:
        code = f"# {description}\n{code}"

    print(f"\n Generated Code: {code}")
    # 使用平台提供的 code_session 沙箱执行代码
    with code_session(config.region, "your-interpreter-name") as code_client:
        response = code_client.invoke(
            operate_type="execute_code",
            arguments={
                "code": code,
                "language": "python",
                "clearContext": False  # 保持执行上下文,允许跨调用共享变量
            }
        )
        print(response)
    return json.dumps(response["result"])
@tool
def read_file(file_path: str) -> str:
    """
    读取文件内容

    Args:
        file_path: 文件路径(绝对路径或相对路径)

    Returns:
        文件内容(最大1MB)
    """
    try:
        path = Path(file_path)
        if not path.exists():
            return f"错误: 文件不存在 - {file_path}"
        if path.stat().st_size > 1024 * 1024:
            return f"错误: 文件过大 (最大支持1MB) - {file_path}"
        return path.read_text(encoding='utf-8')
    except Exception as e:
        return f"错误: 读取文件失败 - {str(e)}"


@tool
def write_file(file_path: str, content: str) -> str:
    """
    写入内容到文件(覆盖模式)

    Args:
        file_path: 文件路径
        content: 要写入的内容

    Returns:
        操作结果
    """
    try:
        path = Path(file_path)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(content, encoding='utf-8')
        return f"成功写入文件: {file_path}"
    except Exception as e:
        return f"错误: 写入文件失败 - {str(e)}"


@tool
def append_file(file_path: str, content: str) -> str:
    """
    追加内容到文件末尾

    Args:
        file_path: 文件路径
        content: 要追加的内容

    Returns:
        操作结果
    """
    try:
        path = Path(file_path)
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, 'a', encoding='utf-8') as f:
            f.write(content)
        return f"成功追加内容到文件: {file_path}"
    except Exception as e:
        return f"错误: 追加文件失败 - {str(e)}"


@tool
def list_directory(dir_path: str = ".") -> str:
    """
    列出目录内容

    Args:
        dir_path: 目录路径(默认当前目录)

    Returns:
        目录内容列表
    """
    try:
        path = Path(dir_path)
        if not path.exists():
            return f"错误: 目录不存在 - {dir_path}"
        if not path.is_dir():
            return f"错误: 不是目录 - {dir_path}"

        items = []
        for item in sorted(path.iterdir()):
            item_type = "DIR" if item.is_dir() else "FILE"
            size = item.stat().st_size if item.is_file() else 0
            items.append(f"{item_type:6} | {size:>10} | {item.name}")
        return "类型      | 大小      | 名称\n" + "\n".join(items)
    except Exception as e:
        return f"错误: 列出目录失败 - {str(e)}"


@tool
def fetch_url(url: str, method: str = "GET", headers: Optional[Dict] = None, body: Optional[str] = None) -> str:
    """
    访问 URL 获取内容

    Args:
        url: 目标URL
        method: HTTP方法(GET/POST)
        headers: 自定义请求头
        body: 请求体(用于POST)

    Returns:
        响应内容
    """
    try:
        default_headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
        if headers:
            default_headers.update(headers)
        request_kwargs = {"url": url, "headers": default_headers, "timeout": 30}
        if method.upper() == "POST":
            request_kwargs["data"] = body.encode() if body else None
        elif method.upper() != "GET":
            return f"错误: 不支持的HTTP方法 - {method}"

        response = requests.request(method, **request_kwargs)
        if len(response.content) > 1024 * 1024:
            return f"响应内容过大 ({len(response.content)} bytes),已截断\n\n" + response.text[:10000]
        return f"状态码: {response.status_code}\n\n{response.text}"
    except Exception as e:
        return f"错误: 访问URL失败 - {str(e)}"


@tool
@require_access_token(
    provider_name="your_provider",  # AgentArts 平台注册的 GitHub 身份提供商
    scopes=["user:email"],              # 请求的 OAuth2 权限范围
    auth_flow="USER_FEDERATION",         # 用户联邦认证流程
ignore_ssl_verification=True,
)
def github_get(url: str, headers: Optional[Dict] = None, access_token: Optional[str] = None) -> str:
    """
    访问 GitHub API(平台级身份认证)

    本函数使用 AgentArts 平台的 require_access_token 装饰器实现自动身份认证,
    无需手动管理 GitHub 访问令牌。

    平台能力 vs 传统方式对比:
    ┌─────────────────────┬──────────────────────────────┬──────────────────────────────┐
    │ 特性                │ 传统方式                      │ AgentArts + require_access_ │
    │                     │                              │ token                       │
    ├─────────────────────┼──────────────────────────────┼──────────────────────────────┤
    │ 令牌管理            │ 手动创建、存储、轮换          │ 平台自动管理                 │
    │ 令牌获取            │ 用户自行配置 env 或配置       │ 装饰器自动注入               │
    │ 令牌刷新            │ 需要编写刷新逻辑              │ 平台自动处理                 │
    │ 安全性              │ 令牌暴露在代码或环境变量      │ 令牌存储在平台安全存储       │
    │ 用户体验            │ 配置复杂,容易出错            │ 声明式配置,开箱即用         │
    └─────────────────────┴──────────────────────────────┴──────────────────────────────┘

    认证流程说明:
        1. 用户首次调用时,平台自动发起 OAuth2 授权流程
        2. 用户在 GitHub 授权页面完成授权
        3. 平台获取并安全存储访问令牌
        4. 后续调用时,access_token 自动注入到函数参数
        5. 令牌过期前,平台自动刷新

    Args:
        url: GitHub API 路径(如 /repos/owner/repo/issues)
        headers: 自定义请求头
        access_token: [平台自动注入] OAuth2 访问令牌

    Returns:
        API 响应内容
    """
    # 记录 token 状态
    has_token = bool(access_token)
    print(f"[GITHUB] access_token: {'已注入' if has_token else '未注入'}")
    base_url = "https://api.github.com"

    default_headers = {
        "User-Agent": "LangGraph-Agent",
        "Accept": "application/vnd.github.v3+json",
    }
    if access_token:
        default_headers["Authorization"] = f"token {access_token}"
    if headers:
        default_headers.update(headers)
    # 获取代理配置
    proxies = {}
    http_proxy = os.environ.get("HTTP_PROXY") or os.environ.get("http_proxy")
    https_proxy = os.environ.get("HTTPS_PROXY") or os.environ.get("https_proxy")
    if http_proxy:
        proxies["http"] = http_proxy
    if https_proxy:
        proxies["https"] = https_proxy
    try:
        full_url = base_url + url if url.startswith("/") else url
        # 禁用 SSL 证书验证(仅用于测试)
        response = requests.get(full_url, headers=default_headers, timeout=30, proxies=proxies if proxies else None, verify=False)
        # 记录响应状态
        print(f"[GITHUB] 响应状态: {response.status_code}")
        if len(response.content) > 1024 * 1024:
            return f"响应内容过大,已截断\n\n{response.text[:10000]}"

        return f"状态码: {response.status_code}\n\n{response.text}"
    except Exception as e:
        print(f"[GITHUB] 请求失败: {str(e)}")
        return f"错误: GitHub API请求失败 - {str(e)}"


@tool
def github_get_file(repo: str, file_path: str, ref: str = "main") -> str:
    """
    获取 GitHub 仓库中的文件内容

    Args:
        repo: 仓库名称(如 "owner/repo")
        file_path: 文件路径(如 "src/main.py")
        ref: 分支或标签名(默认 "main")

    Returns:
        文件内容
    """
    import base64
    url = f"/repos/{repo}/contents/{file_path}?ref={ref}"
    result = github_get.invoke(url)
    if "状态码: 200" in result:
        try:
            json_start = result.find("{")
            json_end = result.rfind("}") + 1
            if json_start >= 0 and json_end > json_start:
                data = json.loads(result[json_start:json_end])
                content = data.get("content", "")
                if content:
                    decoded = base64.b64decode(content).decode("utf-8")
                    return f"文件: {file_path}\n分支: {ref}\n\n{decoded}"
        except Exception as e:
            return f"错误: 解析文件内容失败 - {str(e)}\n\n原始响应:\n{result}"

    return result
@tool
def github_search(query: str, search_type: str = "repositories") -> str:
    """
    搜索 GitHub

    Args:
        query: 搜索关键词
        search_type: 搜索类型(repositories/code/issues/commits)

    Returns:
        搜索结果
    """
    url = f"/search/{search_type}?q={requests.utils.quote(query)}"
    return github_get.invoke(url)
@tool
def github_get_issues(repo: str, state: str = "open", per_page: int = 30) -> str:
    """
    获取 GitHub 仓库的 Issue 列表

    Args:
        repo: 仓库名称(如 "langchain-ai/langgraph")
        state: Issue 状态(open/closed/all)
        per_page: 返回数量(默认30)

    Returns:
        Issue 列表(JSON 格式)
    """
    url = f"/repos/{repo}/issues?state={state}&per_page={per_page}"
    return github_get.invoke(url)
@tool
def github_get_issue_comments(repo: str, issue_number: int) -> str:
    """
    获取 GitHub 仓库某个 Issue 的评论

    Args:
        repo: 仓库名称(如 "langchain-ai/langgraph")
        issue_number: Issue 编号

    Returns:
        评论列表(JSON 格式)
    """
    url = f"/repos/{repo}/issues/{issue_number}/comments"
    return github_get.invoke(url)
@tool
def github_get_pull_requests(repo: str, state: str = "open", per_page: int = 30) -> str:
    """
    获取 GitHub 仓库的 Pull Request 列表

    Args:
        repo: 仓库名称(如 "langchain-ai/langgraph")
        state: PR 状态(open/closed/merged/all)
        per_page: 返回数量(默认30)

    Returns:
        PR 列表(JSON 格式)
    """
    url = f"/repos/{repo}/pulls?state={state}&per_page={per_page}"
    return github_get.invoke(url)
def get_all_tools() -> List:
    """
    获取所有可用工具

    Returns:
        工具列表
    """
    tools = [
        execute_python,
        read_file,
        write_file,
        append_file,
        list_directory,
        fetch_url,
        github_get,
        github_get_file,
        github_search,
        github_get_issues,
        github_get_issue_comments,
        github_get_pull_requests,
    ]
    return tools

class AgentState(TypedDict):
    """Agent 状态类型定义"""
    messages: List
    iteration: int
def should_continue(state: AgentState) -> str:
    """
    判断是否继续执行(条件边函数)

    如果 LLM 返回了工具调用,则继续执行工具节点

    Args:
        state: 当前 Agent 状态

    Returns:
        "continue": 继续执行工具
        "end": 结束执行
    """
    last_message = state["messages"][-1]
    has_tool_calls = hasattr(last_message, "tool_calls") and last_message.tool_calls
    return "continue" if has_tool_calls else "end"


def model_node(state: AgentState) -> AgentState:
    """
    模型节点:调用 LLM 生成回复

    将工具绑定到 LLM,让模型能够决定是否需要调用工具

    Args:
        state: 当前 Agent 状态

    Returns:
        更新后的状态(包含 LLM 回复)
    """
    tools = get_all_tools()
    llm_with_tools = llm_client.bind_tools(tools)
    response = llm_with_tools.invoke(state["messages"])
    # ====== 工具调用日志 ======
    has_tool_calls = hasattr(response, 'tool_calls') and response.tool_calls
    if has_tool_calls:
        for tc in response.tool_calls:
            print(f"[MODEL] 调用工具: {tc.get('name', 'unknown')}")
    # ====== 日志结束 ======

    new_messages = state["messages"] + [response]
    iteration = state.get("iteration", 0) + 1

    return {
        "messages": new_messages,
        "iteration": iteration,
    }
def create_agent_graph(checkpointer=None):
    """
    创建 Agent 工作流图

    使用 LangGraph 的 StateGraph 构建工作流:
    1. 添加 agent 节点(调用 LLM)
    2. 添加 tools 节点(执行工具)
    3. 设置条件边(根据是否有工具调用决定走向)
    4. 编译图并绑定 checkpointer

    Args:
        checkpointer: 状态持久化器

    Returns:
        编译后的 LangGraph 图
    """
    tools = get_all_tools()
    # 自定义 ToolNode - 合并工具返回的消息和原有消息
    def tool_node_with_log(state: AgentState) -> AgentState:
        """工具节点:执行工具并合并消息"""
        original_messages = list(state["messages"])
        result = ToolNode(tools).invoke(state)
        # 合并原有消息和工具返回的 ToolMessage
        tool_messages = result["messages"]
        merged_messages = original_messages + tool_messages
        return {
            "messages": merged_messages,
            "iteration": state.get("iteration", 0),
        }
    workflow = StateGraph(AgentState)
    # 添加节点
    workflow.add_node("agent", model_node)
    workflow.add_node("tools", tool_node_with_log)
    # 设置入口点
    workflow.set_entry_point("agent")
    # 添加条件边:检查是否需要调用工具
    workflow.add_conditional_edges(
        "agent",
        should_continue,
        {"continue": "tools", "end": END}
    )
    # 工具执行后返回 agent 节点
    workflow.add_edge("tools", "agent")
    # 编译图(传入checkpointer)
    return workflow.compile(checkpointer=checkpointer)
# ================================================================================
# 第六部分:Agent 主类
# ================================================================================

class LangGraphAgent:
    """
    LangGraph Agent 主类

    对外提供的核心类,封装了:
    - 工作流图的创建和执行
    - 会话状态管理
    - 对话历史的存取

    使用示例:
        agent = LangGraphAgent()
        response = agent.run("你好", session_id="user-001")
        history = agent.get_history(session_id="user-001")
    """

    def __init__(self, system_prompt: Optional[str] = None, checkpointer=None):
        """
        初始化 Agent

        Args:
            system_prompt: 系统提示词(可选,默认值见 _default_system_prompt)
            checkpointer: 自定义 Checkpointer(可选,默认使用全局单例)
        """
        self._checkpointer = checkpointer or get_checkpointer()
        self.graph = create_agent_graph(checkpointer=self._checkpointer)
        self.system_prompt = system_prompt or self._default_system_prompt()
        self._last_session_id: Optional[str] = None

    def _default_system_prompt(self) -> str:
        """默认系统提示词"""
        return """你是一个智能助手,可以通过调用工具来帮助用户完成任务。

可用的工具:
- execute_python: 执行Python代码
- read_file: 读取本地文件内容
- write_file: 写入本地文件
- append_file: 追加内容到本地文件
- list_directory: 列出本地目录内容
- fetch_url: 访问URL获取网页内容
- github_get: 访问GitHub API
- github_search: 搜索GitHub仓库/代码/issues
- github_get_issues: 获取仓库的Issue列表
- github_get_issue_comments: 获取Issue的评论
- github_get_pull_requests: 获取仓库的PR列表
- github_get_file: 获取仓库中的文件内容

当用户询问GitHub相关问题时(如查看仓库、Issue、PR等),请使用GitHub工具来获取信息。
GitHub仓库是线上的(如 langchain-ai/langgraph),不是本地目录。
"""

    def _get_config(self, session_id: str) -> Dict[str, Any]:
        """获取运行时配置"""
        return get_thread_config(session_id)
    def run(self, user_input: str, session_id: Optional[str] = None) -> str:
        """
        运行 Agent 处理用户输入

        这是主要入口方法,每次调用都会:
        1. 从 Checkpoint 恢复会话状态(如有)
        2. 添加用户消息
        3. 执行工作流
        4. 自动保存状态到 Checkpoint

        Args:
            user_input: 用户输入的文本
            session_id: 会话ID(可选)
                - 不传:使用上次会话ID或自动生成新ID
                - 传值:使用指定会话ID,实现多会话隔离

        Returns:
            Agent 的回复文本
        """
        # 确定会话ID
        if session_id is None:
            session_id = self._last_session_id or str(uuid.uuid4())
        # 创建会话(如果已存在则忽略,继续执行)
        memory_client = MemoryClient()
        try:
            memory_client.create_memory_session(space_id=config.memory_space, id=session_id)
        except Exception as e:
            # Session already exists, continue with existing session
            if "Session id already exists" in str(e):
                pass
            else:
                raise

        self._last_session_id = session_id
        thread_config = self._get_config(session_id)
        # 从 checkpointer 获取历史消息
        checkpoint = self._checkpointer.get(thread_config)
        if checkpoint and "channel_values" in checkpoint:
            history_messages = list(checkpoint["channel_values"].get("messages", []))
        else:
            history_messages = []
        # 如果有历史消息,添加到现有消息后面(避免重复添加用户消息)
        if history_messages:
            # 检查最后一条是否是相同的用户消息
            if not (history_messages and isinstance(history_messages[-1], HumanMessage) and
                    history_messages[-1].content == user_input):
                messages = history_messages + [HumanMessage(content=user_input)]
            else:
                messages = history_messages
        else:
            # 首次对话,添加系统消息
            messages = [
                SystemMessage(content=self.system_prompt),
                HumanMessage(content=user_input)
            ]
        # 执行工作流
        result = self.graph.invoke({"messages": messages}, config=thread_config)
        # 返回最后一条消息的内容
        last_message = result["messages"][-1]
        return last_message.content if hasattr(last_message, "content") else str(last_message)
    @property
    def session_id(self) -> Optional[str]:
        """获取当前会话ID"""
        return self._last_session_id

       

相关文档