Loading...
正在加载...
请稍候

Alias 项目架构与设计分析报告

✨步子哥 (steper) 2025年12月26日 09:54
## 目录 1. [项目概述](#项目概述) 2. [整体架构设计](#整体架构设计) 3. [智能体架构与设计模式](#智能体架构与设计模式) 4. [多模式运行系统](#多模式运行系统) 5. [元规划器(Meta Planner)设计](#元规划器meta-planner设计) 6. [深度研究(Deep Research)架构](#深度研究deep-research架构) 7. [数据科学智能体设计](#数据科学智能体设计) 8. [工具系统架构](#工具系统架构) 9. [记忆系统设计](#记忆系统设计) 10. [前后端架构与通信机制](#前后端架构与通信机制) 11. [沙箱执行环境](#沙箱执行环境) 12. [状态管理与持久化](#状态管理与持久化) 13. [扩展点与Hook机制](#扩展点与hook机制) 14. [设计模式与最佳实践](#设计模式与最佳实践) 15. [技术栈与依赖分析](#技术栈与依赖分析) --- ## 项目概述 ### 项目定位 Alias 是一个基于 AgentScope 框架构建的多模式智能体系统,旨在提供高度可扩展、模块化的 AI 智能体解决方案。该系统通过多种专业化智能体模式,支持从通用对话到深度研究、数据分析、浏览器自动化等复杂任务。 ### 核心特性 1. **多模式智能体架构**: 支持通用模式、浏览器使用、深度研究、金融分析、数据科学等多种专业化模式 2. **元规划系统**: 通过 MetaPlanner 实现复杂任务的自动分解和协调 3. **工具生态系统**: 基于 AliasToolkit 的模块化工具管理,支持动态工具注册和共享 4. **长期记忆系统**: 集成工具记忆和用户画像功能 5. **前后端分离**: 基于 FastAPI 的后端服务和 React 的前端界面 6. **沙箱执行环境**: 安全的代码执行和工具调用环境 7. **状态持久化**: 支持智能体执行状态的保存和恢复 ### 项目结构 ``` alias/ ├── frontend/ # 前端应用 (React + TypeScript) │ ├── src/ │ └── package.json ├── src/alias/ │ ├── agent/ # 智能体核心实现 │ │ ├── agents/ # 各类智能体实现 │ │ │ ├── _alias_agent_base.py │ │ │ ├── _meta_planner.py │ │ │ ├── _deep_research_agent_v2.py │ │ │ ├── _data_science_agent.py │ │ │ ├── _browser_use_agent.py │ │ │ ├── _finance_agent.py │ │ │ ├── _react_worker.py │ │ │ ├── meta_planner_utils/ │ │ │ ├── dr_agent_utils/ │ │ │ ├── ds_agent_utils/ │ │ │ └── common_agent_utils/ │ │ ├── tools/ # 工具系统 │ │ │ ├── alias_toolkit.py │ │ │ ├── improved_tools/ │ │ │ └── toolkit_hooks/ │ │ ├── memory/ # 记忆系统 │ │ │ ├── longterm_memory.py │ │ │ └── longterm_memory_utils.py │ │ └── run.py # 智能体运行入口 │ ├── server/ # 后端服务 (FastAPI) │ │ ├── api/ │ │ ├── core/ │ │ ├── db/ │ │ ├── middleware/ │ │ └── main.py │ ├── runtime/ # 运行时环境 │ │ └── alias_sandbox/ │ ├── memory_service/ # 记忆服务 │ └── utils/ └── pyproject.toml ``` --- ## 整体架构设计 ### 分层架构 Alias 项目采用清晰的分层架构设计,从下到上依次为: ``` ┌─────────────────────────────────────────────────────────────┐ │ 前端展示层 (Frontend) │ │ React + TypeScript + Ant Design │ └─────────────────────────────────────────────────────────────┘ ↕ HTTP/WebSocket ┌─────────────────────────────────────────────────────────────┐ │ API 网关层 (Server) │ │ FastAPI + 中间件 │ └─────────────────────────────────────────────────────────────┘ ↕ ┌─────────────────────────────────────────────────────────────┐ │ 业务逻辑层 (Agent) │ │ MetaPlanner | DeepResearch | DataScience │ └─────────────────────────────────────────────────────────────┘ ↕ ┌─────────────────────────────────────────────────────────────┐ │ 核心抽象层 (Base) │ │ AliasAgentBase | ReActAgent │ └─────────────────────────────────────────────────────────────┘ ↕ ┌─────────────────────────────────────────────────────────────┐ │ 基础设施层 (Infrastructure) │ │ Toolkit | Memory | Sandbox | StateModule │ └─────────────────────────────────────────────────────────────┘ ``` ### 架构设计原则 1. **单一职责原则**: 每个模块专注于特定的功能领域 2. **开闭原则**: 通过继承和组合实现扩展,避免修改核心代码 3. **依赖倒置原则**: 依赖抽象接口而非具体实现 4. **接口隔离原则**: 细粒度的接口设计,避免不必要的依赖 5. **组合优于继承**: 通过工具组合实现功能复用 ### 核心设计模式 1. **策略模式**: 不同智能体模式采用不同的执行策略 2. **观察者模式**: Hook 机制实现事件驱动的扩展点 3. **工厂模式**: WorkerManager 动态创建不同类型的 Worker 4. **模板方法模式**: AliasAgentBase 定义执行流程模板 5. **建造者模式**: 深度研究树节点的构建过程 6. **责任链模式**: 工具调用的后处理钩子链 --- ## 智能体架构与设计模式 ### AliasAgentBase: 智能体基类 `AliasAgentBase` 是所有 Alias 智能体的核心基类,继承自 AgentScope 的 `ReActAgent`,提供了扩展的基础设施和自定义能力。 #### 核心职责 1. **生命周期管理**: 管理智能体的初始化、执行和销毁 2. **工具管理**: 集成 AliasToolkit 进行工具注册和调用 3. **记忆管理**: 集成长期记忆系统 4. **状态管理**: 支持状态的保存和恢复 5. **Hook 机制**: 提供扩展点用于自定义行为 6. **会话管理**: 与 SessionService 集成进行会话状态跟踪 #### 关键初始化参数 ```python class AliasAgentBase(ReActAgent): def __init__( self, name: str, # 智能体名称 model: ChatModelBase, # 语言模型 formatter: FormatterBase, # 消息格式化器 memory: MemoryBase, # 记忆系统 toolkit: AliasToolkit, # 工具包 session_service: Any, # 会话服务 state_saving_dir: Optional[str] = None, # 状态保存目录 sys_prompt: Optional[str] = None, # 系统提示词 max_iters: int = 10, # 最大迭代次数 tool_call_interrupt_return: bool = True, # 工具调用中断返回 long_term_memory: Optional[LongTermMemoryBase] = None, # 长期记忆 long_term_memory_mode: Literal["agent_control", "static_control", "both"] = "both", ) ``` #### Hook 机制 Hook 机制是 AliasAgentBase 的核心扩展点,允许在智能体执行的关键阶段插入自定义逻辑: ```python # Hook 类型 - pre_reply: 回复前的处理 - post_reasoning: 推理后的处理 - post_acting: 动作执行后的处理 - pre_acting: 动作执行前的处理 ``` **Hook 注册示例**: ```python self.register_instance_hook( "pre_reply", "agent_load_states_pre_reply_hook", agent_load_states_pre_reply_hook, ) ``` #### 状态持久化 通过 `register_state` 方法注册需要持久化的状态: ```python self.register_state( "deep_research_tree", custom_to_json=lambda x: x.state_dict() if x else None, custom_from_json=lambda x: DeepResearchTreeNode.reconstruct_from_state_dict(x, x.get("task_type", "general")), ) ``` ### ReActWorker: 工作智能体 `ReActWorker` 是专门用于执行子任务的轻量级智能体,由 MetaPlanner 动态创建和管理。 #### 设计特点 1. **轻量级**: 只包含执行子任务所需的最小功能集 2. **工具隔离**: 每个Worker拥有独立的工具集,避免工具冲突 3. **状态独立**: 拥有独立的记忆和状态空间 4. **可重建**: 支持从状态字典重建,便于持久化和恢复 #### Worker 重建机制 ```python def rebuild_reactworker( worker_info: WorkerInfo, old_toolkit: AliasToolkit, new_toolkit: AliasToolkit, memory: Optional[MemoryBase] = None, model: Optional[ChatModelBase] = None, formatter: Optional[FormatterBase] = None, exclude_tools: Optional[list[str]] = None, ) -> ReActWorker: """重建 ReActAgent Worker""" # 工具共享 tool_list = [ tool_name for tool_name in worker_info.tool_lists if tool_name not in exclude_tools ] share_tools(old_toolkit, new_toolkit, tool_list) # 创建新的 Worker return ReActWorker( name=worker_info.worker_name, sys_prompt=worker_info.sys_prompt, model=model, formatter=formatter, toolkit=new_toolkit, memory=InMemoryMemory() if memory is None else memory, max_iters=WORKER_MAX_ITER, ) ``` ### 智能体类型体系 ``` AliasAgentBase (抽象基类) ├── MetaPlanner (元规划器) ├── DeepResearchAgent (深度研究) ├── DataScienceAgent (数据科学) ├── BrowserUseAgent (浏览器使用) └── FinanceAgent (金融分析) ReActWorker (工作智能体) └── 由 MetaPlanner 动态创建 ``` --- ## 多模式运行系统 ### 模式分类 Alias 系统支持五种主要的智能体运行模式: 1. **general**: 通用对话模式,使用 MetaPlanner 进行任务规划 2. **browser**: 浏览器自动化模式,使用 BrowserUseAgent 3. **dr**: 深度研究模式,使用 DeepResearchAgent 4. **ds**: 数据科学模式,使用 DataScienceAgent 5. **finance**: 金融分析模式,使用 FinanceAgent ### 模式路由机制 模式路由在 `run.py` 中实现,根据会话的 `chat_mode` 选择对应的智能体: ```python async def arun_agents( session_service: SessionService, sandbox: Sandbox = None, ): """智能体执行入口点""" chat_mode = session_service.session_entity.chat_mode if chat_mode == "dr": await arun_deepresearch_agent(session_service, sandbox) elif chat_mode == "browser": await arun_browseruse_agent(session_service, sandbox) elif chat_mode == "ds": await arun_datascience_agent(session_service, sandbox) elif chat_mode == "finance": await arun_finance_agent(session_service, sandbox) else: if chat_mode != "general": logger.warning(f"Unknown chat mode: {chat_mode}. Invoke general mode instead.") await arun_meta_planner(session_service, sandbox) ``` ### 模式切换策略 1. **显式切换**: 用户在前端界面选择模式 2. **自动识别**: 系统根据任务特征自动推荐模式 3. **动态调整**: MetaPlanner 可以在执行过程中调用不同模式的 Worker --- ## 元规划器(Meta Planner)设计 ### 核心概念 MetaPlanner 是 Alias 系统的核心协调组件,负责将复杂任务分解为可执行的子任务,并协调多个 Worker 完成整体任务。 ### 架构组件 MetaPlanner 由三个核心组件构成: ``` MetaPlanner ├── PlannerNoteBook (规划笔记本) │ ├── Roadmap (路线图) │ ├── SubTaskStatus (子任务状态) │ └── WorkerInfo (Worker 信息) ├── RoadmapManager (路线图管理器) │ ├── 任务分解 │ ├── 路线图创建 │ └── 路线图修订 └── WorkerManager (Worker 管理器) ├── Worker 创建 ├── Worker 选择 └── Worker 执行协调 ``` ### PlannerNoteBook: 规划状态管理 PlannerNoteBook 是一个状态容器,用于存储整个规划过程的所有信息: ```python class PlannerNoteBook(StateModule): """规划笔记本,存储规划过程中的所有状态""" def __init__(self): self.roadmap: Roadmap = Roadmap() self.detail_analysis_for_plan: str = "" self.file_info: dict[str, FileInfo] = {} self.user_input: list[str] = [] self.worker_pool: dict[str, tuple[WorkerInfo, ReActWorker]] = {} ``` #### Roadmap: 任务路线图 ```python class Roadmap(BaseModel): """任务路线图""" original_task: str = "" # 原始任务 decomposed_tasks: list[SubTaskStatus] = [] # 分解的子任务 def next_unfinished_subtask(self) -> tuple[Optional[int], Optional[SubTaskStatus]]: """获取下一个未完成的子任务""" for idx, subtask in enumerate(self.decomposed_tasks): if subtask.status in ["todo", "in_progress"]: return idx, subtask return None, None ``` #### SubTaskSpecification: 子任务规范 ```python class SubTaskSpecification(BaseModel): """子任务规范""" subtask_name: str # 子任务名称 exact_input: str # 精确输入描述 expected_output: str # 期望输出描述 required_tools: list[str] # 需要的工具列表 worker_type: Literal[ # Worker 类型 "general", "browser", "ds", "finance", "deep_research", ] = "general" worker_sys_prompt: str = "" # Worker 系统提示词 ``` ### RoadmapManager: 路线图管理 RoadmapManager 负责任务分解和路线图的生命周期管理: #### 核心方法 1. **decompose_task_and_build_roadmap**: 任务分解和路线图创建 ```python async def decompose_task_and_build_roadmap( self, user_latest_input: str, given_task_conclusion: str, detail_analysis_for_plan: str, decomposed_subtasks: list[SubTaskSpecification], ) -> ToolResponse: """ 1) 分析用户任务 2) 推理完成整个任务所需的步骤 3) 将这些步骤分组为几个可管理的子任务,满足: - 同一子任务的步骤使用相同的工具集 - 同一子任务的步骤不依赖于后续子任务/步骤 - 每个子任务的目标应该清晰且可验证 - 同一目标的推理/分析和生成/动作应该在同一个子任务中 """ self.planner_notebook.detail_analysis_for_plan = detail_analysis_for_plan self.planner_notebook.roadmap.original_task = given_task_conclusion for subtask in decomposed_subtasks: subtask_status = SubTaskStatus(subtask_specification=subtask) self.planner_notebook.roadmap.decomposed_tasks.append(subtask_status) return ToolResponse( metadata={"success": True}, content=[TextBlock(text="Successfully decomposed the task into subtasks.")], ) ``` 2. **revise_roadmap**: 路线图修订 ```python async def revise_roadmap( self, action: Literal["add_subtask", "revise_subtask", "remove_subtask"], subtask_idx: int, subtask_specification: Optional[SubTaskSpecification] = None, update_to_subtask: Optional[Update] = None, new_state: Literal["todo", "in_progress", "done", "abandoned"] = "in_progress", ) -> ToolResponse: """根据 Worker 执行报告修订路线图""" # 根据不同的 action 执行相应的修订操作 ``` ### WorkerManager: Worker 管理与协调 WorkerManager 负责 Worker 的创建、选择和执行协调: #### 核心职责 1. **动态 Worker 创建**: 根据任务需求创建不同类型的 Worker 2. **Worker 选择**: 为子任务选择最合适的 Worker 3. **执行协调**: 协调 Worker 的执行顺序和依赖关系 4. **结果处理**: 处理 Worker 的执行结果并更新路线图 #### Worker 创建流程 ```python class WorkerManager(StateModule): def __init__( self, worker_model: ChatModelBase, worker_formatter: FormatterBase, planner_notebook: PlannerNoteBook, worker_full_toolkit: AliasToolkit, agent_working_dir: str, sandbox: AliasSandbox, worker_pool: Optional[dict[str, tuple[WorkerInfo, ReActWorker]]] = None, session_service: Any = None, long_term_memory: Optional[LongTermMemoryBase] = None, ): self.worker_model = worker_model self.worker_formatter = worker_formatter self.planner_notebook = planner_notebook self.worker_full_toolkit = worker_full_toolkit self.agent_working_dir = agent_working_dir self.sandbox = sandbox self.worker_pool = worker_pool or {} self.session_service = session_service self.long_term_memory = long_term_memory ``` #### Worker 执行流程 ``` 1. 获取下一个未完成的子任务 2. 根据子任务规范选择或创建 Worker 3. 配置 Worker 的工具集 4. 执行 Worker 5. 处理 Worker 响应 6. 更新路线图状态 7. 重复直到所有子任务完成 ``` ### MetaPlanner 执行流程 ``` 用户输入 ↓ [澄清阶段] (可选) ↓ [任务分解] → 创建路线图 ↓ [用户确认] ↓ [子任务执行循环] ↓ 获取下一个子任务 → 选择/创建 Worker → 执行 Worker → 处理结果 → 更新路线图 ↓ [所有子任务完成] ↓ 生成最终报告 ``` --- ## 深度研究(Deep Research)架构 ### 核心概念 DeepResearchAgent 是专门用于深度研究和信息收集的智能体,采用树状结构组织研究任务,支持递归分解和假设驱动的研究方法。 ### 架构设计 ``` DeepResearchAgent ├── DeepResearchTreeNode (研究树节点) │ ├── DRTaskBase (研究任务基类) │ │ ├── BasicTask (基础任务) │ │ └── HypothesisDrivenTask (假设驱动任务) │ ├── Worker (执行节点任务的 Worker) │ └── ChildrenNodes (子节点) ├── DeepResearchTree (研究树) └── ReportGenerator (报告生成器) ``` ### DeepResearchTreeNode: 研究树节点 每个节点代表一个研究任务,可以递归分解为子任务: ```python class DeepResearchTreeNode(StateModule): def __init__( self, task_type: Literal["general", "finance"], current_executable: Optional[DRTaskBase] = None, level: int = 0, max_depth: int = 1, worker_builder_type: str = "default", parent_executable: Optional["DRTaskBase"] = None, report_dir: str = "/workspace", pre_execute_hook: Union[Callable, Coroutine, None] = None, ): self.task_type = task_type self.level = level self.worker_builder = get_deep_research_worker_builder( worker_builder_type if task_type == "general" else task_type, ) self.worker = None self.current_executable: DRTaskBase = current_executable self.max_depth = max_depth self.parent_executable: DRTaskBase = parent_executable self.children_nodes: list[DeepResearchTreeNode] = [] self.node_execution_result = {} self.report_dir = report_dir self.node_report_path = "" self.node_report = "" self.pre_execute_hook = pre_execute_hook ``` ### 研究树遍历策略 ```python def _get_next_executables(self) -> list[DeepResearchTreeNode]: """获取下一个可执行的节点""" if self.deep_research_tree is None: return [] ready_nodes: list[DeepResearchTreeNode] = [] stack: list[DeepResearchTreeNode] = [self.deep_research_tree] parent_ready_states: set[str] = {"done", "abandoned"} while stack: node = stack.pop() parent_is_ready = ( node.parent_executable is None or node.parent_executable.state in parent_ready_states ) if ( node.current_executable.state in ["todo", "in_progress"] and parent_is_ready and node.level < self.max_depth ): ready_nodes.append(node) stack.extend(reversed(node.children_nodes)) return ready_nodes ``` ### 假设驱动研究 对于金融等需要假设验证的领域,DeepResearchAgent 支持假设驱动的研究方法: ```python async def _generate_hypothesis(self, node: DeepResearchTreeNode): """为 HypothesisDrivenTask 生成初始假设""" sys_prompt = PROMPT_INITIALIZE_HYPOTHESES.format( current_date=_get_timestamp(), ) instruction_msg = Msg("system", content=[TextBlock(type="text", text=sys_prompt)], role="system") user_msg = Msg( "user", content=[ TextBlock( type="text", text=f"Research Question: {node.current_executable.description}\n\nGenerate 2-4 key hypotheses.", ), ], role="user", ) class HypothesesSchema(BaseModel): hypotheses: list[str] = Field(description="List of 2-4 testable hypotheses") try: prompt = await self.formatter.format([instruction_msg, user_msg]) res = await self.model(prompt, structured_model=HypothesesSchema) hypotheses = None if self.model.stream: async for content_chunk in res: if content_chunk.metadata and "hypotheses" in content_chunk.metadata: hypotheses = content_chunk.metadata["hypotheses"] else: if res.metadata and "hypotheses" in res.metadata: hypotheses = res.metadata["hypotheses"] if hypotheses: for hypothesis in hypotheses: hypothesis_task = HypothesisDrivenTask( description=f"Investigate hypothesis:{hypothesis}", evidences=[], parent_executable=node, max_depth=node.max_depth, deep_research_worker_builder=node.worker_builder, level=node.level + 1, ) node.children_nodes.append( DeepResearchTreeNode( task_type="finance", current_executable=hypothesis_task, level=node.level + 1, parent_executable=None, max_depth=self.max_depth, report_dir=self.agent_working_dir, pre_execute_hook=None, ), ) node.current_executable.state = "done" except Exception as e: logger.warning(f"Failed to generate hypotheses: {e}") ``` ### 深度研究执行流程 ``` 用户查询 ↓ [初步信息收集] (可选) ↓ [澄清阶段] (可选) ↓ 创建根节点 ↓ [研究树遍历] ↓ 获取可执行节点 ↓ 执行节点任务 (使用 Worker) ↓ 生成节点报告 ↓ [是否需要分解?] ├── 是 → 生成子节点 → 返回遍历 └── 否 → 继续遍历 ↓ [所有节点完成] ↓ 生成最终报告 ``` --- ## 数据科学智能体设计 ### 核心概念 DataScienceAgent 是专门用于数据分析、可视化和建模的智能体,集成了 Jupyter/IPython 环境和专业的数据科学工具链。 ### 架构特点 1. **场景感知**: 根据任务类型自动选择合适的提示词和工具 2. **任务管理**: 内置 TodoList 管理数据分析任务 3. **代码执行**: 集成 IPython 环境进行代码执行 4. **报告生成**: 自动生成详细的数据分析报告 ### 核心组件 ```python class DataScienceAgent(AliasAgentBase): def __init__( self, name: str, model: ChatModelBase, formatter: FormatterBase, memory: MemoryBase, toolkit: AliasToolkit, sys_prompt: str = None, max_iters: int = 30, tmp_file_storage_dir: str = "/workspace", state_saving_dir: Optional[str] = None, session_service: Any = None, ) -> None: self.think_function_name = "think" self.uploaded_files: List[str] = [] self.todo_list: List[Dict[str, Any]] = [] self.infer_trajectories: List[List[Msg]] = [] self.detailed_report_path = os.path.join(tmp_file_storage_dir, "detailed_report.html") self.tmp_file_storage_dir = tmp_file_storage_dir ``` ### 场景感知系统 DataScienceAgent 使用 LLMPromptSelector 根据任务类型选择合适的提示词: ```python available_prompts = { "explorative_data_analysis": get_prompt_from_file("_scenario_explorative_data_analysis.md"), "data_modeling": get_prompt_from_file("_scenario_data_modeling_prompt.md"), "data_computation": get_prompt_from_file("_scenario_data_computation_prompt.md"), } self.prompt_selector = LLMPromptSelector( self.model, self.formatter, available_prompts, ) ``` ### TodoList 管理 内置 TodoList 用于跟踪数据分析任务: ```python def todo_write( agent: DataScienceAgent, action: Literal["add", "update", "delete", "mark_done"], task_name: str, task_description: str = "", task_status: Literal["todo", "in_progress", "done"] = "todo", ) -> ToolResponse: """管理数据分析任务的 TodoList""" if action == "add": agent.todo_list.append({ "task_name": task_name, "task_description": task_description, "task_status": task_status, }) elif action == "update": for task in agent.todo_list: if task["task_name"] == task_name: task["task_description"] = task_description task["task_status"] = task_status elif action == "delete": agent.todo_list = [t for t in agent.todo_list if t["task_name"] != task_name] elif action == "mark_done": for task in agent.todo_list: if task["task_name"] == task_name: task["task_status"] = "done" return ToolResponse( metadata={"success": True}, content=[TextBlock(text=f"TodoList updated: {json.dumps(agent.todo_list, indent=2)}")], ) ``` ### IPython 环境集成 ```python def install_package(sandbox: AliasSandbox): """安装数据科学所需的 Python 包""" packages = [ "pandas", "numpy", "matplotlib", "seaborn", "scikit-learn", "plotly", ] for package in packages: try: sandbox.call_tool("pip_install", {"package": package}) except Exception as e: logger.warning(f"Failed to install {package}: {e}") def set_run_ipython_cell(sandbox: AliasSandbox): """配置 IPython 单元格执行""" sandbox.call_tool("ipython_setup", {}) ``` ### 动态系统提示词 DataScienceAgent 的系统提示词是动态构建的,包含基础提示词、场景提示词和 TodoList 提示词: ```python @property def sys_prompt(self) -> str: base_prompt = self._sys_prompt todo_prompt = self.todo_list_prompt.replace( "{todoList}", json.dumps(self.todo_list, indent=2, ensure_ascii=False), ) return f"{base_prompt}{self._selected_scenario_prompts}\n\n{todo_prompt}" ``` ### 数据科学执行流程 ``` 用户上传数据文件 ↓ [场景识别] → 选择合适的提示词 ↓ [探索性数据分析] ├── 数据加载 ├── 数据清洗 ├── 统计分析 └── 可视化 ↓ [数据建模] (可选) ├── 特征工程 ├── 模型训练 └── 模型评估 ↓ [数据计算] (可选) ├── 数据转换 ├── 聚合计算 └── 高级分析 ↓ 生成详细报告 ``` --- ## 工具系统架构 ### AliasToolkit: 核心工具包 AliasToolkit 是 Alias 系统的工具管理核心,继承自 AgentScope 的 Toolkit,提供了增强的工具管理能力。 ### 核心功能 1. **工具注册**: 支持函数和 JSON Schema 两种注册方式 2. **工具分组**: 通过 group_name 组织工具 3. **工具共享**: 在不同智能体间共享工具 4. **工具黑名单**: 过滤不需要的工具 5. **后处理钩子**: 对工具输出进行后处理 6. **MCP 集成**: 支持 Model Context Protocol 客户端 ### 初始化 ```python class AliasToolkit(Toolkit): def __init__( self, sandbox: AliasSandbox = None, add_all: bool = False, is_browser_toolkit: bool = False, tool_blacklist: list = TOOL_BLACKLIST, ): super().__init__() self.sandbox = sandbox self.session_id = self.sandbox.sandbox_id if sandbox else None self.categorized_functions = {} self.tool_blacklist = tool_blacklist if add_all and sandbox: tools_schema = self.sandbox.list_tools() for category, function_dicts in tools_schema.items(): if (is_browser_toolkit and category == "playwright") or ( not is_browser_toolkit and category != "playwright" ): for _, function_json in function_dicts.items(): if function_json["name"] not in self.tool_blacklist: self._add_io_function(function_json) # 添加改进的文件操作工具 file_sys = ImprovedFileOperations(sandbox) self.register_tool_function(file_sys.read_file) self.additional_mcp_clients = [] self.long_text_post_hook = LongTextPostHook(sandbox) self._add_tool_postprocessing_func() ``` ### 工具注册 ```python def _add_io_function( self, json_schema: dict, is_browser_tool: bool = False, ) -> None: """添加 IO 函数""" tool_name = json_schema["name"] def wrap_tool_func(name: str) -> Callable: def wrapper(**kwargs) -> ToolResponse: try: result = self.sandbox.call_tool(name=name, arguments=kwargs) if isinstance(result, dict) and "content" in result: content = result["content"] if isinstance(content, list): for i, block in enumerate(content): if isinstance(block, dict) and "annotations" in block: block.pop("annotations") content[i] = block else: content = [TextBlock(type="text", text=str(result))] return ToolResponse( metadata={"success": True, "tool_name": name}, content=content, ) except Exception as e: logger.error(f"Error executing tool {name}: {str(e)}") return ToolResponse( metadata={"success": False, "tool_name": name, "error": str(e)}, content=[TextBlock(type="text", text=f"Error executing tool {name}: {str(e)}")], ) wrapper.__name__ = name return wrapper tool_func = wrap_tool_func(tool_name) self.register_tool_function( tool_func=tool_func, json_schema=json_schema.get("json_schema", {}), ) ``` ### 工具共享 ```python def share_tools( source_toolkit: AliasToolkit, target_toolkit: AliasToolkit, tool_names: list[str], ) -> None: """在工具包之间共享工具""" for tool_name in tool_names: if tool_name in source_toolkit.tools: target_toolkit.tools[tool_name] = source_toolkit.tools[tool_name] ``` ### 工具后处理钩子 ```python def _add_tool_postprocessing_func(self) -> None: """添加工具后处理函数""" long_text_hook = LongTextPostHook(self.sandbox) for tool_func, _ in self.tools.items(): if tool_func.startswith(("read_file", "read_multiple_files")): self.tools[tool_func].postprocess_func = read_file_post_hook if tool_func.startswith("tavily"): self.tools[tool_func].postprocess_func = long_text_hook.truncate_and_save_response ``` ### MCP 客户端集成 ```python async def add_and_connect_mcp_client( self, mcp_client: MCPClientBase, group_name: str = "basic", enable_funcs: list[str] | None = None, disable_funcs: list[str] | None = None, preset_kwargs_mapping: dict[str, dict[str, Any]] | None = None, postprocess_func: Callable[[ToolUseBlock, ToolResponse], ToolResponse | None] | None = None, ): """添加并连接 MCP 客户端""" if isinstance(mcp_client, StatefulClientBase): await mcp_client.connect() self.additional_mcp_clients.append(mcp_client) await self.register_mcp_client( mcp_client, enable_funcs=enable_funcs, group_name=group_name, disable_funcs=disable_funcs, preset_kwargs_mapping=preset_kwargs_mapping, postprocess_func=postprocess_func, ) elif isinstance(mcp_client, HttpStatelessClient): self.additional_mcp_clients.append(mcp_client) await self.register_mcp_client( mcp_client, enable_funcs=enable_funcs, group_name=group_name, disable_funcs=disable_funcs, preset_kwargs_mapping=preset_kwargs_mapping, postprocess_func=postprocess_func, ) ``` ### 工具分类 AliasToolkit 支持按类别组织工具: ``` 工具分类 ├── basic: 基础工具 (文件操作、系统命令等) ├── browser: 浏览器工具 (Playwright) ├── data_science: 数据科学工具 (pandas, numpy, matplotlib) ├── finance: 金融工具 (股票查询、财务分析) ├── deep_research: 深度研究工具 (搜索、爬虫) └── custom: 自定义工具 ``` --- ## 记忆系统设计 ### 记忆系统架构 Alias 的记忆系统采用分层设计,结合短期记忆和长期记忆: ``` 记忆系统 ├── 短期记忆 (InMemoryMemory) │ ├── 会话上下文 │ ├── 工具调用历史 │ └── 推理轨迹 └── 长期记忆 (LongTermMemoryBase) ├── 工具记忆 (Tool Memory) └── 用户画像 (User Profiling) ``` ### 长期记忆模式 AliasAgentBase 支持三种长期记忆模式: ```python long_term_memory_mode: Literal["agent_control", "static_control", "both"] = "both" ``` 1. **agent_control**: 智能体自主决定何时访问长期记忆 2. **static_control**: 在特定阶段静态访问长期记忆 3. **both**: 结合两种模式 ### 工具记忆 工具记忆记录工具的使用历史和效果,用于优化工具选择: ```python class ToolMemory: """工具记忆""" def __init__(self): self.tool_usage_history: dict[str, list[ToolUsageRecord]] = {} self.tool_success_rate: dict[str, float] = {} def record_tool_usage( self, tool_name: str, arguments: dict, result: ToolResponse, context: str, ): """记录工具使用""" if tool_name not in self.tool_usage_history: self.tool_usage_history[tool_name] = [] record = ToolUsageRecord( tool_name=tool_name, arguments=arguments, success=result.metadata.get("success", False), timestamp=datetime.now(), context=context, ) self.tool_usage_history[tool_name].append(record) self._update_success_rate(tool_name) def _update_success_rate(self, tool_name: str): """更新工具成功率""" records = self.tool_usage_history.get(tool_name, []) if records: success_count = sum(1 for r in records if r.success) self.tool_success_rate[tool_name] = success_count / len(records) ``` ### 用户画像 用户画像记录用户的偏好、历史行为和特征: ```python class UserProfile: """用户画像""" def __init__(self): self.user_id: str = "" self.preferences: dict[str, Any] = {} self.task_history: list[TaskRecord] = [] self.frequently_used_tools: list[str] = [] self.communication_style: str = "formal" def update_preferences(self, new_preferences: dict[str, Any]): """更新用户偏好""" self.preferences.update(new_preferences) def record_task(self, task: TaskRecord): """记录任务""" self.task_history.append(task) self._update_frequently_used_tools(task.tools_used) def _update_frequently_used_tools(self, tools: list[str]): """更新常用工具""" from collections import Counter all_tools = [t for task in self.task_history for t in task.tools_used] counter = Counter(all_tools) self.frequently_used_tools = [tool for tool, _ in counter.most_common(10)] ``` ### 记忆访问 Hook 通过 Hook 机制在智能体执行的不同阶段访问记忆: ```python def get_user_input_to_mem_pre_reply_hook(agent: AliasAgentBase, msg: Msg) -> Msg: """将用户输入添加到记忆""" if msg and msg.role == "user": asyncio.create_task(agent.memory.add(msg)) return msg def save_post_reasoning_state(agent: AliasAgentBase, reasoning_msg: Msg) -> Msg: """保存推理状态""" if agent.state_saving_dir: state_path = os.path.join(agent.state_saving_dir, "reasoning_state.json") with open(state_path, "w") as f: json.dump({ "timestamp": datetime.now().isoformat(), "reasoning": reasoning_msg.model_dump(), }, f) return reasoning_msg def save_post_action_state(agent: AliasAgentBase, action_msg: Msg) -> Msg: """保存动作状态""" if agent.state_saving_dir: state_path = os.path.join(agent.state_saving_dir, "action_state.json") with open(state_path, "w") as f: json.dump({ "timestamp": datetime.now().isoformat(), "action": action_msg.model_dump(), }, f) return action_msg ``` --- ## 前后端架构与通信机制 ### 后端架构 (FastAPI) 后端基于 FastAPI 构建,提供 RESTful API 和 WebSocket 支持: ```python def create_app(): application = FastAPI( generate_unique_id_function=custom_generate_unique_id, title=settings.PROJECT_NAME, openapi_url=f"{settings.API_V1_STR}/openapi.json", lifespan=lifespan, ) # 中间件 application.add_middleware(CORSMiddleware, allow_origins=["*"]) application.add_middleware(RequestContextMiddleware) application.add_middleware(SessionMiddleware, secret_key=settings.SECRET_KEY) # 异常处理 application.add_exception_handler(BaseError, base_exception_handler) # 路由 application.include_router(api_router) return application ``` ### 生命周期管理 ```python @asynccontextmanager async def lifespan(_app: FastAPI): """应用生命周期管理""" # 启动 print("🚀 Starting Alias API Server...") setup_logger() await initialize_database() await task_manager.start() await redis_client.ping() try: await FastAPILimiter.init(redis_client) except Exception as e: print(f"redis init error: {str(e)}") yield # 关闭 await task_manager.stop() await close_database() ``` ### 前端架构 (React + TypeScript) 前端基于 React 和 TypeScript 构建,使用 Ant Design 作为 UI 组件库: ```json { "name": "alias_frontend", "dependencies": { "react": "^18.2.0", "react-dom": "^18.2.0", "antd": "^5.24.8", "@agentscope-ai/chat": "1.1.17", "@agentscope-ai/design": "1.0.11", "axios": "^1.8.4", "socket.io-client": "^4.8.1", "react-router-dom": "^6.22.3", "react-markdown": "^10.1.0", "@codemirror/lang-python": "^6.1.7", "@xterm/xterm": "^5.5.0" } } ``` ### 通信机制 #### REST API ``` POST /api/v1/chat POST /api/v1/session/create POST /api/v1/session/{session_id}/message GET /api/v1/session/{session_id}/history POST /api/v1/agent/execute GET /api/v1/agent/status ``` #### WebSocket 用于实时通信和流式响应: ```typescript const socket = io('http://localhost:8000', { transports: ['websocket'], }); socket.on('connect', () => { console.log('Connected to server'); }); socket.on('message', (data) => { console.log('Received message:', data); }); socket.on('agent_response', (data) => { console.log('Agent response:', data); }); ``` ### 会话管理 SessionService 负责会话状态管理: ```python class SessionService: def __init__(self, session_id: str): self.session_id = session_id self.session_entity = SessionEntity( session_id=session_id, chat_mode="general", created_at=datetime.now(), ) self.message_history: list[Msg] = [] self.agent_state: dict[str, Any] = {} async def add_message(self, msg: Msg): """添加消息到历史""" self.message_history.append(msg) async def get_history(self) -> list[Msg]: """获取消息历史""" return self.message_history async def save_state(self, state: dict[str, Any]): """保存智能体状态""" self.agent_state = state ``` --- ## 沙箱执行环境 ### AliasSandbox: 沙箱核心 AliasSandbox 提供安全的代码执行和工具调用环境: ```python class AliasSandbox: def __init__(self, sandbox_id: str): self.sandbox_id = sandbox_id self.workspace = f"/workspace/{sandbox_id}" self._initialize_environment() def _initialize_environment(self): """初始化沙箱环境""" os.makedirs(self.workspace, exist_ok=True) def list_tools(self) -> dict[str, dict]: """列出可用工具""" return { "file_system": {...}, "code_execution": {...}, "browser": {...}, "data_science": {...}, } def call_tool(self, name: str, arguments: dict) -> ToolResponse: """调用工具""" try: tool_func = self._get_tool_function(name) result = tool_func(**arguments) return ToolResponse( metadata={"success": True, "tool_name": name}, content=[TextBlock(type="text", text=str(result))], ) except Exception as e: return ToolResponse( metadata={"success": False, "tool_name": name, "error": str(e)}, content=[TextBlock(type="text", text=f"Error: {str(e)}")], ) ``` ### 安全特性 1. **隔离执行**: 每个会话使用独立的沙箱环境 2. **资源限制**: 限制 CPU、内存和磁盘使用 3. **网络隔离**: 控制网络访问权限 4. **文件系统隔离**: 独立的工作目录 5. **超时控制**: 工具调用超时机制 ### 改进的文件操作 ```python class ImprovedFileOperations: """改进的文件操作工具""" def __init__(self, sandbox: AliasSandbox): self.sandbox = sandbox def read_file(self, path: str) -> ToolResponse: """读取文件""" try: full_path = os.path.join(self.sandbox.workspace, path) with open(full_path, "r", encoding="utf-8") as f: content = f.read() return ToolResponse( metadata={"success": True}, content=[TextBlock(type="text", text=content)], ) except Exception as e: return ToolResponse( metadata={"success": False, "error": str(e)}, content=[TextBlock(type="text", text=f"Error reading file: {str(e)}")], ) def write_file(self, path: str, content: str) -> ToolResponse: """写入文件""" try: full_path = os.path.join(self.sandbox.workspace, path) os.makedirs(os.path.dirname(full_path), exist_ok=True) with open(full_path, "w", encoding="utf-8") as f: f.write(content) return ToolResponse( metadata={"success": True}, content=[TextBlock(type="text", text=f"File written successfully")], ) except Exception as e: return ToolResponse( metadata={"success": False, "error": str(e)}, content=[TextBlock(type="text", text=f"Error writing file: {str(e)}")], ) ``` --- ## 状态管理与持久化 ### StateModule: 状态模块基类 StateModule 提供状态持久化的基础设施: ```python class StateModule: def __init__(self): self._registered_states: dict[str, StateConfig] = {} def register_state( self, name: str, custom_to_json: Optional[Callable] = None, custom_from_json: Optional[Callable] = None, ): """注册状态""" self._registered_states[name] = StateConfig( name=name, custom_to_json=custom_to_json, custom_from_json=custom_from_json, ) def state_dict(self) -> dict[str, Any]: """获取状态字典""" state = {} for name, config in self._registered_states.items(): value = getattr(self, name) if config.custom_to_json: state[name] = config.custom_to_json(value) else: state[name] = value return state def load_state_dict(self, state_dict: dict[str, Any]): """从状态字典加载""" for name, config in self._registered_states.items(): if name in state_dict: if config.custom_from_json: setattr(self, name, config.custom_from_json(state_dict[name])) else: setattr(self, name, state_dict[name]) ``` ### 状态保存与恢复 ```python async def save_agent_state(agent: AliasAgentBase, session_id: str): """保存智能体状态""" if agent.state_saving_dir: state_path = os.path.join(agent.state_saving_dir, f"{session_id}_state.json") with open(state_path, "w") as f: json.dump(agent.state_dict(), f) async def load_agent_state(agent: AliasAgentBase, session_id: str): """加载智能体状态""" if agent.state_saving_dir: state_path = os.path.join(agent.state_saving_dir, f"{session_id}_state.json") if os.path.exists(state_path): with open(state_path, "r") as f: state_dict = json.load(f) agent.load_state_dict(state_dict) ``` --- ## 扩展点与Hook机制 ### Hook 类型 AliasAgentBase 提供多个 Hook 扩展点: ```python # Hook 扩展点 - pre_reply: 回复前 - post_reasoning: 推理后 - pre_acting: 动作执行前 - post_acting: 动作执行后 ``` ### Hook 注册 ```python self.register_instance_hook( "pre_reply", "agent_load_states_pre_reply_hook", agent_load_states_pre_reply_hook, ) ``` ### Hook 实现 ```python async def agent_load_states_pre_reply_hook(agent: AliasAgentBase, msg: Msg) -> Msg: """加载智能体状态""" if agent.state_saving_dir and agent.session_service: await load_agent_state(agent, agent.session_service.session_id) return msg async def save_post_reasoning_state(agent: AliasAgentBase, reasoning_msg: Msg) -> Msg: """保存推理状态""" if agent.state_saving_dir: state_path = os.path.join(agent.state_saving_dir, "reasoning_state.json") with open(state_path, "w") as f: json.dump({ "timestamp": datetime.now().isoformat(), "reasoning": reasoning_msg.model_dump(), }, f) return reasoning_msg ``` --- ## 设计模式与最佳实践 ### 设计模式总结 1. **策略模式**: 不同智能体模式采用不同的执行策略 2. **观察者模式**: Hook 机制实现事件驱动的扩展点 3. **工厂模式**: WorkerManager 动态创建不同类型的 Worker 4. **模板方法模式**: AliasAgentBase 定义执行流程模板 5. **建造者模式**: 深度研究树节点的构建过程 6. **责任链模式**: 工具调用的后处理钩子链 7. **单例模式**: SessionService 和某些全局服务 8. **装饰器模式**: Hook 机制本质上是一种装饰器模式 ### 最佳实践 1. **模块化设计**: 每个模块职责单一,高内聚低耦合 2. **接口抽象**: 依赖抽象接口而非具体实现 3. **配置驱动**: 通过配置文件和环境变量控制行为 4. **错误处理**: 完善的异常处理和错误恢复机制 5. **日志记录**: 使用 loguru 进行结构化日志记录 6. **类型注解**: 使用 Python 类型注解提高代码可读性 7. **文档字符串**: 完善的文档字符串和注释 8. **测试覆盖**: 单元测试和集成测试 --- ## 技术栈与依赖分析 ### 后端技术栈 ```python # 核心框架 - agentscope: 智能体框架 - fastapi: Web 框架 - uvicorn: ASGI 服务器 - pydantic: 数据验证 # 数据库 - sqlalchemy: ORM - redis: 缓存和会话存储 # 工具和库 - loguru: 日志记录 - tenacity: 重试机制 - playwright: 浏览器自动化 - pandas: 数据处理 - numpy: 数值计算 - matplotlib: 数据可视化 ``` ### 前端技术栈 ```json { "核心框架": ["react", "react-dom", "typescript"], "UI组件库": ["antd", "@agentscope-ai/design"], "状态管理": ["@agentscope-ai/chat"], "路由": ["react-router-dom"], "HTTP客户端": ["axios"], "实时通信": ["socket.io-client"], "代码编辑器": ["@codemirror/lang-python", "@uiw/react-codemirror"], "终端": ["@xterm/xterm"], "Markdown渲染": ["react-markdown", "remark-gfm"], "样式": ["tailwindcss", "less", "sass"] } ``` ### 依赖关系图 ``` agentscope ├── ReActAgent │ └── AliasAgentBase │ ├── MetaPlanner │ ├── DeepResearchAgent │ ├── DataScienceAgent │ ├── BrowserUseAgent │ └── FinanceAgent ├── Toolkit │ └── AliasToolkit ├── MemoryBase │ ├── InMemoryMemory │ └── LongTermMemoryBase └── StateModule fastapi ├── API Router ├── Middleware └── Exception Handlers react ├── Components ├── Hooks └── Context ``` --- ## 总结 Alias 项目是一个设计精良、架构清晰的多模式智能体系统。通过模块化设计、清晰的分层架构和丰富的扩展点,系统具有高度的可扩展性和可维护性。 ### 核心优势 1. **模块化架构**: 清晰的模块划分,职责明确 2. **多模式支持**: 支持多种专业化智能体模式 3. **元规划能力**: 强大的任务分解和协调能力 4. **工具生态系统**: 丰富的工具和灵活的工具管理 5. **记忆系统**: 完善的短期和长期记忆机制 6. **状态持久化**: 支持智能体状态的保存和恢复 7. **Hook 机制**: 灵活的扩展点,支持自定义行为 8. **前后端分离**: 清晰的前后端架构,易于扩展 ### 设计亮点 1. **MetaPlanner 的任务分解和协调机制** 2. **DeepResearchAgent 的树状研究结构** 3. **DataScienceAgent 的场景感知系统** 4. **AliasToolkit 的工具管理和共享机制** 5. **Hook 机制的灵活扩展能力** 6. **StateModule 的状态持久化机制** ### 未来发展方向 1. **更多智能体模式**: 支持更多专业化场景 2. **更强大的工具生态**: 集成更多第三方工具 3. **更智能的记忆系统**: 基于向量数据库的语义记忆 4. **更高效的执行引擎**: 优化任务调度和资源管理 5. **更丰富的可视化**: 增强任务执行过程的可视化 6. **更完善的测试**: 提高测试覆盖率和质量 --- *报告生成时间: 2025-12-26* *项目版本: 基于 alias 目录最新代码*

讨论回复

0 条回复

还没有人回复,快来发表你的看法吧!