## 目录
1. [项目概述](#项目概述)
2. [整体架构设计](#整体架构设计)
3. [智能体架构与设计模式](#智能体架构与设计模式)
4. [多模式运行系统](#多模式运行系统)
5. [元规划器(Meta Planner)设计](#元规划器meta-planner设计)
6. [深度研究(Deep Research)架构](#深度研究deep-research架构)
7. [数据科学智能体设计](#数据科学智能体设计)
8. [工具系统架构](#工具系统架构)
9. [记忆系统设计](#记忆系统设计)
10. [前后端架构与通信机制](#前后端架构与通信机制)
11. [沙箱执行环境](#沙箱执行环境)
12. [状态管理与持久化](#状态管理与持久化)
13. [扩展点与Hook机制](#扩展点与hook机制)
14. [设计模式与最佳实践](#设计模式与最佳实践)
15. [技术栈与依赖分析](#技术栈与依赖分析)
---
## 项目概述
### 项目定位
Alias 是一个基于 AgentScope 框架构建的多模式智能体系统,旨在提供高度可扩展、模块化的 AI 智能体解决方案。该系统通过多种专业化智能体模式,支持从通用对话到深度研究、数据分析、浏览器自动化等复杂任务。
### 核心特性
1. **多模式智能体架构**: 支持通用模式、浏览器使用、深度研究、金融分析、数据科学等多种专业化模式
2. **元规划系统**: 通过 MetaPlanner 实现复杂任务的自动分解和协调
3. **工具生态系统**: 基于 AliasToolkit 的模块化工具管理,支持动态工具注册和共享
4. **长期记忆系统**: 集成工具记忆和用户画像功能
5. **前后端分离**: 基于 FastAPI 的后端服务和 React 的前端界面
6. **沙箱执行环境**: 安全的代码执行和工具调用环境
7. **状态持久化**: 支持智能体执行状态的保存和恢复
### 项目结构
```
alias/
├── frontend/ # 前端应用 (React + TypeScript)
│ ├── src/
│ └── package.json
├── src/alias/
│ ├── agent/ # 智能体核心实现
│ │ ├── agents/ # 各类智能体实现
│ │ │ ├── _alias_agent_base.py
│ │ │ ├── _meta_planner.py
│ │ │ ├── _deep_research_agent_v2.py
│ │ │ ├── _data_science_agent.py
│ │ │ ├── _browser_use_agent.py
│ │ │ ├── _finance_agent.py
│ │ │ ├── _react_worker.py
│ │ │ ├── meta_planner_utils/
│ │ │ ├── dr_agent_utils/
│ │ │ ├── ds_agent_utils/
│ │ │ └── common_agent_utils/
│ │ ├── tools/ # 工具系统
│ │ │ ├── alias_toolkit.py
│ │ │ ├── improved_tools/
│ │ │ └── toolkit_hooks/
│ │ ├── memory/ # 记忆系统
│ │ │ ├── longterm_memory.py
│ │ │ └── longterm_memory_utils.py
│ │ └── run.py # 智能体运行入口
│ ├── server/ # 后端服务 (FastAPI)
│ │ ├── api/
│ │ ├── core/
│ │ ├── db/
│ │ ├── middleware/
│ │ └── main.py
│ ├── runtime/ # 运行时环境
│ │ └── alias_sandbox/
│ ├── memory_service/ # 记忆服务
│ └── utils/
└── pyproject.toml
```
---
## 整体架构设计
### 分层架构
Alias 项目采用清晰的分层架构设计,从下到上依次为:
```
┌─────────────────────────────────────────────────────────────┐
│ 前端展示层 (Frontend) │
│ React + TypeScript + Ant Design │
└─────────────────────────────────────────────────────────────┘
↕ HTTP/WebSocket
┌─────────────────────────────────────────────────────────────┐
│ API 网关层 (Server) │
│ FastAPI + 中间件 │
└─────────────────────────────────────────────────────────────┘
↕
┌─────────────────────────────────────────────────────────────┐
│ 业务逻辑层 (Agent) │
│ MetaPlanner | DeepResearch | DataScience │
└─────────────────────────────────────────────────────────────┘
↕
┌─────────────────────────────────────────────────────────────┐
│ 核心抽象层 (Base) │
│ AliasAgentBase | ReActAgent │
└─────────────────────────────────────────────────────────────┘
↕
┌─────────────────────────────────────────────────────────────┐
│ 基础设施层 (Infrastructure) │
│ Toolkit | Memory | Sandbox | StateModule │
└─────────────────────────────────────────────────────────────┘
```
### 架构设计原则
1. **单一职责原则**: 每个模块专注于特定的功能领域
2. **开闭原则**: 通过继承和组合实现扩展,避免修改核心代码
3. **依赖倒置原则**: 依赖抽象接口而非具体实现
4. **接口隔离原则**: 细粒度的接口设计,避免不必要的依赖
5. **组合优于继承**: 通过工具组合实现功能复用
### 核心设计模式
1. **策略模式**: 不同智能体模式采用不同的执行策略
2. **观察者模式**: Hook 机制实现事件驱动的扩展点
3. **工厂模式**: WorkerManager 动态创建不同类型的 Worker
4. **模板方法模式**: AliasAgentBase 定义执行流程模板
5. **建造者模式**: 深度研究树节点的构建过程
6. **责任链模式**: 工具调用的后处理钩子链
---
## 智能体架构与设计模式
### AliasAgentBase: 智能体基类
`AliasAgentBase` 是所有 Alias 智能体的核心基类,继承自 AgentScope 的 `ReActAgent`,提供了扩展的基础设施和自定义能力。
#### 核心职责
1. **生命周期管理**: 管理智能体的初始化、执行和销毁
2. **工具管理**: 集成 AliasToolkit 进行工具注册和调用
3. **记忆管理**: 集成长期记忆系统
4. **状态管理**: 支持状态的保存和恢复
5. **Hook 机制**: 提供扩展点用于自定义行为
6. **会话管理**: 与 SessionService 集成进行会话状态跟踪
#### 关键初始化参数
```python
class AliasAgentBase(ReActAgent):
def __init__(
self,
name: str, # 智能体名称
model: ChatModelBase, # 语言模型
formatter: FormatterBase, # 消息格式化器
memory: MemoryBase, # 记忆系统
toolkit: AliasToolkit, # 工具包
session_service: Any, # 会话服务
state_saving_dir: Optional[str] = None, # 状态保存目录
sys_prompt: Optional[str] = None, # 系统提示词
max_iters: int = 10, # 最大迭代次数
tool_call_interrupt_return: bool = True, # 工具调用中断返回
long_term_memory: Optional[LongTermMemoryBase] = None, # 长期记忆
long_term_memory_mode: Literal["agent_control", "static_control", "both"] = "both",
)
```
#### Hook 机制
Hook 机制是 AliasAgentBase 的核心扩展点,允许在智能体执行的关键阶段插入自定义逻辑:
```python
# Hook 类型
- pre_reply: 回复前的处理
- post_reasoning: 推理后的处理
- post_acting: 动作执行后的处理
- pre_acting: 动作执行前的处理
```
**Hook 注册示例**:
```python
self.register_instance_hook(
"pre_reply",
"agent_load_states_pre_reply_hook",
agent_load_states_pre_reply_hook,
)
```
#### 状态持久化
通过 `register_state` 方法注册需要持久化的状态:
```python
self.register_state(
"deep_research_tree",
custom_to_json=lambda x: x.state_dict() if x else None,
custom_from_json=lambda x: DeepResearchTreeNode.reconstruct_from_state_dict(x, x.get("task_type", "general")),
)
```
### ReActWorker: 工作智能体
`ReActWorker` 是专门用于执行子任务的轻量级智能体,由 MetaPlanner 动态创建和管理。
#### 设计特点
1. **轻量级**: 只包含执行子任务所需的最小功能集
2. **工具隔离**: 每个Worker拥有独立的工具集,避免工具冲突
3. **状态独立**: 拥有独立的记忆和状态空间
4. **可重建**: 支持从状态字典重建,便于持久化和恢复
#### Worker 重建机制
```python
def rebuild_reactworker(
worker_info: WorkerInfo,
old_toolkit: AliasToolkit,
new_toolkit: AliasToolkit,
memory: Optional[MemoryBase] = None,
model: Optional[ChatModelBase] = None,
formatter: Optional[FormatterBase] = None,
exclude_tools: Optional[list[str]] = None,
) -> ReActWorker:
"""重建 ReActAgent Worker"""
# 工具共享
tool_list = [
tool_name
for tool_name in worker_info.tool_lists
if tool_name not in exclude_tools
]
share_tools(old_toolkit, new_toolkit, tool_list)
# 创建新的 Worker
return ReActWorker(
name=worker_info.worker_name,
sys_prompt=worker_info.sys_prompt,
model=model,
formatter=formatter,
toolkit=new_toolkit,
memory=InMemoryMemory() if memory is None else memory,
max_iters=WORKER_MAX_ITER,
)
```
### 智能体类型体系
```
AliasAgentBase (抽象基类)
├── MetaPlanner (元规划器)
├── DeepResearchAgent (深度研究)
├── DataScienceAgent (数据科学)
├── BrowserUseAgent (浏览器使用)
└── FinanceAgent (金融分析)
ReActWorker (工作智能体)
└── 由 MetaPlanner 动态创建
```
---
## 多模式运行系统
### 模式分类
Alias 系统支持五种主要的智能体运行模式:
1. **general**: 通用对话模式,使用 MetaPlanner 进行任务规划
2. **browser**: 浏览器自动化模式,使用 BrowserUseAgent
3. **dr**: 深度研究模式,使用 DeepResearchAgent
4. **ds**: 数据科学模式,使用 DataScienceAgent
5. **finance**: 金融分析模式,使用 FinanceAgent
### 模式路由机制
模式路由在 `run.py` 中实现,根据会话的 `chat_mode` 选择对应的智能体:
```python
async def arun_agents(
session_service: SessionService,
sandbox: Sandbox = None,
):
"""智能体执行入口点"""
chat_mode = session_service.session_entity.chat_mode
if chat_mode == "dr":
await arun_deepresearch_agent(session_service, sandbox)
elif chat_mode == "browser":
await arun_browseruse_agent(session_service, sandbox)
elif chat_mode == "ds":
await arun_datascience_agent(session_service, sandbox)
elif chat_mode == "finance":
await arun_finance_agent(session_service, sandbox)
else:
if chat_mode != "general":
logger.warning(f"Unknown chat mode: {chat_mode}. Invoke general mode instead.")
await arun_meta_planner(session_service, sandbox)
```
### 模式切换策略
1. **显式切换**: 用户在前端界面选择模式
2. **自动识别**: 系统根据任务特征自动推荐模式
3. **动态调整**: MetaPlanner 可以在执行过程中调用不同模式的 Worker
---
## 元规划器(Meta Planner)设计
### 核心概念
MetaPlanner 是 Alias 系统的核心协调组件,负责将复杂任务分解为可执行的子任务,并协调多个 Worker 完成整体任务。
### 架构组件
MetaPlanner 由三个核心组件构成:
```
MetaPlanner
├── PlannerNoteBook (规划笔记本)
│ ├── Roadmap (路线图)
│ ├── SubTaskStatus (子任务状态)
│ └── WorkerInfo (Worker 信息)
├── RoadmapManager (路线图管理器)
│ ├── 任务分解
│ ├── 路线图创建
│ └── 路线图修订
└── WorkerManager (Worker 管理器)
├── Worker 创建
├── Worker 选择
└── Worker 执行协调
```
### PlannerNoteBook: 规划状态管理
PlannerNoteBook 是一个状态容器,用于存储整个规划过程的所有信息:
```python
class PlannerNoteBook(StateModule):
"""规划笔记本,存储规划过程中的所有状态"""
def __init__(self):
self.roadmap: Roadmap = Roadmap()
self.detail_analysis_for_plan: str = ""
self.file_info: dict[str, FileInfo] = {}
self.user_input: list[str] = []
self.worker_pool: dict[str, tuple[WorkerInfo, ReActWorker]] = {}
```
#### Roadmap: 任务路线图
```python
class Roadmap(BaseModel):
"""任务路线图"""
original_task: str = "" # 原始任务
decomposed_tasks: list[SubTaskStatus] = [] # 分解的子任务
def next_unfinished_subtask(self) -> tuple[Optional[int], Optional[SubTaskStatus]]:
"""获取下一个未完成的子任务"""
for idx, subtask in enumerate(self.decomposed_tasks):
if subtask.status in ["todo", "in_progress"]:
return idx, subtask
return None, None
```
#### SubTaskSpecification: 子任务规范
```python
class SubTaskSpecification(BaseModel):
"""子任务规范"""
subtask_name: str # 子任务名称
exact_input: str # 精确输入描述
expected_output: str # 期望输出描述
required_tools: list[str] # 需要的工具列表
worker_type: Literal[ # Worker 类型
"general",
"browser",
"ds",
"finance",
"deep_research",
] = "general"
worker_sys_prompt: str = "" # Worker 系统提示词
```
### RoadmapManager: 路线图管理
RoadmapManager 负责任务分解和路线图的生命周期管理:
#### 核心方法
1. **decompose_task_and_build_roadmap**: 任务分解和路线图创建
```python
async def decompose_task_and_build_roadmap(
self,
user_latest_input: str,
given_task_conclusion: str,
detail_analysis_for_plan: str,
decomposed_subtasks: list[SubTaskSpecification],
) -> ToolResponse:
"""
1) 分析用户任务
2) 推理完成整个任务所需的步骤
3) 将这些步骤分组为几个可管理的子任务,满足:
- 同一子任务的步骤使用相同的工具集
- 同一子任务的步骤不依赖于后续子任务/步骤
- 每个子任务的目标应该清晰且可验证
- 同一目标的推理/分析和生成/动作应该在同一个子任务中
"""
self.planner_notebook.detail_analysis_for_plan = detail_analysis_for_plan
self.planner_notebook.roadmap.original_task = given_task_conclusion
for subtask in decomposed_subtasks:
subtask_status = SubTaskStatus(subtask_specification=subtask)
self.planner_notebook.roadmap.decomposed_tasks.append(subtask_status)
return ToolResponse(
metadata={"success": True},
content=[TextBlock(text="Successfully decomposed the task into subtasks.")],
)
```
2. **revise_roadmap**: 路线图修订
```python
async def revise_roadmap(
self,
action: Literal["add_subtask", "revise_subtask", "remove_subtask"],
subtask_idx: int,
subtask_specification: Optional[SubTaskSpecification] = None,
update_to_subtask: Optional[Update] = None,
new_state: Literal["todo", "in_progress", "done", "abandoned"] = "in_progress",
) -> ToolResponse:
"""根据 Worker 执行报告修订路线图"""
# 根据不同的 action 执行相应的修订操作
```
### WorkerManager: Worker 管理与协调
WorkerManager 负责 Worker 的创建、选择和执行协调:
#### 核心职责
1. **动态 Worker 创建**: 根据任务需求创建不同类型的 Worker
2. **Worker 选择**: 为子任务选择最合适的 Worker
3. **执行协调**: 协调 Worker 的执行顺序和依赖关系
4. **结果处理**: 处理 Worker 的执行结果并更新路线图
#### Worker 创建流程
```python
class WorkerManager(StateModule):
def __init__(
self,
worker_model: ChatModelBase,
worker_formatter: FormatterBase,
planner_notebook: PlannerNoteBook,
worker_full_toolkit: AliasToolkit,
agent_working_dir: str,
sandbox: AliasSandbox,
worker_pool: Optional[dict[str, tuple[WorkerInfo, ReActWorker]]] = None,
session_service: Any = None,
long_term_memory: Optional[LongTermMemoryBase] = None,
):
self.worker_model = worker_model
self.worker_formatter = worker_formatter
self.planner_notebook = planner_notebook
self.worker_full_toolkit = worker_full_toolkit
self.agent_working_dir = agent_working_dir
self.sandbox = sandbox
self.worker_pool = worker_pool or {}
self.session_service = session_service
self.long_term_memory = long_term_memory
```
#### Worker 执行流程
```
1. 获取下一个未完成的子任务
2. 根据子任务规范选择或创建 Worker
3. 配置 Worker 的工具集
4. 执行 Worker
5. 处理 Worker 响应
6. 更新路线图状态
7. 重复直到所有子任务完成
```
### MetaPlanner 执行流程
```
用户输入
↓
[澄清阶段] (可选)
↓
[任务分解] → 创建路线图
↓
[用户确认]
↓
[子任务执行循环]
↓
获取下一个子任务 → 选择/创建 Worker → 执行 Worker → 处理结果 → 更新路线图
↓
[所有子任务完成]
↓
生成最终报告
```
---
## 深度研究(Deep Research)架构
### 核心概念
DeepResearchAgent 是专门用于深度研究和信息收集的智能体,采用树状结构组织研究任务,支持递归分解和假设驱动的研究方法。
### 架构设计
```
DeepResearchAgent
├── DeepResearchTreeNode (研究树节点)
│ ├── DRTaskBase (研究任务基类)
│ │ ├── BasicTask (基础任务)
│ │ └── HypothesisDrivenTask (假设驱动任务)
│ ├── Worker (执行节点任务的 Worker)
│ └── ChildrenNodes (子节点)
├── DeepResearchTree (研究树)
└── ReportGenerator (报告生成器)
```
### DeepResearchTreeNode: 研究树节点
每个节点代表一个研究任务,可以递归分解为子任务:
```python
class DeepResearchTreeNode(StateModule):
def __init__(
self,
task_type: Literal["general", "finance"],
current_executable: Optional[DRTaskBase] = None,
level: int = 0,
max_depth: int = 1,
worker_builder_type: str = "default",
parent_executable: Optional["DRTaskBase"] = None,
report_dir: str = "/workspace",
pre_execute_hook: Union[Callable, Coroutine, None] = None,
):
self.task_type = task_type
self.level = level
self.worker_builder = get_deep_research_worker_builder(
worker_builder_type if task_type == "general" else task_type,
)
self.worker = None
self.current_executable: DRTaskBase = current_executable
self.max_depth = max_depth
self.parent_executable: DRTaskBase = parent_executable
self.children_nodes: list[DeepResearchTreeNode] = []
self.node_execution_result = {}
self.report_dir = report_dir
self.node_report_path = ""
self.node_report = ""
self.pre_execute_hook = pre_execute_hook
```
### 研究树遍历策略
```python
def _get_next_executables(self) -> list[DeepResearchTreeNode]:
"""获取下一个可执行的节点"""
if self.deep_research_tree is None:
return []
ready_nodes: list[DeepResearchTreeNode] = []
stack: list[DeepResearchTreeNode] = [self.deep_research_tree]
parent_ready_states: set[str] = {"done", "abandoned"}
while stack:
node = stack.pop()
parent_is_ready = (
node.parent_executable is None
or node.parent_executable.state in parent_ready_states
)
if (
node.current_executable.state in ["todo", "in_progress"]
and parent_is_ready
and node.level < self.max_depth
):
ready_nodes.append(node)
stack.extend(reversed(node.children_nodes))
return ready_nodes
```
### 假设驱动研究
对于金融等需要假设验证的领域,DeepResearchAgent 支持假设驱动的研究方法:
```python
async def _generate_hypothesis(self, node: DeepResearchTreeNode):
"""为 HypothesisDrivenTask 生成初始假设"""
sys_prompt = PROMPT_INITIALIZE_HYPOTHESES.format(
current_date=_get_timestamp(),
)
instruction_msg = Msg("system", content=[TextBlock(type="text", text=sys_prompt)], role="system")
user_msg = Msg(
"user",
content=[
TextBlock(
type="text",
text=f"Research Question: {node.current_executable.description}\n\nGenerate 2-4 key hypotheses.",
),
],
role="user",
)
class HypothesesSchema(BaseModel):
hypotheses: list[str] = Field(description="List of 2-4 testable hypotheses")
try:
prompt = await self.formatter.format([instruction_msg, user_msg])
res = await self.model(prompt, structured_model=HypothesesSchema)
hypotheses = None
if self.model.stream:
async for content_chunk in res:
if content_chunk.metadata and "hypotheses" in content_chunk.metadata:
hypotheses = content_chunk.metadata["hypotheses"]
else:
if res.metadata and "hypotheses" in res.metadata:
hypotheses = res.metadata["hypotheses"]
if hypotheses:
for hypothesis in hypotheses:
hypothesis_task = HypothesisDrivenTask(
description=f"Investigate hypothesis:{hypothesis}",
evidences=[],
parent_executable=node,
max_depth=node.max_depth,
deep_research_worker_builder=node.worker_builder,
level=node.level + 1,
)
node.children_nodes.append(
DeepResearchTreeNode(
task_type="finance",
current_executable=hypothesis_task,
level=node.level + 1,
parent_executable=None,
max_depth=self.max_depth,
report_dir=self.agent_working_dir,
pre_execute_hook=None,
),
)
node.current_executable.state = "done"
except Exception as e:
logger.warning(f"Failed to generate hypotheses: {e}")
```
### 深度研究执行流程
```
用户查询
↓
[初步信息收集] (可选)
↓
[澄清阶段] (可选)
↓
创建根节点
↓
[研究树遍历]
↓
获取可执行节点
↓
执行节点任务 (使用 Worker)
↓
生成节点报告
↓
[是否需要分解?]
├── 是 → 生成子节点 → 返回遍历
└── 否 → 继续遍历
↓
[所有节点完成]
↓
生成最终报告
```
---
## 数据科学智能体设计
### 核心概念
DataScienceAgent 是专门用于数据分析、可视化和建模的智能体,集成了 Jupyter/IPython 环境和专业的数据科学工具链。
### 架构特点
1. **场景感知**: 根据任务类型自动选择合适的提示词和工具
2. **任务管理**: 内置 TodoList 管理数据分析任务
3. **代码执行**: 集成 IPython 环境进行代码执行
4. **报告生成**: 自动生成详细的数据分析报告
### 核心组件
```python
class DataScienceAgent(AliasAgentBase):
def __init__(
self,
name: str,
model: ChatModelBase,
formatter: FormatterBase,
memory: MemoryBase,
toolkit: AliasToolkit,
sys_prompt: str = None,
max_iters: int = 30,
tmp_file_storage_dir: str = "/workspace",
state_saving_dir: Optional[str] = None,
session_service: Any = None,
) -> None:
self.think_function_name = "think"
self.uploaded_files: List[str] = []
self.todo_list: List[Dict[str, Any]] = []
self.infer_trajectories: List[List[Msg]] = []
self.detailed_report_path = os.path.join(tmp_file_storage_dir, "detailed_report.html")
self.tmp_file_storage_dir = tmp_file_storage_dir
```
### 场景感知系统
DataScienceAgent 使用 LLMPromptSelector 根据任务类型选择合适的提示词:
```python
available_prompts = {
"explorative_data_analysis": get_prompt_from_file("_scenario_explorative_data_analysis.md"),
"data_modeling": get_prompt_from_file("_scenario_data_modeling_prompt.md"),
"data_computation": get_prompt_from_file("_scenario_data_computation_prompt.md"),
}
self.prompt_selector = LLMPromptSelector(
self.model,
self.formatter,
available_prompts,
)
```
### TodoList 管理
内置 TodoList 用于跟踪数据分析任务:
```python
def todo_write(
agent: DataScienceAgent,
action: Literal["add", "update", "delete", "mark_done"],
task_name: str,
task_description: str = "",
task_status: Literal["todo", "in_progress", "done"] = "todo",
) -> ToolResponse:
"""管理数据分析任务的 TodoList"""
if action == "add":
agent.todo_list.append({
"task_name": task_name,
"task_description": task_description,
"task_status": task_status,
})
elif action == "update":
for task in agent.todo_list:
if task["task_name"] == task_name:
task["task_description"] = task_description
task["task_status"] = task_status
elif action == "delete":
agent.todo_list = [t for t in agent.todo_list if t["task_name"] != task_name]
elif action == "mark_done":
for task in agent.todo_list:
if task["task_name"] == task_name:
task["task_status"] = "done"
return ToolResponse(
metadata={"success": True},
content=[TextBlock(text=f"TodoList updated: {json.dumps(agent.todo_list, indent=2)}")],
)
```
### IPython 环境集成
```python
def install_package(sandbox: AliasSandbox):
"""安装数据科学所需的 Python 包"""
packages = [
"pandas",
"numpy",
"matplotlib",
"seaborn",
"scikit-learn",
"plotly",
]
for package in packages:
try:
sandbox.call_tool("pip_install", {"package": package})
except Exception as e:
logger.warning(f"Failed to install {package}: {e}")
def set_run_ipython_cell(sandbox: AliasSandbox):
"""配置 IPython 单元格执行"""
sandbox.call_tool("ipython_setup", {})
```
### 动态系统提示词
DataScienceAgent 的系统提示词是动态构建的,包含基础提示词、场景提示词和 TodoList 提示词:
```python
@property
def sys_prompt(self) -> str:
base_prompt = self._sys_prompt
todo_prompt = self.todo_list_prompt.replace(
"{todoList}",
json.dumps(self.todo_list, indent=2, ensure_ascii=False),
)
return f"{base_prompt}{self._selected_scenario_prompts}\n\n{todo_prompt}"
```
### 数据科学执行流程
```
用户上传数据文件
↓
[场景识别] → 选择合适的提示词
↓
[探索性数据分析]
├── 数据加载
├── 数据清洗
├── 统计分析
└── 可视化
↓
[数据建模] (可选)
├── 特征工程
├── 模型训练
└── 模型评估
↓
[数据计算] (可选)
├── 数据转换
├── 聚合计算
└── 高级分析
↓
生成详细报告
```
---
## 工具系统架构
### AliasToolkit: 核心工具包
AliasToolkit 是 Alias 系统的工具管理核心,继承自 AgentScope 的 Toolkit,提供了增强的工具管理能力。
### 核心功能
1. **工具注册**: 支持函数和 JSON Schema 两种注册方式
2. **工具分组**: 通过 group_name 组织工具
3. **工具共享**: 在不同智能体间共享工具
4. **工具黑名单**: 过滤不需要的工具
5. **后处理钩子**: 对工具输出进行后处理
6. **MCP 集成**: 支持 Model Context Protocol 客户端
### 初始化
```python
class AliasToolkit(Toolkit):
def __init__(
self,
sandbox: AliasSandbox = None,
add_all: bool = False,
is_browser_toolkit: bool = False,
tool_blacklist: list = TOOL_BLACKLIST,
):
super().__init__()
self.sandbox = sandbox
self.session_id = self.sandbox.sandbox_id if sandbox else None
self.categorized_functions = {}
self.tool_blacklist = tool_blacklist
if add_all and sandbox:
tools_schema = self.sandbox.list_tools()
for category, function_dicts in tools_schema.items():
if (is_browser_toolkit and category == "playwright") or (
not is_browser_toolkit and category != "playwright"
):
for _, function_json in function_dicts.items():
if function_json["name"] not in self.tool_blacklist:
self._add_io_function(function_json)
# 添加改进的文件操作工具
file_sys = ImprovedFileOperations(sandbox)
self.register_tool_function(file_sys.read_file)
self.additional_mcp_clients = []
self.long_text_post_hook = LongTextPostHook(sandbox)
self._add_tool_postprocessing_func()
```
### 工具注册
```python
def _add_io_function(
self,
json_schema: dict,
is_browser_tool: bool = False,
) -> None:
"""添加 IO 函数"""
tool_name = json_schema["name"]
def wrap_tool_func(name: str) -> Callable:
def wrapper(**kwargs) -> ToolResponse:
try:
result = self.sandbox.call_tool(name=name, arguments=kwargs)
if isinstance(result, dict) and "content" in result:
content = result["content"]
if isinstance(content, list):
for i, block in enumerate(content):
if isinstance(block, dict) and "annotations" in block:
block.pop("annotations")
content[i] = block
else:
content = [TextBlock(type="text", text=str(result))]
return ToolResponse(
metadata={"success": True, "tool_name": name},
content=content,
)
except Exception as e:
logger.error(f"Error executing tool {name}: {str(e)}")
return ToolResponse(
metadata={"success": False, "tool_name": name, "error": str(e)},
content=[TextBlock(type="text", text=f"Error executing tool {name}: {str(e)}")],
)
wrapper.__name__ = name
return wrapper
tool_func = wrap_tool_func(tool_name)
self.register_tool_function(
tool_func=tool_func,
json_schema=json_schema.get("json_schema", {}),
)
```
### 工具共享
```python
def share_tools(
source_toolkit: AliasToolkit,
target_toolkit: AliasToolkit,
tool_names: list[str],
) -> None:
"""在工具包之间共享工具"""
for tool_name in tool_names:
if tool_name in source_toolkit.tools:
target_toolkit.tools[tool_name] = source_toolkit.tools[tool_name]
```
### 工具后处理钩子
```python
def _add_tool_postprocessing_func(self) -> None:
"""添加工具后处理函数"""
long_text_hook = LongTextPostHook(self.sandbox)
for tool_func, _ in self.tools.items():
if tool_func.startswith(("read_file", "read_multiple_files")):
self.tools[tool_func].postprocess_func = read_file_post_hook
if tool_func.startswith("tavily"):
self.tools[tool_func].postprocess_func = long_text_hook.truncate_and_save_response
```
### MCP 客户端集成
```python
async def add_and_connect_mcp_client(
self,
mcp_client: MCPClientBase,
group_name: str = "basic",
enable_funcs: list[str] | None = None,
disable_funcs: list[str] | None = None,
preset_kwargs_mapping: dict[str, dict[str, Any]] | None = None,
postprocess_func: Callable[[ToolUseBlock, ToolResponse], ToolResponse | None] | None = None,
):
"""添加并连接 MCP 客户端"""
if isinstance(mcp_client, StatefulClientBase):
await mcp_client.connect()
self.additional_mcp_clients.append(mcp_client)
await self.register_mcp_client(
mcp_client,
enable_funcs=enable_funcs,
group_name=group_name,
disable_funcs=disable_funcs,
preset_kwargs_mapping=preset_kwargs_mapping,
postprocess_func=postprocess_func,
)
elif isinstance(mcp_client, HttpStatelessClient):
self.additional_mcp_clients.append(mcp_client)
await self.register_mcp_client(
mcp_client,
enable_funcs=enable_funcs,
group_name=group_name,
disable_funcs=disable_funcs,
preset_kwargs_mapping=preset_kwargs_mapping,
postprocess_func=postprocess_func,
)
```
### 工具分类
AliasToolkit 支持按类别组织工具:
```
工具分类
├── basic: 基础工具 (文件操作、系统命令等)
├── browser: 浏览器工具 (Playwright)
├── data_science: 数据科学工具 (pandas, numpy, matplotlib)
├── finance: 金融工具 (股票查询、财务分析)
├── deep_research: 深度研究工具 (搜索、爬虫)
└── custom: 自定义工具
```
---
## 记忆系统设计
### 记忆系统架构
Alias 的记忆系统采用分层设计,结合短期记忆和长期记忆:
```
记忆系统
├── 短期记忆 (InMemoryMemory)
│ ├── 会话上下文
│ ├── 工具调用历史
│ └── 推理轨迹
└── 长期记忆 (LongTermMemoryBase)
├── 工具记忆 (Tool Memory)
└── 用户画像 (User Profiling)
```
### 长期记忆模式
AliasAgentBase 支持三种长期记忆模式:
```python
long_term_memory_mode: Literal["agent_control", "static_control", "both"] = "both"
```
1. **agent_control**: 智能体自主决定何时访问长期记忆
2. **static_control**: 在特定阶段静态访问长期记忆
3. **both**: 结合两种模式
### 工具记忆
工具记忆记录工具的使用历史和效果,用于优化工具选择:
```python
class ToolMemory:
"""工具记忆"""
def __init__(self):
self.tool_usage_history: dict[str, list[ToolUsageRecord]] = {}
self.tool_success_rate: dict[str, float] = {}
def record_tool_usage(
self,
tool_name: str,
arguments: dict,
result: ToolResponse,
context: str,
):
"""记录工具使用"""
if tool_name not in self.tool_usage_history:
self.tool_usage_history[tool_name] = []
record = ToolUsageRecord(
tool_name=tool_name,
arguments=arguments,
success=result.metadata.get("success", False),
timestamp=datetime.now(),
context=context,
)
self.tool_usage_history[tool_name].append(record)
self._update_success_rate(tool_name)
def _update_success_rate(self, tool_name: str):
"""更新工具成功率"""
records = self.tool_usage_history.get(tool_name, [])
if records:
success_count = sum(1 for r in records if r.success)
self.tool_success_rate[tool_name] = success_count / len(records)
```
### 用户画像
用户画像记录用户的偏好、历史行为和特征:
```python
class UserProfile:
"""用户画像"""
def __init__(self):
self.user_id: str = ""
self.preferences: dict[str, Any] = {}
self.task_history: list[TaskRecord] = []
self.frequently_used_tools: list[str] = []
self.communication_style: str = "formal"
def update_preferences(self, new_preferences: dict[str, Any]):
"""更新用户偏好"""
self.preferences.update(new_preferences)
def record_task(self, task: TaskRecord):
"""记录任务"""
self.task_history.append(task)
self._update_frequently_used_tools(task.tools_used)
def _update_frequently_used_tools(self, tools: list[str]):
"""更新常用工具"""
from collections import Counter
all_tools = [t for task in self.task_history for t in task.tools_used]
counter = Counter(all_tools)
self.frequently_used_tools = [tool for tool, _ in counter.most_common(10)]
```
### 记忆访问 Hook
通过 Hook 机制在智能体执行的不同阶段访问记忆:
```python
def get_user_input_to_mem_pre_reply_hook(agent: AliasAgentBase, msg: Msg) -> Msg:
"""将用户输入添加到记忆"""
if msg and msg.role == "user":
asyncio.create_task(agent.memory.add(msg))
return msg
def save_post_reasoning_state(agent: AliasAgentBase, reasoning_msg: Msg) -> Msg:
"""保存推理状态"""
if agent.state_saving_dir:
state_path = os.path.join(agent.state_saving_dir, "reasoning_state.json")
with open(state_path, "w") as f:
json.dump({
"timestamp": datetime.now().isoformat(),
"reasoning": reasoning_msg.model_dump(),
}, f)
return reasoning_msg
def save_post_action_state(agent: AliasAgentBase, action_msg: Msg) -> Msg:
"""保存动作状态"""
if agent.state_saving_dir:
state_path = os.path.join(agent.state_saving_dir, "action_state.json")
with open(state_path, "w") as f:
json.dump({
"timestamp": datetime.now().isoformat(),
"action": action_msg.model_dump(),
}, f)
return action_msg
```
---
## 前后端架构与通信机制
### 后端架构 (FastAPI)
后端基于 FastAPI 构建,提供 RESTful API 和 WebSocket 支持:
```python
def create_app():
application = FastAPI(
generate_unique_id_function=custom_generate_unique_id,
title=settings.PROJECT_NAME,
openapi_url=f"{settings.API_V1_STR}/openapi.json",
lifespan=lifespan,
)
# 中间件
application.add_middleware(CORSMiddleware, allow_origins=["*"])
application.add_middleware(RequestContextMiddleware)
application.add_middleware(SessionMiddleware, secret_key=settings.SECRET_KEY)
# 异常处理
application.add_exception_handler(BaseError, base_exception_handler)
# 路由
application.include_router(api_router)
return application
```
### 生命周期管理
```python
@asynccontextmanager
async def lifespan(_app: FastAPI):
"""应用生命周期管理"""
# 启动
print("🚀 Starting Alias API Server...")
setup_logger()
await initialize_database()
await task_manager.start()
await redis_client.ping()
try:
await FastAPILimiter.init(redis_client)
except Exception as e:
print(f"redis init error: {str(e)}")
yield
# 关闭
await task_manager.stop()
await close_database()
```
### 前端架构 (React + TypeScript)
前端基于 React 和 TypeScript 构建,使用 Ant Design 作为 UI 组件库:
```json
{
"name": "alias_frontend",
"dependencies": {
"react": "^18.2.0",
"react-dom": "^18.2.0",
"antd": "^5.24.8",
"@agentscope-ai/chat": "1.1.17",
"@agentscope-ai/design": "1.0.11",
"axios": "^1.8.4",
"socket.io-client": "^4.8.1",
"react-router-dom": "^6.22.3",
"react-markdown": "^10.1.0",
"@codemirror/lang-python": "^6.1.7",
"@xterm/xterm": "^5.5.0"
}
}
```
### 通信机制
#### REST API
```
POST /api/v1/chat
POST /api/v1/session/create
POST /api/v1/session/{session_id}/message
GET /api/v1/session/{session_id}/history
POST /api/v1/agent/execute
GET /api/v1/agent/status
```
#### WebSocket
用于实时通信和流式响应:
```typescript
const socket = io('http://localhost:8000', {
transports: ['websocket'],
});
socket.on('connect', () => {
console.log('Connected to server');
});
socket.on('message', (data) => {
console.log('Received message:', data);
});
socket.on('agent_response', (data) => {
console.log('Agent response:', data);
});
```
### 会话管理
SessionService 负责会话状态管理:
```python
class SessionService:
def __init__(self, session_id: str):
self.session_id = session_id
self.session_entity = SessionEntity(
session_id=session_id,
chat_mode="general",
created_at=datetime.now(),
)
self.message_history: list[Msg] = []
self.agent_state: dict[str, Any] = {}
async def add_message(self, msg: Msg):
"""添加消息到历史"""
self.message_history.append(msg)
async def get_history(self) -> list[Msg]:
"""获取消息历史"""
return self.message_history
async def save_state(self, state: dict[str, Any]):
"""保存智能体状态"""
self.agent_state = state
```
---
## 沙箱执行环境
### AliasSandbox: 沙箱核心
AliasSandbox 提供安全的代码执行和工具调用环境:
```python
class AliasSandbox:
def __init__(self, sandbox_id: str):
self.sandbox_id = sandbox_id
self.workspace = f"/workspace/{sandbox_id}"
self._initialize_environment()
def _initialize_environment(self):
"""初始化沙箱环境"""
os.makedirs(self.workspace, exist_ok=True)
def list_tools(self) -> dict[str, dict]:
"""列出可用工具"""
return {
"file_system": {...},
"code_execution": {...},
"browser": {...},
"data_science": {...},
}
def call_tool(self, name: str, arguments: dict) -> ToolResponse:
"""调用工具"""
try:
tool_func = self._get_tool_function(name)
result = tool_func(**arguments)
return ToolResponse(
metadata={"success": True, "tool_name": name},
content=[TextBlock(type="text", text=str(result))],
)
except Exception as e:
return ToolResponse(
metadata={"success": False, "tool_name": name, "error": str(e)},
content=[TextBlock(type="text", text=f"Error: {str(e)}")],
)
```
### 安全特性
1. **隔离执行**: 每个会话使用独立的沙箱环境
2. **资源限制**: 限制 CPU、内存和磁盘使用
3. **网络隔离**: 控制网络访问权限
4. **文件系统隔离**: 独立的工作目录
5. **超时控制**: 工具调用超时机制
### 改进的文件操作
```python
class ImprovedFileOperations:
"""改进的文件操作工具"""
def __init__(self, sandbox: AliasSandbox):
self.sandbox = sandbox
def read_file(self, path: str) -> ToolResponse:
"""读取文件"""
try:
full_path = os.path.join(self.sandbox.workspace, path)
with open(full_path, "r", encoding="utf-8") as f:
content = f.read()
return ToolResponse(
metadata={"success": True},
content=[TextBlock(type="text", text=content)],
)
except Exception as e:
return ToolResponse(
metadata={"success": False, "error": str(e)},
content=[TextBlock(type="text", text=f"Error reading file: {str(e)}")],
)
def write_file(self, path: str, content: str) -> ToolResponse:
"""写入文件"""
try:
full_path = os.path.join(self.sandbox.workspace, path)
os.makedirs(os.path.dirname(full_path), exist_ok=True)
with open(full_path, "w", encoding="utf-8") as f:
f.write(content)
return ToolResponse(
metadata={"success": True},
content=[TextBlock(type="text", text=f"File written successfully")],
)
except Exception as e:
return ToolResponse(
metadata={"success": False, "error": str(e)},
content=[TextBlock(type="text", text=f"Error writing file: {str(e)}")],
)
```
---
## 状态管理与持久化
### StateModule: 状态模块基类
StateModule 提供状态持久化的基础设施:
```python
class StateModule:
def __init__(self):
self._registered_states: dict[str, StateConfig] = {}
def register_state(
self,
name: str,
custom_to_json: Optional[Callable] = None,
custom_from_json: Optional[Callable] = None,
):
"""注册状态"""
self._registered_states[name] = StateConfig(
name=name,
custom_to_json=custom_to_json,
custom_from_json=custom_from_json,
)
def state_dict(self) -> dict[str, Any]:
"""获取状态字典"""
state = {}
for name, config in self._registered_states.items():
value = getattr(self, name)
if config.custom_to_json:
state[name] = config.custom_to_json(value)
else:
state[name] = value
return state
def load_state_dict(self, state_dict: dict[str, Any]):
"""从状态字典加载"""
for name, config in self._registered_states.items():
if name in state_dict:
if config.custom_from_json:
setattr(self, name, config.custom_from_json(state_dict[name]))
else:
setattr(self, name, state_dict[name])
```
### 状态保存与恢复
```python
async def save_agent_state(agent: AliasAgentBase, session_id: str):
"""保存智能体状态"""
if agent.state_saving_dir:
state_path = os.path.join(agent.state_saving_dir, f"{session_id}_state.json")
with open(state_path, "w") as f:
json.dump(agent.state_dict(), f)
async def load_agent_state(agent: AliasAgentBase, session_id: str):
"""加载智能体状态"""
if agent.state_saving_dir:
state_path = os.path.join(agent.state_saving_dir, f"{session_id}_state.json")
if os.path.exists(state_path):
with open(state_path, "r") as f:
state_dict = json.load(f)
agent.load_state_dict(state_dict)
```
---
## 扩展点与Hook机制
### Hook 类型
AliasAgentBase 提供多个 Hook 扩展点:
```python
# Hook 扩展点
- pre_reply: 回复前
- post_reasoning: 推理后
- pre_acting: 动作执行前
- post_acting: 动作执行后
```
### Hook 注册
```python
self.register_instance_hook(
"pre_reply",
"agent_load_states_pre_reply_hook",
agent_load_states_pre_reply_hook,
)
```
### Hook 实现
```python
async def agent_load_states_pre_reply_hook(agent: AliasAgentBase, msg: Msg) -> Msg:
"""加载智能体状态"""
if agent.state_saving_dir and agent.session_service:
await load_agent_state(agent, agent.session_service.session_id)
return msg
async def save_post_reasoning_state(agent: AliasAgentBase, reasoning_msg: Msg) -> Msg:
"""保存推理状态"""
if agent.state_saving_dir:
state_path = os.path.join(agent.state_saving_dir, "reasoning_state.json")
with open(state_path, "w") as f:
json.dump({
"timestamp": datetime.now().isoformat(),
"reasoning": reasoning_msg.model_dump(),
}, f)
return reasoning_msg
```
---
## 设计模式与最佳实践
### 设计模式总结
1. **策略模式**: 不同智能体模式采用不同的执行策略
2. **观察者模式**: Hook 机制实现事件驱动的扩展点
3. **工厂模式**: WorkerManager 动态创建不同类型的 Worker
4. **模板方法模式**: AliasAgentBase 定义执行流程模板
5. **建造者模式**: 深度研究树节点的构建过程
6. **责任链模式**: 工具调用的后处理钩子链
7. **单例模式**: SessionService 和某些全局服务
8. **装饰器模式**: Hook 机制本质上是一种装饰器模式
### 最佳实践
1. **模块化设计**: 每个模块职责单一,高内聚低耦合
2. **接口抽象**: 依赖抽象接口而非具体实现
3. **配置驱动**: 通过配置文件和环境变量控制行为
4. **错误处理**: 完善的异常处理和错误恢复机制
5. **日志记录**: 使用 loguru 进行结构化日志记录
6. **类型注解**: 使用 Python 类型注解提高代码可读性
7. **文档字符串**: 完善的文档字符串和注释
8. **测试覆盖**: 单元测试和集成测试
---
## 技术栈与依赖分析
### 后端技术栈
```python
# 核心框架
- agentscope: 智能体框架
- fastapi: Web 框架
- uvicorn: ASGI 服务器
- pydantic: 数据验证
# 数据库
- sqlalchemy: ORM
- redis: 缓存和会话存储
# 工具和库
- loguru: 日志记录
- tenacity: 重试机制
- playwright: 浏览器自动化
- pandas: 数据处理
- numpy: 数值计算
- matplotlib: 数据可视化
```
### 前端技术栈
```json
{
"核心框架": ["react", "react-dom", "typescript"],
"UI组件库": ["antd", "@agentscope-ai/design"],
"状态管理": ["@agentscope-ai/chat"],
"路由": ["react-router-dom"],
"HTTP客户端": ["axios"],
"实时通信": ["socket.io-client"],
"代码编辑器": ["@codemirror/lang-python", "@uiw/react-codemirror"],
"终端": ["@xterm/xterm"],
"Markdown渲染": ["react-markdown", "remark-gfm"],
"样式": ["tailwindcss", "less", "sass"]
}
```
### 依赖关系图
```
agentscope
├── ReActAgent
│ └── AliasAgentBase
│ ├── MetaPlanner
│ ├── DeepResearchAgent
│ ├── DataScienceAgent
│ ├── BrowserUseAgent
│ └── FinanceAgent
├── Toolkit
│ └── AliasToolkit
├── MemoryBase
│ ├── InMemoryMemory
│ └── LongTermMemoryBase
└── StateModule
fastapi
├── API Router
├── Middleware
└── Exception Handlers
react
├── Components
├── Hooks
└── Context
```
---
## 总结
Alias 项目是一个设计精良、架构清晰的多模式智能体系统。通过模块化设计、清晰的分层架构和丰富的扩展点,系统具有高度的可扩展性和可维护性。
### 核心优势
1. **模块化架构**: 清晰的模块划分,职责明确
2. **多模式支持**: 支持多种专业化智能体模式
3. **元规划能力**: 强大的任务分解和协调能力
4. **工具生态系统**: 丰富的工具和灵活的工具管理
5. **记忆系统**: 完善的短期和长期记忆机制
6. **状态持久化**: 支持智能体状态的保存和恢复
7. **Hook 机制**: 灵活的扩展点,支持自定义行为
8. **前后端分离**: 清晰的前后端架构,易于扩展
### 设计亮点
1. **MetaPlanner 的任务分解和协调机制**
2. **DeepResearchAgent 的树状研究结构**
3. **DataScienceAgent 的场景感知系统**
4. **AliasToolkit 的工具管理和共享机制**
5. **Hook 机制的灵活扩展能力**
6. **StateModule 的状态持久化机制**
### 未来发展方向
1. **更多智能体模式**: 支持更多专业化场景
2. **更强大的工具生态**: 集成更多第三方工具
3. **更智能的记忆系统**: 基于向量数据库的语义记忆
4. **更高效的执行引擎**: 优化任务调度和资源管理
5. **更丰富的可视化**: 增强任务执行过程的可视化
6. **更完善的测试**: 提高测试覆盖率和质量
---
*报告生成时间: 2025-12-26*
*项目版本: 基于 alias 目录最新代码*
登录后可参与表态
讨论回复
0 条回复还没有人回复,快来发表你的看法吧!