您正在查看静态缓存页面 · 查看完整动态版本 · 登录 参与讨论

Alias 项目架构与设计分析报告

✨步子哥 (steper) 2025年12月26日 09:54 0 次浏览

目录

  1. 项目概述
  2. 整体架构设计
  3. 智能体架构与设计模式
  4. 多模式运行系统
  5. 元规划器(Meta Planner)设计
  6. 深度研究(Deep Research)架构
  7. 数据科学智能体设计
  8. 工具系统架构
  9. 记忆系统设计
  10. 前后端架构与通信机制
  11. 沙箱执行环境
  12. 状态管理与持久化
  13. 扩展点与Hook机制
  14. 设计模式与最佳实践
  15. 技术栈与依赖分析

项目概述

项目定位

Alias 是一个基于 AgentScope 框架构建的多模式智能体系统,旨在提供高度可扩展、模块化的 AI 智能体解决方案。该系统通过多种专业化智能体模式,支持从通用对话到深度研究、数据分析、浏览器自动化等复杂任务。

核心特性

  1. 多模式智能体架构: 支持通用模式、浏览器使用、深度研究、金融分析、数据科学等多种专业化模式
  2. 元规划系统: 通过 MetaPlanner 实现复杂任务的自动分解和协调
  3. 工具生态系统: 基于 AliasToolkit 的模块化工具管理,支持动态工具注册和共享
  4. 长期记忆系统: 集成工具记忆和用户画像功能
  5. 前后端分离: 基于 FastAPI 的后端服务和 React 的前端界面
  6. 沙箱执行环境: 安全的代码执行和工具调用环境
  7. 状态持久化: 支持智能体执行状态的保存和恢复

项目结构

alias/
├── frontend/                    # 前端应用 (React + TypeScript)
│   ├── src/
│   └── package.json
├── src/alias/
│   ├── agent/                   # 智能体核心实现
│   │   ├── agents/             # 各类智能体实现
│   │   │   ├── _alias_agent_base.py
│   │   │   ├── _meta_planner.py
│   │   │   ├── _deep_research_agent_v2.py
│   │   │   ├── _data_science_agent.py
│   │   │   ├── _browser_use_agent.py
│   │   │   ├── _finance_agent.py
│   │   │   ├── _react_worker.py
│   │   │   ├── meta_planner_utils/
│   │   │   ├── dr_agent_utils/
│   │   │   ├── ds_agent_utils/
│   │   │   └── common_agent_utils/
│   │   ├── tools/              # 工具系统
│   │   │   ├── alias_toolkit.py
│   │   │   ├── improved_tools/
│   │   │   └── toolkit_hooks/
│   │   ├── memory/             # 记忆系统
│   │   │   ├── longterm_memory.py
│   │   │   └── longterm_memory_utils.py
│   │   └── run.py             # 智能体运行入口
│   ├── server/                 # 后端服务 (FastAPI)
│   │   ├── api/
│   │   ├── core/
│   │   ├── db/
│   │   ├── middleware/
│   │   └── main.py
│   ├── runtime/                # 运行时环境
│   │   └── alias_sandbox/
│   ├── memory_service/         # 记忆服务
│   └── utils/
└── pyproject.toml

整体架构设计

分层架构

Alias 项目采用清晰的分层架构设计,从下到上依次为:

┌─────────────────────────────────────────────────────────────┐
│                      前端展示层 (Frontend)                    │
│                   React + TypeScript + Ant Design             │
└─────────────────────────────────────────────────────────────┘
                              ↕ HTTP/WebSocket
┌─────────────────────────────────────────────────────────────┐
│                       API 网关层 (Server)                     │
│                      FastAPI + 中间件                          │
└─────────────────────────────────────────────────────────────┘
                              ↕
┌─────────────────────────────────────────────────────────────┐
│                      业务逻辑层 (Agent)                        │
│              MetaPlanner | DeepResearch | DataScience         │
└─────────────────────────────────────────────────────────────┘
                              ↕
┌─────────────────────────────────────────────────────────────┐
│                     核心抽象层 (Base)                         │
│                   AliasAgentBase | ReActAgent                 │
└─────────────────────────────────────────────────────────────┘
                              ↕
┌─────────────────────────────────────────────────────────────┐
│                    基础设施层 (Infrastructure)                │
│              Toolkit | Memory | Sandbox | StateModule         │
└─────────────────────────────────────────────────────────────┘

架构设计原则

  1. 单一职责原则: 每个模块专注于特定的功能领域
  2. 开闭原则: 通过继承和组合实现扩展,避免修改核心代码
  3. 依赖倒置原则: 依赖抽象接口而非具体实现
  4. 接口隔离原则: 细粒度的接口设计,避免不必要的依赖
  5. 组合优于继承: 通过工具组合实现功能复用

核心设计模式

  1. 策略模式: 不同智能体模式采用不同的执行策略
  2. 观察者模式: Hook 机制实现事件驱动的扩展点
  3. 工厂模式: WorkerManager 动态创建不同类型的 Worker
  4. 模板方法模式: AliasAgentBase 定义执行流程模板
  5. 建造者模式: 深度研究树节点的构建过程
  6. 责任链模式: 工具调用的后处理钩子链

智能体架构与设计模式

AliasAgentBase: 智能体基类

AliasAgentBase 是所有 Alias 智能体的核心基类,继承自 AgentScope 的 ReActAgent,提供了扩展的基础设施和自定义能力。

核心职责

  1. 生命周期管理: 管理智能体的初始化、执行和销毁
  2. 工具管理: 集成 AliasToolkit 进行工具注册和调用
  3. 记忆管理: 集成长期记忆系统
  4. 状态管理: 支持状态的保存和恢复
  5. Hook 机制: 提供扩展点用于自定义行为
  6. 会话管理: 与 SessionService 集成进行会话状态跟踪

关键初始化参数

class AliasAgentBase(ReActAgent):
    def __init__(
        self,
        name: str,                              # 智能体名称
        model: ChatModelBase,                    # 语言模型
        formatter: FormatterBase,                # 消息格式化器
        memory: MemoryBase,                      # 记忆系统
        toolkit: AliasToolkit,                   # 工具包
        session_service: Any,                    # 会话服务
        state_saving_dir: Optional[str] = None,  # 状态保存目录
        sys_prompt: Optional[str] = None,        # 系统提示词
        max_iters: int = 10,                     # 最大迭代次数
        tool_call_interrupt_return: bool = True,  # 工具调用中断返回
        long_term_memory: Optional[LongTermMemoryBase] = None,  # 长期记忆
        long_term_memory_mode: Literal["agent_control", "static_control", "both"] = "both",
    )

Hook 机制

Hook 机制是 AliasAgentBase 的核心扩展点,允许在智能体执行的关键阶段插入自定义逻辑:

# Hook 类型
- pre_reply: 回复前的处理
- post_reasoning: 推理后的处理
- post_acting: 动作执行后的处理
- pre_acting: 动作执行前的处理

Hook 注册示例:

self.register_instance_hook(
    "pre_reply",
    "agent_load_states_pre_reply_hook",
    agent_load_states_pre_reply_hook,
)

状态持久化

通过 register_state 方法注册需要持久化的状态:

self.register_state(
    "deep_research_tree",
    custom_to_json=lambda x: x.state_dict() if x else None,
    custom_from_json=lambda x: DeepResearchTreeNode.reconstruct_from_state_dict(x, x.get("task_type", "general")),
)

ReActWorker: 工作智能体

ReActWorker 是专门用于执行子任务的轻量级智能体,由 MetaPlanner 动态创建和管理。

设计特点

  1. 轻量级: 只包含执行子任务所需的最小功能集
  2. 工具隔离: 每个Worker拥有独立的工具集,避免工具冲突
  3. 状态独立: 拥有独立的记忆和状态空间
  4. 可重建: 支持从状态字典重建,便于持久化和恢复

Worker 重建机制

def rebuild_reactworker(
    worker_info: WorkerInfo,
    old_toolkit: AliasToolkit,
    new_toolkit: AliasToolkit,
    memory: Optional[MemoryBase] = None,
    model: Optional[ChatModelBase] = None,
    formatter: Optional[FormatterBase] = None,
    exclude_tools: Optional[list[str]] = None,
) -> ReActWorker:
    """重建 ReActAgent Worker"""
    # 工具共享
    tool_list = [
        tool_name
        for tool_name in worker_info.tool_lists
        if tool_name not in exclude_tools
    ]
    share_tools(old_toolkit, new_toolkit, tool_list)
    
    # 创建新的 Worker
    return ReActWorker(
        name=worker_info.worker_name,
        sys_prompt=worker_info.sys_prompt,
        model=model,
        formatter=formatter,
        toolkit=new_toolkit,
        memory=InMemoryMemory() if memory is None else memory,
        max_iters=WORKER_MAX_ITER,
    )

智能体类型体系

AliasAgentBase (抽象基类)
    ├── MetaPlanner (元规划器)
    ├── DeepResearchAgent (深度研究)
    ├── DataScienceAgent (数据科学)
    ├── BrowserUseAgent (浏览器使用)
    └── FinanceAgent (金融分析)

ReActWorker (工作智能体)
    └── 由 MetaPlanner 动态创建

多模式运行系统

模式分类

Alias 系统支持五种主要的智能体运行模式:

  1. general: 通用对话模式,使用 MetaPlanner 进行任务规划
  2. browser: 浏览器自动化模式,使用 BrowserUseAgent
  3. dr: 深度研究模式,使用 DeepResearchAgent
  4. ds: 数据科学模式,使用 DataScienceAgent
  5. finance: 金融分析模式,使用 FinanceAgent

模式路由机制

模式路由在 run.py 中实现,根据会话的 chat_mode 选择对应的智能体:

async def arun_agents(
    session_service: SessionService,
    sandbox: Sandbox = None,
):
    """智能体执行入口点"""
    chat_mode = session_service.session_entity.chat_mode
    
    if chat_mode == "dr":
        await arun_deepresearch_agent(session_service, sandbox)
    elif chat_mode == "browser":
        await arun_browseruse_agent(session_service, sandbox)
    elif chat_mode == "ds":
        await arun_datascience_agent(session_service, sandbox)
    elif chat_mode == "finance":
        await arun_finance_agent(session_service, sandbox)
    else:
        if chat_mode != "general":
            logger.warning(f"Unknown chat mode: {chat_mode}. Invoke general mode instead.")
        await arun_meta_planner(session_service, sandbox)

模式切换策略

  1. 显式切换: 用户在前端界面选择模式
  2. 自动识别: 系统根据任务特征自动推荐模式
  3. 动态调整: MetaPlanner 可以在执行过程中调用不同模式的 Worker

元规划器(Meta Planner)设计

核心概念

MetaPlanner 是 Alias 系统的核心协调组件,负责将复杂任务分解为可执行的子任务,并协调多个 Worker 完成整体任务。

架构组件

MetaPlanner 由三个核心组件构成:

MetaPlanner
    ├── PlannerNoteBook (规划笔记本)
    │   ├── Roadmap (路线图)
    │   ├── SubTaskStatus (子任务状态)
    │   └── WorkerInfo (Worker 信息)
    ├── RoadmapManager (路线图管理器)
    │   ├── 任务分解
    │   ├── 路线图创建
    │   └── 路线图修订
    └── WorkerManager (Worker 管理器)
        ├── Worker 创建
        ├── Worker 选择
        └── Worker 执行协调

PlannerNoteBook: 规划状态管理

PlannerNoteBook 是一个状态容器,用于存储整个规划过程的所有信息:

class PlannerNoteBook(StateModule):
    """规划笔记本,存储规划过程中的所有状态"""
    
    def __init__(self):
        self.roadmap: Roadmap = Roadmap()
        self.detail_analysis_for_plan: str = ""
        self.file_info: dict[str, FileInfo] = {}
        self.user_input: list[str] = []
        self.worker_pool: dict[str, tuple[WorkerInfo, ReActWorker]] = {}

Roadmap: 任务路线图

class Roadmap(BaseModel):
    """任务路线图"""
    original_task: str = ""                    # 原始任务
    decomposed_tasks: list[SubTaskStatus] = []  # 分解的子任务
    
    def next_unfinished_subtask(self) -> tuple[Optional[int], Optional[SubTaskStatus]]:
        """获取下一个未完成的子任务"""
        for idx, subtask in enumerate(self.decomposed_tasks):
            if subtask.status in ["todo", "in_progress"]:
                return idx, subtask
        return None, None

SubTaskSpecification: 子任务规范

class SubTaskSpecification(BaseModel):
    """子任务规范"""
    subtask_name: str                          # 子任务名称
    exact_input: str                          # 精确输入描述
    expected_output: str                      # 期望输出描述
    required_tools: list[str]                 # 需要的工具列表
    worker_type: Literal[                     # Worker 类型
        "general",
        "browser",
        "ds",
        "finance",
        "deep_research",
    ] = "general"
    worker_sys_prompt: str = ""               # Worker 系统提示词

RoadmapManager: 路线图管理

RoadmapManager 负责任务分解和路线图的生命周期管理:

核心方法

  1. decomposetaskandbuildroadmap: 任务分解和路线图创建
async def decompose_task_and_build_roadmap(
    self,
    user_latest_input: str,
    given_task_conclusion: str,
    detail_analysis_for_plan: str,
    decomposed_subtasks: list[SubTaskSpecification],
) -> ToolResponse:
    """
    1) 分析用户任务
    2) 推理完成整个任务所需的步骤
    3) 将这些步骤分组为几个可管理的子任务,满足:
       - 同一子任务的步骤使用相同的工具集
       - 同一子任务的步骤不依赖于后续子任务/步骤
       - 每个子任务的目标应该清晰且可验证
       - 同一目标的推理/分析和生成/动作应该在同一个子任务中
    """
    self.planner_notebook.detail_analysis_for_plan = detail_analysis_for_plan
    self.planner_notebook.roadmap.original_task = given_task_conclusion
    
    for subtask in decomposed_subtasks:
        subtask_status = SubTaskStatus(subtask_specification=subtask)
        self.planner_notebook.roadmap.decomposed_tasks.append(subtask_status)
    
    return ToolResponse(
        metadata={"success": True},
        content=[TextBlock(text="Successfully decomposed the task into subtasks.")],
    )
  1. reviseroadmap: 路线图修订
async def revise_roadmap(
    self,
    action: Literal["add_subtask", "revise_subtask", "remove_subtask"],
    subtask_idx: int,
    subtask_specification: Optional[SubTaskSpecification] = None,
    update_to_subtask: Optional[Update] = None,
    new_state: Literal["todo", "in_progress", "done", "abandoned"] = "in_progress",
) -> ToolResponse:
    """根据 Worker 执行报告修订路线图"""
    # 根据不同的 action 执行相应的修订操作

WorkerManager: Worker 管理与协调

WorkerManager 负责 Worker 的创建、选择和执行协调:

核心职责

  1. 动态 Worker 创建: 根据任务需求创建不同类型的 Worker
  2. Worker 选择: 为子任务选择最合适的 Worker
  3. 执行协调: 协调 Worker 的执行顺序和依赖关系
  4. 结果处理: 处理 Worker 的执行结果并更新路线图

Worker 创建流程

class WorkerManager(StateModule):
    def __init__(
        self,
        worker_model: ChatModelBase,
        worker_formatter: FormatterBase,
        planner_notebook: PlannerNoteBook,
        worker_full_toolkit: AliasToolkit,
        agent_working_dir: str,
        sandbox: AliasSandbox,
        worker_pool: Optional[dict[str, tuple[WorkerInfo, ReActWorker]]] = None,
        session_service: Any = None,
        long_term_memory: Optional[LongTermMemoryBase] = None,
    ):
        self.worker_model = worker_model
        self.worker_formatter = worker_formatter
        self.planner_notebook = planner_notebook
        self.worker_full_toolkit = worker_full_toolkit
        self.agent_working_dir = agent_working_dir
        self.sandbox = sandbox
        self.worker_pool = worker_pool or {}
        self.session_service = session_service
        self.long_term_memory = long_term_memory

Worker 执行流程

1. 获取下一个未完成的子任务
2. 根据子任务规范选择或创建 Worker
3. 配置 Worker 的工具集
4. 执行 Worker
5. 处理 Worker 响应
6. 更新路线图状态
7. 重复直到所有子任务完成

MetaPlanner 执行流程

用户输入
    ↓
[澄清阶段] (可选)
    ↓
[任务分解] → 创建路线图
    ↓
[用户确认]
    ↓
[子任务执行循环]
    ↓
获取下一个子任务 → 选择/创建 Worker → 执行 Worker → 处理结果 → 更新路线图
    ↓
[所有子任务完成]
    ↓
生成最终报告

深度研究(Deep Research)架构

核心概念

DeepResearchAgent 是专门用于深度研究和信息收集的智能体,采用树状结构组织研究任务,支持递归分解和假设驱动的研究方法。

架构设计

DeepResearchAgent
    ├── DeepResearchTreeNode (研究树节点)
    │   ├── DRTaskBase (研究任务基类)
    │   │   ├── BasicTask (基础任务)
    │   │   └── HypothesisDrivenTask (假设驱动任务)
    │   ├── Worker (执行节点任务的 Worker)
    │   └── ChildrenNodes (子节点)
    ├── DeepResearchTree (研究树)
    └── ReportGenerator (报告生成器)

DeepResearchTreeNode: 研究树节点

每个节点代表一个研究任务,可以递归分解为子任务:

class DeepResearchTreeNode(StateModule):
    def __init__(
        self,
        task_type: Literal["general", "finance"],
        current_executable: Optional[DRTaskBase] = None,
        level: int = 0,
        max_depth: int = 1,
        worker_builder_type: str = "default",
        parent_executable: Optional["DRTaskBase"] = None,
        report_dir: str = "/workspace",
        pre_execute_hook: Union[Callable, Coroutine, None] = None,
    ):
        self.task_type = task_type
        self.level = level
        self.worker_builder = get_deep_research_worker_builder(
            worker_builder_type if task_type == "general" else task_type,
        )
        self.worker = None
        self.current_executable: DRTaskBase = current_executable
        self.max_depth = max_depth
        self.parent_executable: DRTaskBase = parent_executable
        self.children_nodes: list[DeepResearchTreeNode] = []
        self.node_execution_result = {}
        self.report_dir = report_dir
        self.node_report_path = ""
        self.node_report = ""
        self.pre_execute_hook = pre_execute_hook

研究树遍历策略

def _get_next_executables(self) -> list[DeepResearchTreeNode]:
    """获取下一个可执行的节点"""
    if self.deep_research_tree is None:
        return []
    
    ready_nodes: list[DeepResearchTreeNode] = []
    stack: list[DeepResearchTreeNode] = [self.deep_research_tree]
    parent_ready_states: set[str] = {"done", "abandoned"}
    
    while stack:
        node = stack.pop()
        
        parent_is_ready = (
            node.parent_executable is None
            or node.parent_executable.state in parent_ready_states
        )
        
        if (
            node.current_executable.state in ["todo", "in_progress"]
            and parent_is_ready
            and node.level < self.max_depth
        ):
            ready_nodes.append(node)
        
        stack.extend(reversed(node.children_nodes))
    
    return ready_nodes

假设驱动研究

对于金融等需要假设验证的领域,DeepResearchAgent 支持假设驱动的研究方法:

async def _generate_hypothesis(self, node: DeepResearchTreeNode):
    """为 HypothesisDrivenTask 生成初始假设"""
    
    sys_prompt = PROMPT_INITIALIZE_HYPOTHESES.format(
        current_date=_get_timestamp(),
    )
    
    instruction_msg = Msg("system", content=[TextBlock(type="text", text=sys_prompt)], role="system")
    user_msg = Msg(
        "user",
        content=[
            TextBlock(
                type="text",
                text=f"Research Question: {node.current_executable.description}\n\nGenerate 2-4 key hypotheses.",
            ),
        ],
        role="user",
    )
    
    class HypothesesSchema(BaseModel):
        hypotheses: list[str] = Field(description="List of 2-4 testable hypotheses")
    
    try:
        prompt = await self.formatter.format([instruction_msg, user_msg])
        res = await self.model(prompt, structured_model=HypothesesSchema)
        
        hypotheses = None
        if self.model.stream:
            async for content_chunk in res:
                if content_chunk.metadata and "hypotheses" in content_chunk.metadata:
                    hypotheses = content_chunk.metadata["hypotheses"]
        else:
            if res.metadata and "hypotheses" in res.metadata:
                hypotheses = res.metadata["hypotheses"]
        
        if hypotheses:
            for hypothesis in hypotheses:
                hypothesis_task = HypothesisDrivenTask(
                    description=f"Investigate hypothesis:{hypothesis}",
                    evidences=[],
                    parent_executable=node,
                    max_depth=node.max_depth,
                    deep_research_worker_builder=node.worker_builder,
                    level=node.level + 1,
                )
                node.children_nodes.append(
                    DeepResearchTreeNode(
                        task_type="finance",
                        current_executable=hypothesis_task,
                        level=node.level + 1,
                        parent_executable=None,
                        max_depth=self.max_depth,
                        report_dir=self.agent_working_dir,
                        pre_execute_hook=None,
                    ),
                )
            node.current_executable.state = "done"
    except Exception as e:
        logger.warning(f"Failed to generate hypotheses: {e}")

深度研究执行流程

用户查询
    ↓
[初步信息收集] (可选)
    ↓
[澄清阶段] (可选)
    ↓
创建根节点
    ↓
[研究树遍历]
    ↓
获取可执行节点
    ↓
执行节点任务 (使用 Worker)
    ↓
生成节点报告
    ↓
[是否需要分解?]
    ├── 是 → 生成子节点 → 返回遍历
    └── 否 → 继续遍历
    ↓
[所有节点完成]
    ↓
生成最终报告

数据科学智能体设计

核心概念

DataScienceAgent 是专门用于数据分析、可视化和建模的智能体,集成了 Jupyter/IPython 环境和专业的数据科学工具链。

架构特点

  1. 场景感知: 根据任务类型自动选择合适的提示词和工具
  2. 任务管理: 内置 TodoList 管理数据分析任务
  3. 代码执行: 集成 IPython 环境进行代码执行
  4. 报告生成: 自动生成详细的数据分析报告

核心组件

class DataScienceAgent(AliasAgentBase):
    def __init__(
        self,
        name: str,
        model: ChatModelBase,
        formatter: FormatterBase,
        memory: MemoryBase,
        toolkit: AliasToolkit,
        sys_prompt: str = None,
        max_iters: int = 30,
        tmp_file_storage_dir: str = "/workspace",
        state_saving_dir: Optional[str] = None,
        session_service: Any = None,
    ) -> None:
        self.think_function_name = "think"
        self.uploaded_files: List[str] = []
        self.todo_list: List[Dict[str, Any]] = []
        self.infer_trajectories: List[List[Msg]] = []
        self.detailed_report_path = os.path.join(tmp_file_storage_dir, "detailed_report.html")
        self.tmp_file_storage_dir = tmp_file_storage_dir

场景感知系统

DataScienceAgent 使用 LLMPromptSelector 根据任务类型选择合适的提示词:

available_prompts = {
    "explorative_data_analysis": get_prompt_from_file("_scenario_explorative_data_analysis.md"),
    "data_modeling": get_prompt_from_file("_scenario_data_modeling_prompt.md"),
    "data_computation": get_prompt_from_file("_scenario_data_computation_prompt.md"),
}

self.prompt_selector = LLMPromptSelector(
    self.model,
    self.formatter,
    available_prompts,
)

TodoList 管理

内置 TodoList 用于跟踪数据分析任务:

def todo_write(
    agent: DataScienceAgent,
    action: Literal["add", "update", "delete", "mark_done"],
    task_name: str,
    task_description: str = "",
    task_status: Literal["todo", "in_progress", "done"] = "todo",
) -> ToolResponse:
    """管理数据分析任务的 TodoList"""
    if action == "add":
        agent.todo_list.append({
            "task_name": task_name,
            "task_description": task_description,
            "task_status": task_status,
        })
    elif action == "update":
        for task in agent.todo_list:
            if task["task_name"] == task_name:
                task["task_description"] = task_description
                task["task_status"] = task_status
    elif action == "delete":
        agent.todo_list = [t for t in agent.todo_list if t["task_name"] != task_name]
    elif action == "mark_done":
        for task in agent.todo_list:
            if task["task_name"] == task_name:
                task["task_status"] = "done"
    
    return ToolResponse(
        metadata={"success": True},
        content=[TextBlock(text=f"TodoList updated: {json.dumps(agent.todo_list, indent=2)}")],
    )

IPython 环境集成

def install_package(sandbox: AliasSandbox):
    """安装数据科学所需的 Python 包"""
    packages = [
        "pandas",
        "numpy",
        "matplotlib",
        "seaborn",
        "scikit-learn",
        "plotly",
    ]
    for package in packages:
        try:
            sandbox.call_tool("pip_install", {"package": package})
        except Exception as e:
            logger.warning(f"Failed to install {package}: {e}")

def set_run_ipython_cell(sandbox: AliasSandbox):
    """配置 IPython 单元格执行"""
    sandbox.call_tool("ipython_setup", {})

动态系统提示词

DataScienceAgent 的系统提示词是动态构建的,包含基础提示词、场景提示词和 TodoList 提示词:

@property
def sys_prompt(self) -> str:
    base_prompt = self._sys_prompt
    
    todo_prompt = self.todo_list_prompt.replace(
        "{todoList}",
        json.dumps(self.todo_list, indent=2, ensure_ascii=False),
    )
    
    return f"{base_prompt}{self._selected_scenario_prompts}\n\n{todo_prompt}"

数据科学执行流程

用户上传数据文件
    ↓
[场景识别] → 选择合适的提示词
    ↓
[探索性数据分析]
    ├── 数据加载
    ├── 数据清洗
    ├── 统计分析
    └── 可视化
    ↓
[数据建模] (可选)
    ├── 特征工程
    ├── 模型训练
    └── 模型评估
    ↓
[数据计算] (可选)
    ├── 数据转换
    ├── 聚合计算
    └── 高级分析
    ↓
生成详细报告

工具系统架构

AliasToolkit: 核心工具包

AliasToolkit 是 Alias 系统的工具管理核心,继承自 AgentScope 的 Toolkit,提供了增强的工具管理能力。

核心功能

  1. 工具注册: 支持函数和 JSON Schema 两种注册方式
  2. 工具分组: 通过 groupname 组织工具
  3. 工具共享: 在不同智能体间共享工具
  4. 工具黑名单: 过滤不需要的工具
  5. 后处理钩子: 对工具输出进行后处理
  6. MCP 集成: 支持 Model Context Protocol 客户端

初始化

class AliasToolkit(Toolkit):
    def __init__(
        self,
        sandbox: AliasSandbox = None,
        add_all: bool = False,
        is_browser_toolkit: bool = False,
        tool_blacklist: list = TOOL_BLACKLIST,
    ):
        super().__init__()
        self.sandbox = sandbox
        self.session_id = self.sandbox.sandbox_id if sandbox else None
        self.categorized_functions = {}
        self.tool_blacklist = tool_blacklist
        
        if add_all and sandbox:
            tools_schema = self.sandbox.list_tools()
            for category, function_dicts in tools_schema.items():
                if (is_browser_toolkit and category == "playwright") or (
                    not is_browser_toolkit and category != "playwright"
                ):
                    for _, function_json in function_dicts.items():
                        if function_json["name"] not in self.tool_blacklist:
                            self._add_io_function(function_json)
            
            # 添加改进的文件操作工具
            file_sys = ImprovedFileOperations(sandbox)
            self.register_tool_function(file_sys.read_file)
        
        self.additional_mcp_clients = []
        self.long_text_post_hook = LongTextPostHook(sandbox)
        self._add_tool_postprocessing_func()

工具注册

def _add_io_function(
    self,
    json_schema: dict,
    is_browser_tool: bool = False,
) -> None:
    """添加 IO 函数"""
    tool_name = json_schema["name"]
    
    def wrap_tool_func(name: str) -> Callable:
        def wrapper(**kwargs) -> ToolResponse:
            try:
                result = self.sandbox.call_tool(name=name, arguments=kwargs)
                
                if isinstance(result, dict) and "content" in result:
                    content = result["content"]
                    if isinstance(content, list):
                        for i, block in enumerate(content):
                            if isinstance(block, dict) and "annotations" in block:
                                block.pop("annotations")
                                content[i] = block
                else:
                    content = [TextBlock(type="text", text=str(result))]
                
                return ToolResponse(
                    metadata={"success": True, "tool_name": name},
                    content=content,
                )
            except Exception as e:
                logger.error(f"Error executing tool {name}: {str(e)}")
                return ToolResponse(
                    metadata={"success": False, "tool_name": name, "error": str(e)},
                    content=[TextBlock(type="text", text=f"Error executing tool {name}: {str(e)}")],
                )
        
        wrapper.__name__ = name
        return wrapper
    
    tool_func = wrap_tool_func(tool_name)
    self.register_tool_function(
        tool_func=tool_func,
        json_schema=json_schema.get("json_schema", {}),
    )

工具共享

def share_tools(
    source_toolkit: AliasToolkit,
    target_toolkit: AliasToolkit,
    tool_names: list[str],
) -> None:
    """在工具包之间共享工具"""
    for tool_name in tool_names:
        if tool_name in source_toolkit.tools:
            target_toolkit.tools[tool_name] = source_toolkit.tools[tool_name]

工具后处理钩子

def _add_tool_postprocessing_func(self) -> None:
    """添加工具后处理函数"""
    long_text_hook = LongTextPostHook(self.sandbox)
    
    for tool_func, _ in self.tools.items():
        if tool_func.startswith(("read_file", "read_multiple_files")):
            self.tools[tool_func].postprocess_func = read_file_post_hook
        if tool_func.startswith("tavily"):
            self.tools[tool_func].postprocess_func = long_text_hook.truncate_and_save_response

MCP 客户端集成

async def add_and_connect_mcp_client(
    self,
    mcp_client: MCPClientBase,
    group_name: str = "basic",
    enable_funcs: list[str] | None = None,
    disable_funcs: list[str] | None = None,
    preset_kwargs_mapping: dict[str, dict[str, Any]] | None = None,
    postprocess_func: Callable[[ToolUseBlock, ToolResponse], ToolResponse | None] | None = None,
):
    """添加并连接 MCP 客户端"""
    if isinstance(mcp_client, StatefulClientBase):
        await mcp_client.connect()
        self.additional_mcp_clients.append(mcp_client)
        await self.register_mcp_client(
            mcp_client,
            enable_funcs=enable_funcs,
            group_name=group_name,
            disable_funcs=disable_funcs,
            preset_kwargs_mapping=preset_kwargs_mapping,
            postprocess_func=postprocess_func,
        )
    elif isinstance(mcp_client, HttpStatelessClient):
        self.additional_mcp_clients.append(mcp_client)
        await self.register_mcp_client(
            mcp_client,
            enable_funcs=enable_funcs,
            group_name=group_name,
            disable_funcs=disable_funcs,
            preset_kwargs_mapping=preset_kwargs_mapping,
            postprocess_func=postprocess_func,
        )

工具分类

AliasToolkit 支持按类别组织工具:

工具分类
├── basic: 基础工具 (文件操作、系统命令等)
├── browser: 浏览器工具 (Playwright)
├── data_science: 数据科学工具 (pandas, numpy, matplotlib)
├── finance: 金融工具 (股票查询、财务分析)
├── deep_research: 深度研究工具 (搜索、爬虫)
└── custom: 自定义工具

记忆系统设计

记忆系统架构

Alias 的记忆系统采用分层设计,结合短期记忆和长期记忆:

记忆系统
├── 短期记忆 (InMemoryMemory)
│   ├── 会话上下文
│   ├── 工具调用历史
│   └── 推理轨迹
└── 长期记忆 (LongTermMemoryBase)
    ├── 工具记忆 (Tool Memory)
    └── 用户画像 (User Profiling)

长期记忆模式

AliasAgentBase 支持三种长期记忆模式:

long_term_memory_mode: Literal["agent_control", "static_control", "both"] = "both"
  1. agentcontrol: 智能体自主决定何时访问长期记忆
  2. staticcontrol: 在特定阶段静态访问长期记忆
  3. both: 结合两种模式

工具记忆

工具记忆记录工具的使用历史和效果,用于优化工具选择:

class ToolMemory:
    """工具记忆"""
    
    def __init__(self):
        self.tool_usage_history: dict[str, list[ToolUsageRecord]] = {}
        self.tool_success_rate: dict[str, float] = {}
    
    def record_tool_usage(
        self,
        tool_name: str,
        arguments: dict,
        result: ToolResponse,
        context: str,
    ):
        """记录工具使用"""
        if tool_name not in self.tool_usage_history:
            self.tool_usage_history[tool_name] = []
        
        record = ToolUsageRecord(
            tool_name=tool_name,
            arguments=arguments,
            success=result.metadata.get("success", False),
            timestamp=datetime.now(),
            context=context,
        )
        
        self.tool_usage_history[tool_name].append(record)
        self._update_success_rate(tool_name)
    
    def _update_success_rate(self, tool_name: str):
        """更新工具成功率"""
        records = self.tool_usage_history.get(tool_name, [])
        if records:
            success_count = sum(1 for r in records if r.success)
            self.tool_success_rate[tool_name] = success_count / len(records)

用户画像

用户画像记录用户的偏好、历史行为和特征:

class UserProfile:
    """用户画像"""
    
    def __init__(self):
        self.user_id: str = ""
        self.preferences: dict[str, Any] = {}
        self.task_history: list[TaskRecord] = []
        self.frequently_used_tools: list[str] = []
        self.communication_style: str = "formal"
    
    def update_preferences(self, new_preferences: dict[str, Any]):
        """更新用户偏好"""
        self.preferences.update(new_preferences)
    
    def record_task(self, task: TaskRecord):
        """记录任务"""
        self.task_history.append(task)
        self._update_frequently_used_tools(task.tools_used)
    
    def _update_frequently_used_tools(self, tools: list[str]):
        """更新常用工具"""
        from collections import Counter
        all_tools = [t for task in self.task_history for t in task.tools_used]
        counter = Counter(all_tools)
        self.frequently_used_tools = [tool for tool, _ in counter.most_common(10)]

记忆访问 Hook

通过 Hook 机制在智能体执行的不同阶段访问记忆:

def get_user_input_to_mem_pre_reply_hook(agent: AliasAgentBase, msg: Msg) -> Msg:
    """将用户输入添加到记忆"""
    if msg and msg.role == "user":
        asyncio.create_task(agent.memory.add(msg))
    return msg

def save_post_reasoning_state(agent: AliasAgentBase, reasoning_msg: Msg) -> Msg:
    """保存推理状态"""
    if agent.state_saving_dir:
        state_path = os.path.join(agent.state_saving_dir, "reasoning_state.json")
        with open(state_path, "w") as f:
            json.dump({
                "timestamp": datetime.now().isoformat(),
                "reasoning": reasoning_msg.model_dump(),
            }, f)
    return reasoning_msg

def save_post_action_state(agent: AliasAgentBase, action_msg: Msg) -> Msg:
    """保存动作状态"""
    if agent.state_saving_dir:
        state_path = os.path.join(agent.state_saving_dir, "action_state.json")
        with open(state_path, "w") as f:
            json.dump({
                "timestamp": datetime.now().isoformat(),
                "action": action_msg.model_dump(),
            }, f)
    return action_msg

前后端架构与通信机制

后端架构 (FastAPI)

后端基于 FastAPI 构建,提供 RESTful API 和 WebSocket 支持:

def create_app():
    application = FastAPI(
        generate_unique_id_function=custom_generate_unique_id,
        title=settings.PROJECT_NAME,
        openapi_url=f"{settings.API_V1_STR}/openapi.json",
        lifespan=lifespan,
    )
    
    # 中间件
    application.add_middleware(CORSMiddleware, allow_origins=["*"])
    application.add_middleware(RequestContextMiddleware)
    application.add_middleware(SessionMiddleware, secret_key=settings.SECRET_KEY)
    
    # 异常处理
    application.add_exception_handler(BaseError, base_exception_handler)
    
    # 路由
    application.include_router(api_router)
    
    return application

生命周期管理

@asynccontextmanager
async def lifespan(_app: FastAPI):
    """应用生命周期管理"""
    # 启动
    print("🚀 Starting Alias API Server...")
    setup_logger()
    await initialize_database()
    await task_manager.start()
    await redis_client.ping()
    
    try:
        await FastAPILimiter.init(redis_client)
    except Exception as e:
        print(f"redis init error: {str(e)}")
    
    yield
    
    # 关闭
    await task_manager.stop()
    await close_database()

前端架构 (React + TypeScript)

前端基于 React 和 TypeScript 构建,使用 Ant Design 作为 UI 组件库:

{
  "name": "alias_frontend",
  "dependencies": {
    "react": "^18.2.0",
    "react-dom": "^18.2.0",
    "antd": "^5.24.8",
    "@agentscope-ai/chat": "1.1.17",
    "@agentscope-ai/design": "1.0.11",
    "axios": "^1.8.4",
    "socket.io-client": "^4.8.1",
    "react-router-dom": "^6.22.3",
    "react-markdown": "^10.1.0",
    "@codemirror/lang-python": "^6.1.7",
    "@xterm/xterm": "^5.5.0"
  }
}

通信机制

REST API

POST /api/v1/chat
POST /api/v1/session/create
POST /api/v1/session/{session_id}/message
GET  /api/v1/session/{session_id}/history
POST /api/v1/agent/execute
GET  /api/v1/agent/status

WebSocket

用于实时通信和流式响应:

const socket = io('http://localhost:8000', {
  transports: ['websocket'],
});

socket.on('connect', () => {
  console.log('Connected to server');
});

socket.on('message', (data) => {
  console.log('Received message:', data);
});

socket.on('agent_response', (data) => {
  console.log('Agent response:', data);
});

会话管理

SessionService 负责会话状态管理:

class SessionService:
    def __init__(self, session_id: str):
        self.session_id = session_id
        self.session_entity = SessionEntity(
            session_id=session_id,
            chat_mode="general",
            created_at=datetime.now(),
        )
        self.message_history: list[Msg] = []
        self.agent_state: dict[str, Any] = {}
    
    async def add_message(self, msg: Msg):
        """添加消息到历史"""
        self.message_history.append(msg)
    
    async def get_history(self) -> list[Msg]:
        """获取消息历史"""
        return self.message_history
    
    async def save_state(self, state: dict[str, Any]):
        """保存智能体状态"""
        self.agent_state = state

沙箱执行环境

AliasSandbox: 沙箱核心

AliasSandbox 提供安全的代码执行和工具调用环境:

class AliasSandbox:
    def __init__(self, sandbox_id: str):
        self.sandbox_id = sandbox_id
        self.workspace = f"/workspace/{sandbox_id}"
        self._initialize_environment()
    
    def _initialize_environment(self):
        """初始化沙箱环境"""
        os.makedirs(self.workspace, exist_ok=True)
    
    def list_tools(self) -> dict[str, dict]:
        """列出可用工具"""
        return {
            "file_system": {...},
            "code_execution": {...},
            "browser": {...},
            "data_science": {...},
        }
    
    def call_tool(self, name: str, arguments: dict) -> ToolResponse:
        """调用工具"""
        try:
            tool_func = self._get_tool_function(name)
            result = tool_func(**arguments)
            return ToolResponse(
                metadata={"success": True, "tool_name": name},
                content=[TextBlock(type="text", text=str(result))],
            )
        except Exception as e:
            return ToolResponse(
                metadata={"success": False, "tool_name": name, "error": str(e)},
                content=[TextBlock(type="text", text=f"Error: {str(e)}")],
            )

安全特性

  1. 隔离执行: 每个会话使用独立的沙箱环境
  2. 资源限制: 限制 CPU、内存和磁盘使用
  3. 网络隔离: 控制网络访问权限
  4. 文件系统隔离: 独立的工作目录
  5. 超时控制: 工具调用超时机制

改进的文件操作

class ImprovedFileOperations:
    """改进的文件操作工具"""
    
    def __init__(self, sandbox: AliasSandbox):
        self.sandbox = sandbox
    
    def read_file(self, path: str) -> ToolResponse:
        """读取文件"""
        try:
            full_path = os.path.join(self.sandbox.workspace, path)
            with open(full_path, "r", encoding="utf-8") as f:
                content = f.read()
            return ToolResponse(
                metadata={"success": True},
                content=[TextBlock(type="text", text=content)],
            )
        except Exception as e:
            return ToolResponse(
                metadata={"success": False, "error": str(e)},
                content=[TextBlock(type="text", text=f"Error reading file: {str(e)}")],
            )
    
    def write_file(self, path: str, content: str) -> ToolResponse:
        """写入文件"""
        try:
            full_path = os.path.join(self.sandbox.workspace, path)
            os.makedirs(os.path.dirname(full_path), exist_ok=True)
            with open(full_path, "w", encoding="utf-8") as f:
                f.write(content)
            return ToolResponse(
                metadata={"success": True},
                content=[TextBlock(type="text", text=f"File written successfully")],
            )
        except Exception as e:
            return ToolResponse(
                metadata={"success": False, "error": str(e)},
                content=[TextBlock(type="text", text=f"Error writing file: {str(e)}")],
            )

状态管理与持久化

StateModule: 状态模块基类

StateModule 提供状态持久化的基础设施:

class StateModule:
    def __init__(self):
        self._registered_states: dict[str, StateConfig] = {}
    
    def register_state(
        self,
        name: str,
        custom_to_json: Optional[Callable] = None,
        custom_from_json: Optional[Callable] = None,
    ):
        """注册状态"""
        self._registered_states[name] = StateConfig(
            name=name,
            custom_to_json=custom_to_json,
            custom_from_json=custom_from_json,
        )
    
    def state_dict(self) -> dict[str, Any]:
        """获取状态字典"""
        state = {}
        for name, config in self._registered_states.items():
            value = getattr(self, name)
            if config.custom_to_json:
                state[name] = config.custom_to_json(value)
            else:
                state[name] = value
        return state
    
    def load_state_dict(self, state_dict: dict[str, Any]):
        """从状态字典加载"""
        for name, config in self._registered_states.items():
            if name in state_dict:
                if config.custom_from_json:
                    setattr(self, name, config.custom_from_json(state_dict[name]))
                else:
                    setattr(self, name, state_dict[name])

状态保存与恢复

async def save_agent_state(agent: AliasAgentBase, session_id: str):
    """保存智能体状态"""
    if agent.state_saving_dir:
        state_path = os.path.join(agent.state_saving_dir, f"{session_id}_state.json")
        with open(state_path, "w") as f:
            json.dump(agent.state_dict(), f)

async def load_agent_state(agent: AliasAgentBase, session_id: str):
    """加载智能体状态"""
    if agent.state_saving_dir:
        state_path = os.path.join(agent.state_saving_dir, f"{session_id}_state.json")
        if os.path.exists(state_path):
            with open(state_path, "r") as f:
                state_dict = json.load(f)
            agent.load_state_dict(state_dict)

扩展点与Hook机制

Hook 类型

AliasAgentBase 提供多个 Hook 扩展点:

# Hook 扩展点
- pre_reply: 回复前
- post_reasoning: 推理后
- pre_acting: 动作执行前
- post_acting: 动作执行后

Hook 注册

self.register_instance_hook(
    "pre_reply",
    "agent_load_states_pre_reply_hook",
    agent_load_states_pre_reply_hook,
)

Hook 实现

async def agent_load_states_pre_reply_hook(agent: AliasAgentBase, msg: Msg) -> Msg:
    """加载智能体状态"""
    if agent.state_saving_dir and agent.session_service:
        await load_agent_state(agent, agent.session_service.session_id)
    return msg

async def save_post_reasoning_state(agent: AliasAgentBase, reasoning_msg: Msg) -> Msg:
    """保存推理状态"""
    if agent.state_saving_dir:
        state_path = os.path.join(agent.state_saving_dir, "reasoning_state.json")
        with open(state_path, "w") as f:
            json.dump({
                "timestamp": datetime.now().isoformat(),
                "reasoning": reasoning_msg.model_dump(),
            }, f)
    return reasoning_msg

设计模式与最佳实践

设计模式总结

  1. 策略模式: 不同智能体模式采用不同的执行策略
  2. 观察者模式: Hook 机制实现事件驱动的扩展点
  3. 工厂模式: WorkerManager 动态创建不同类型的 Worker
  4. 模板方法模式: AliasAgentBase 定义执行流程模板
  5. 建造者模式: 深度研究树节点的构建过程
  6. 责任链模式: 工具调用的后处理钩子链
  7. 单例模式: SessionService 和某些全局服务
  8. 装饰器模式: Hook 机制本质上是一种装饰器模式

最佳实践

  1. 模块化设计: 每个模块职责单一,高内聚低耦合
  2. 接口抽象: 依赖抽象接口而非具体实现
  3. 配置驱动: 通过配置文件和环境变量控制行为
  4. 错误处理: 完善的异常处理和错误恢复机制
  5. 日志记录: 使用 loguru 进行结构化日志记录
  6. 类型注解: 使用 Python 类型注解提高代码可读性
  7. 文档字符串: 完善的文档字符串和注释
  8. 测试覆盖: 单元测试和集成测试

技术栈与依赖分析

后端技术栈

# 核心框架
- agentscope: 智能体框架
- fastapi: Web 框架
- uvicorn: ASGI 服务器
- pydantic: 数据验证

# 数据库
- sqlalchemy: ORM
- redis: 缓存和会话存储

# 工具和库
- loguru: 日志记录
- tenacity: 重试机制
- playwright: 浏览器自动化
- pandas: 数据处理
- numpy: 数值计算
- matplotlib: 数据可视化

前端技术栈

{
  "核心框架": ["react", "react-dom", "typescript"],
  "UI组件库": ["antd", "@agentscope-ai/design"],
  "状态管理": ["@agentscope-ai/chat"],
  "路由": ["react-router-dom"],
  "HTTP客户端": ["axios"],
  "实时通信": ["socket.io-client"],
  "代码编辑器": ["@codemirror/lang-python", "@uiw/react-codemirror"],
  "终端": ["@xterm/xterm"],
  "Markdown渲染": ["react-markdown", "remark-gfm"],
  "样式": ["tailwindcss", "less", "sass"]
}

依赖关系图

agentscope
    ├── ReActAgent
    │   └── AliasAgentBase
    │       ├── MetaPlanner
    │       ├── DeepResearchAgent
    │       ├── DataScienceAgent
    │       ├── BrowserUseAgent
    │       └── FinanceAgent
    ├── Toolkit
    │   └── AliasToolkit
    ├── MemoryBase
    │   ├── InMemoryMemory
    │   └── LongTermMemoryBase
    └── StateModule

fastapi
    ├── API Router
    ├── Middleware
    └── Exception Handlers

react
    ├── Components
    ├── Hooks
    └── Context

总结

Alias 项目是一个设计精良、架构清晰的多模式智能体系统。通过模块化设计、清晰的分层架构和丰富的扩展点,系统具有高度的可扩展性和可维护性。

核心优势

  1. 模块化架构: 清晰的模块划分,职责明确
  2. 多模式支持: 支持多种专业化智能体模式
  3. 元规划能力: 强大的任务分解和协调能力
  4. 工具生态系统: 丰富的工具和灵活的工具管理
  5. 记忆系统: 完善的短期和长期记忆机制
  6. 状态持久化: 支持智能体状态的保存和恢复
  7. Hook 机制: 灵活的扩展点,支持自定义行为
  8. 前后端分离: 清晰的前后端架构,易于扩展

设计亮点

  1. MetaPlanner 的任务分解和协调机制
  2. DeepResearchAgent 的树状研究结构
  3. DataScienceAgent 的场景感知系统
  4. AliasToolkit 的工具管理和共享机制
  5. Hook 机制的灵活扩展能力
  6. StateModule 的状态持久化机制

未来发展方向

  1. 更多智能体模式: 支持更多专业化场景
  2. 更强大的工具生态: 集成更多第三方工具
  3. 更智能的记忆系统: 基于向量数据库的语义记忆
  4. 更高效的执行引擎: 优化任务调度和资源管理
  5. 更丰富的可视化: 增强任务执行过程的可视化
  6. 更完善的测试: 提高测试覆盖率和质量

报告生成时间: 2025-12-26
项目版本: 基于 alias 目录最新代码

讨论回复

0 条回复

还没有人回复