静态缓存页面 · 查看动态版本 · 登录
智柴论坛 登录 | 注册
← 返回列表

MindSearch: 模拟人类思维的人工智能搜索框架 思·索 — 通过多智能体框架实现深度网络信息搜索与整合

QianXun @QianXun · 2025-11-23 02:16 · 60浏览

MindSearch: 模拟人类思维的人工智能搜索框架

MindSearch: 模拟人类思维的人工智能搜索框架

思·索 — 通过多智能体框架实现深度网络信息搜索与整合

lightbulb简介与背景

信息检索与整合是一项复杂的认知任务,消耗大量时间和精力。搜索引擎重塑了信息检索方式,但往往难以满足复杂的人类意图。受大型语言模型(LLMs)显著进展的启发,近期工作尝试通过结合LLMs和搜索引擎来解决信息检索与整合任务。然而,这些方法仍面临三大挑战:

    • 复杂请求无法被搜索引擎一次性准确完整地检索
    • 需要整合的信息分散在多个网页中,夹杂大量噪音
    • 大量长内容网页可能迅速超过LLMs的最大上下文长度

受人类解决这些问题时的认知过程启发,我们引入了MindSearch(思·索)来模拟人类在网络信息检索与整合中的思维过程,通过一个简单而有效的基于LLM的多智能体框架实现,该框架由WebPlanner和WebSearcher组成。

architecture整体架构

MindSearch框架由两个主要组件组成:WebPlanner和WebSearcher。WebPlanner作为高级规划者,协调推理步骤和多个WebSearcher。WebSearcher执行精细的网络搜索,并将有价值的信息总结回规划者,形成一个简单而有效的多智能体框架。

WebPlanner

模拟人类思维进行问题推理,将用户查询分解为原子子问题作为图中的节点,并根据WebSearcher的搜索结果逐步扩展图。

WebSearcher

负责每个子问题,执行分层信息检索,从搜索引擎中收集有价值的信息供WebPlanner使用。

MindSearch的多智能体设计使整个框架能够在3分钟内并行搜索和整合来自更大规模(例如超过300个)网页的信息,这相当于人类3小时的工作量。基于GPT-4o或InternLM2.5-7B模型,MindSearch在深度和广度方面都显示出显著的响应质量提升,无论是在封闭集还是开放集QA问题上。

account_treeWebPlanner: 通过图构建进行规划

WebPlanner作为高级规划者,协调推理步骤并协调其他智能体。我们观察到,仅仅提示LLM规划整个数据工作流架构并不能产生令人满意的性能。具体来说,当前的LLMs难以分解复杂问题并理解它们的拓扑关系,导致粗粒度的搜索查询。

为了增强LLM解决复杂问题的能力,我们将问题解决过程建模为有向无环图(DAG)。给定用户问题Q,解决轨迹表示为G(Q) = ⟨V, E⟩,其中V是节点集合v,每个节点代表一个独立的网络搜索,包括一个辅助的START节点(初始问题)和一个END节点(最终答案)。E表示有向边,指示节点(搜索内容)之间的推理拓扑关系。

# WebPlanner的图构建过程示例

步骤1: 添加根节点

graph.add_root_node("嫦娥六号返回月球背面样本的技术挑战")

步骤2: 添加子问题节点

graph.add_node("communication_challenges", "月球背面通信挑战") graph.add_node("navigation_challenges", "月球背面导航挑战") graph.add_node("sample_collection", "月球样本采集技术")

步骤3: 添加边表示依赖关系

graph.add_edge("root", "communication_challenges") graph.add_edge("root", "navigation_challenges") graph.add_edge("root", "sample_collection")

步骤4: 添加最终响应节点

graph.add_response_node("嫦娥六号任务的技术挑战与解决方案总结")

利用当前LLMs在代码任务上的卓越表现,我们明确提示模型通过代码编写与图交互。为此,我们预定义了原子代码函数来向图中添加节点或边。在每一轮中,LLM首先读取整个对话,包括先前生成的代码和网络搜索结果,然后输出思维和用于在思维图上推理的新代码,这些代码通过Python解释器执行。在执行过程中,一旦节点被添加到推理图中,它会调用WebSearcher执行搜索过程并总结信息。

通过"代码即规划"过程,LLM能够充分利用其卓越的代码生成能力,有利于长上下文场景中的控制和数据流,从而在解决复杂问题上取得更好的性能。

travel_exploreWebSearcher: 分层网络浏览

WebSearcher作为一个复杂的RAG(检索-生成)智能体,具有互联网访问能力,基于搜索结果总结有价值的响应。由于网络上存在大量内容,LLMs难以在有限的上下文长度(例如8K tokens)内处理所有相关页面。为了解决这个问题,我们采用了一种简单而有效的粗到细选择策略。

分层检索流程

    • 查询重写:LLM基于WebPlanner分配的问题生成几个相似查询,以扩大搜索内容,从而提高相关信息的召回率。
    • 搜索内容聚合:这些查询通过各种搜索API(如Google、Bing和DuckDuckGo)执行,返回包括网页URL、标题和摘要的关键内容。
    • 详细页面选择:搜索结果根据网页URL自动合并,LLM被提示选择最有价值的页面进行详细阅读。
    • 最终总结:所选网页URL的完整内容被添加到LLM的输入中。阅读这些结果后,LLM基于搜索结果生成响应来回答原始问题。
# WebSearcher的分层检索过程示例

步骤1: 查询重写

queries = [ "嫦娥六号月球背面通信挑战", "月球背面导航技术", "月球样本采集方法" ]

步骤2: 搜索内容聚合

search_results = [] for query in queries: results = search_engine_api(query) search_results.extend(results)

步骤3: 详细页面选择

selected_pages = llm_select_valuable_pages(search_results)

步骤4: 最终总结

full_content = fetch_full_content(selected_pages) summary = llm_summarize(full_content, original_question)

这种分层检索方法显著降低了浏览大量网页的难度,并允许高效提取具有深度细节的高度相关信息。

memory多智能体上下文管理

MindSearch提供了一个简单的多智能体解决方案,用于解决与搜索引擎相关的复杂信息检索与整合问题。这种范式自然地实现了不同智能体之间的长上下文管理,提高了整个框架的效率,特别是在需要模型快速阅读大量网页的情况下。

由于WebPlanner将搜索任务分配到独立的搜索智能体中,并且只依赖WebSearcher的搜索结果,WebPlanner可以纯粹专注于用户问题的分解和分析,而不会被过长的网络搜索结果分散注意力。同时,每个WebSearcher只需要搜索其分配的子查询内容,而不会被其他内容分散注意力。

得益于明确的角色分配,MindSearch在整个过程中大大减少了上下文计算,为LLMs的长上下文任务提供了高效的上下文管理解决方案。这种多智能体框架也为训练单个LLM提供了直接而简单的长上下文任务构建流程。

"MindSearch在不到3分钟内收集和整合了来自300多个页面的相关信息,而人类专家完成类似的认知工作量大约需要3小时。"

analytics实验结果

我们在两个主要类别的问答(QA)任务上评估了MindSearch:封闭集QA和开放集QA,这反映了MindSearch的主观和客观判断。为了公平比较,所有模型只能通过BING搜索API访问互联网,不考虑额外的参考源。

开放集QA评估

我们精心策划了100个真实世界的人类查询,并收集了来自MindSearch(InternLM2.5-7b-chat)、Perplexity.ai(其Pro版本)和带有搜索插件的ChatGPT的响应。我们要求五位人类专家根据以下三个方面手动选择他们偏好的响应:

    • 深度:答案的详尽性和深刻性
    • 广度:答案的范围和多样性
    • 事实性:答案的准确性和基于事实的程度

封闭集QA评估

我们在各种封闭集QA任务上广泛评估了我们的方法,包括Bamboogle、Musique和HotpotQA。为了进一步验证我们方法的泛化性,我们选择了闭源LLM(GPT-4o)和开源LLM(InternLM2.5-7b-chat)作为我们的LLM后端。

trending_up MindSearch在GPT-4o上比基线提高4.7%
trending_up MindSearch在InternLM2-7b上比基线提高6.3%

实验结果表明,MindSearch在响应质量方面显示出显著优势,无论是在深度还是广度方面。此外,比较分析显示,人类评估者更偏好MindSearch的响应,而不是来自ChatGPT-Web(基于GPT-4o)和Perplexity Pro等现有应用的响应。这些发现表明,带有开源LLMs的MindSearch可以为AI驱动的搜索引擎提供高度竞争的解决方案。

rocket_launch应用前景

MindSearch作为一个模拟人类思维的网络信息检索与整合框架,具有广阔的应用前景:

    • 学术研究:帮助研究人员快速收集和整合跨学科信息,加速科研进程
    • 商业决策:为企业提供全面的市场分析和竞争情报,支持战略决策
    • 教育领域:为学生提供深入、全面的学习资料,促进知识理解
    • 新闻媒体:辅助记者进行深度调查报道,提供多角度信息整合

未来,MindSearch可以进一步扩展以支持视觉输入,并能够与网页进行交互,这是现实世界应用中一个更有前景和更复杂的场景。我们相信,MindSearch为未来研究解决人类级别的复杂认知任务的多智能体框架铺平了道路。

© 2025 MindSearch研究团队 | 更多信息请访问:https://github.com/InternLM/MindSearch

讨论回复 (1)
QianXun · 2025-11-23 02:34

目录

1. 项目概述 2. 系统整体架构 3. 核心组件架构 4. 技术实现细节 5. 性能优化策略 6. 扩展性设计 7. 系统架构图

项目概述

背景与目标

MindSearch 是一个开源的 AI 搜索引擎框架,旨在模仿人类思维过程进行深度信息检索。项目的核心目标是构建一个能够理解复杂问题、自动分解任务、并行搜索信息并生成高质量回答的智能系统。

核心特性

  • 深度知识探索:通过搜索数百个网页,提供广泛且深层次的答案
  • 透明的解决方案路径:提供完整的思考路径、搜索关键词等信息
  • 动态图构建过程:将用户查询分解为图中的子问题节点,逐步扩展
  • 多种用户界面:支持 React、Gradio、Streamlit 等多种前端界面
  • 多模型支持:兼容 InternLM、GPT-4、Qwen 等多种大语言模型
  • 多搜索引擎集成:支持 DuckDuckGo、Bing、Google、Brave 等搜索引擎

系统整体架构

分层架构设计

MindSearch 采用清晰的分层架构,各层职责明确:

graph TB
    subgraph 用户界面层
        ReactUI[React 前端]
        GradioUI[Gradio 界面]
        StreamlitUI[Streamlit 界面]
        TerminalUI[终端界面]
    end
    
    subgraph API 网关层
        FastAPI[FastAPI 服务]
        CORS[CORS 中间件]
        SSE[SSE 流式响应]
    end
    
    subgraph 核心逻辑层
        MindSearchAgent[MindSearchAgent]
        AsyncMindSearchAgent[AsyncMindSearchAgent]
        WebSearchGraph[WebSearchGraph]
        SearcherAgent[SearcherAgent]
    end
    
    subgraph 基础设施层
        LLM[大语言模型]
        SearchEngine[搜索引擎]
        Memory[记忆管理]
        Streaming[流式处理]
    end
    
    ReactUI --> FastAPI
    GradioUI --> FastAPI
    StreamlitUI --> FastAPI
    TerminalUI --> MindSearchAgent
    
    FastAPI --> MindSearchAgent
    FastAPI --> AsyncMindSearchAgent
    
    MindSearchAgent --> WebSearchGraph
    AsyncMindSearchAgent --> WebSearchGraph
    WebSearchGraph --> SearcherAgent
    
    SearcherAgent --> LLM
    SearcherAgent --> SearchEngine
    MindSearchAgent --> LLM
    
    LLM --> Memory
    SearcherAgent --> Memory

核心工作流程

sequenceDiagram
    participant User as 用户
    participant Frontend as 前端界面
    participant FastAPI as FastAPI服务
    participant Agent as MindSearchAgent
    participant Graph as WebSearchGraph
    participant Searcher as SearcherAgent
    participant LLM as 大语言模型
    participant Search as 搜索引擎
    
    User->>Frontend: 输入查询问题
    Frontend->>FastAPI: POST /solve (SSE)
    FastAPI->>Agent: 初始化Agent
    
    Agent->>LLM: 生成图构建代码
    LLM-->>Agent: 返回Python代码
    
    loop 对每个子问题
        Agent->>Graph: 添加节点
        Graph->>Searcher: 创建搜索任务
        Searcher->>LLM: 生成搜索关键词
        LLM-->>Searcher: 返回关键词
        Searcher->>Search: 执行搜索
        Search-->>Searcher: 返回搜索结果
        Searcher->>LLM: 分析搜索结果
        LLM-->>Searcher: 返回分析结果
        Searcher-->>Graph: 存储节点结果
        Graph-->>Agent: 返回图状态
        Agent->>Frontend: SSE流式更新
    end
    
    Agent->>LLM: 生成最终回答
    LLM-->>Agent: 返回总结
    Agent->>Frontend: 最终响应
    Frontend->>User: 展示完整答案

核心组件架构

1. MindSearchAgent 架构

MindSearchAgent 是整个系统的核心协调器,负责管理搜索图的构建和最终答案的生成。

#### 类结构

class MindSearchAgent(StreamingAgentForInternLM):
    def __init__(
        self,
        searcher_cfg: dict,          # 搜索器配置
        summary_prompt: str,         # 总结提示词
        finish_condition=lambda m: "add_response_node" in m.content,
        max_turn: int = 10,          # 最大轮次
        **kwargs
    ):
        # 配置WebSearchGraph
        WebSearchGraph.SEARCHER_CONFIG = searcher_cfg
        super().__init__(finish_condition=finish_condition, max_turn=max_turn, **kwargs)
        self.summary_prompt = summary_prompt
        self.action = ExecutionAction()  # 代码执行器
    
    def forward(self, message: AgentMessage, session_id=0, **kwargs):
        # 主循环,最多max_turn次
        for _ in range(self.max_turn):
            # 1. 调用LLM生成图构建代码
            # 2. 执行生成的代码
            # 3. 更新图状态
            # 4. 检查是否完成
            # 5. 生成最终回答或继续下一轮

#### 核心工作流程

1. 初始化阶段:接收用户问题,初始化搜索图 2. 图构建阶段:调用LLM生成Python代码,动态构建搜索图 3. 并发搜索阶段:并行执行多个搜索任务 4. 结果收集阶段:收集所有搜索结果,更新图状态 5. 最终生成阶段:基于收集的信息生成完整回答

2. WebSearchGraph 架构

WebSearchGraph 是系统的核心数据结构,实现了动态图构建和并发搜索管理。

#### 数据结构

class WebSearchGraph:
    def __init__(self):
        self.nodes: Dict[str, Dict[str, str]] = {}  # 节点存储
        self.adjacency_list: Dict[str, List[dict]] = defaultdict(list)  # 邻接表
        self.future_to_query = dict()  # 任务映射
        self.searcher_resp_queue = queue.Queue()  # 响应队列
        self.executor = ThreadPoolExecutor(max_workers=10)  # 线程池
        self.n_active_tasks = 0  # 活跃任务数

#### 核心方法

1. add_root_node:添加根节点(用户问题) 2. add_node:添加搜索子问题节点 3. add_response_node:添加响应节点 4. add_edge:添加节点间的边 5. node:获取节点信息

#### 并发处理机制

def add_node(self, node_name: str, node_content: str):
    # 1. 添加节点到图
    self.nodes[node_name] = dict(content=node_content, type="searcher")
    
    # 2. 查找父节点
    parent_nodes = self._find_parent_nodes(node_name)
    
    # 3. 创建搜索任务
    if self.is_async:
        # 异步模式:使用事件循环
        asyncio.run_coroutine_threadsafe(
            _async_search_node_stream(), 
            random.choice(self._SEARCHER_LOOP)
        )
    else:
        # 同步模式:使用线程池
        self.executor.submit(_search_node_stream)
    
    self.n_active_tasks += 1

3. SearcherAgent 架构

SearcherAgent 负责执行具体的搜索任务,包括关键词生成、搜索执行和结果分析。

#### 类结构

class SearcherAgent(StreamingAgentForInternLM):
    def __init__(
        self,
        user_input_template: str = "{question}",
        user_context_template: str = None,
        **kwargs,
    ):
        self.user_input_template = user_input_template
        self.user_context_template = user_context_template
        super().__init__(**kwargs)
    
    def forward(
        self,
        question: str,
        topic: str,
        history: List[dict] = None,
        session_id=0,
        **kwargs,
    ):
        # 1. 构建输入消息
        message = [self.user_input_template.format(question=question, topic=topic)]
        if history and self.user_context_template:
            message = [self.user_context_template.format_map(item) for item in history] + message
        message = "\n".join(message)
        
        # 2. 调用LLM进行搜索和分析
        return super().forward(message, session_id=session_id, **kwargs)

#### 搜索流程

1. 关键词生成:LLM根据问题生成搜索关键词 2. 搜索引擎调用:调用配置的搜索引擎API 3. 结果选择:LLM选择最相关的搜索结果 4. 内容分析:分析选中的网页内容 5. 答案生成:基于分析结果生成回答

4. 流式响应架构

#### SSE 实现机制

# FastAPI 端点
@app.post("/solve")
async def run(request: GenerationParams, _request: Request):
    async def generate():
        # 1. 初始化Agent
        agent = init_agent(...)
        
        # 2. 创建队列和事件
        queue = janus.Queue()
        stop_event = asyncio.Event()
        
        # 3. 包装同步生成器为异步
        def sync_generator_wrapper():
            for response in agent(inputs, session_id=session_id):
                queue.sync_q.put(response)
            queue.sync_q.put(None)
        
        async def async_generator_wrapper():
            loop.run_in_executor(None, sync_generator_wrapper)
            while True:
                response = await queue.async_q.get()
                if response is None:
                    break
                yield response
        
        # 4. 流式发送响应
        async for message in async_generator_wrapper():
            response_json = json.dumps(
                _postprocess_agent_message(message.model_dump()),
                ensure_ascii=False,
            )
            yield {"data": response_json}

#### 状态管理

系统使用 AgentStatusCode 管理响应状态:

  • SESSION_READY:会话准备就绪
  • STREAM_ING:流式响应中
  • PLUGIN_START:插件调用开始
  • CODING:代码生成中
  • END:响应结束

技术实现细节

1. 引用管理系统

#### 引用生成流程

def _generate_references_from_graph(graph: Dict[str, dict]) -> Tuple[str, Dict[int, dict]]:
    ptr, references, references_url = 0, [], {}
    
    for name, data_item in graph.items():
        if name in ["root", "response"]:
            continue
        
        # 提取引用信息
        ref2url = {
            int(k): v for k, v in json.loads(
                data_item["memory"]["agent.memory"][2]["content"]
            ).items()
        }
        
        # 更新引用索引
        updata_ref, ref2url, added_ptr = _update_ref(
            data_item["response"]["content"], ref2url, ptr
        )
        ptr += added_ptr
        
        # 构建引用文本
        references.append(f'## {data_item["content"]}\n\n{updata_ref}')
        references_url.update(ref2url)
    
    return "\n\n".join(references), references_url

#### 引用更新机制

def _update_ref(ref: str, ref2url: Dict[str, str], ptr: int) -> str:
    # 提取所有引用编号
    numbers = list({int(n) for n in re.findall(r"\[\[(\d+)\]\]", ref)})
    numbers = {n: idx + 1 for idx, n in enumerate(numbers)}
    
    # 更新引用索引
    updated_ref = re.sub(
        r"\[\[(\d+)\]\]",
        lambda match: f"[[{numbers[int(match.group(1))] + ptr}]]",
        ref,
    )
    
    # 更新URL映射
    updated_ref2url = {
        numbers[idx] + ptr: ref2url[idx] 
        for idx in numbers if idx in ref2url
    }
    
    return updated_ref, updated_ref2url, len(numbers) + 1

2. 记忆管理机制

#### 会话记忆结构

# Agent记忆结构
memory = {
    session_id: [
        AgentMessage(...),  # 用户消息
        AgentMessage(...),  # 助手消息
        AgentMessage(...),  # 工具调用结果
        ...
    ]
}

# WebSearchGraph节点记忆
node_memory = {
    "content": "问题内容",
    "type": "searcher",
    "response": AgentMessage(...),
    "memory": {
        "agent.memory": [
            AgentMessage(...),  # 系统提示
            AgentMessage(...),  # 用户输入
            AgentMessage(...),  # 搜索结果
            AgentMessage(...),  # 分析结果
        ]
    },
    "session_id": 12345
}

#### 记忆清理策略

# 会话结束时清理记忆
finally:
    await stop_event.wait()
    queue.close()
    await queue.wait_closed()
    agent.agent.memory.memory_map.pop(session_id, None)  # 清理会话记忆

3. 并发控制机制

#### 线程池管理

class WebSearchGraph:
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=10)  # 限制并发数
        self.n_active_tasks = 0  # 追踪活跃任务
    
    def add_node(self, node_name: str, node_content: str):
        # 提交任务到线程池
        self.future_to_query[
            self.executor.submit(_search_node_stream)
        ] = f"{node_name}-{node_content}"
        self.n_active_tasks += 1

#### 异步事件循环管理

class WebSearchGraph:
    _SEARCHER_LOOP = []  # 事件循环列表
    _SEARCHER_THREAD = []  # 线程列表
    
    @classmethod
    def start_loop(cls, n: int = 32):
        """启动多个事件循环"""
        while len(cls._SEARCHER_THREAD) < n:
            def _start_loop():
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                cls._SEARCHER_LOOP.append(loop)
                loop.run_forever()
            
            thread = Thread(target=_start_loop, daemon=True)
            thread.start()
            cls._SEARCHER_THREAD.append(thread)

4. 错误处理机制

#### 异常捕获与恢复

async def generate():
    try:
        async for message in async_generator_wrapper():
            response_json = json.dumps(
                _postprocess_agent_message(message.model_dump()),
                ensure_ascii=False,
            )
            yield {"data": response_json}
    except Exception as exc:
        msg = "An error occurred while generating the response."
        logging.exception(msg)
        response_json = json.dumps(
            dict(error=dict(msg=msg, details=str(exc))), 
            ensure_ascii=False
        )
        yield {"data": response_json}

#### 任务失败处理

def _search_node_stream():
    try:
        for searcher_message in agent(...):
            self.nodes[node_name]["response"] = searcher_message.model_dump()
            self.searcher_resp_queue.put((node_name, self.nodes[node_name], []))
        self.searcher_resp_queue.put((None, None, None))
    except Exception as exc:
        self.searcher_resp_queue.put((exc, None, None))  # 传递异常

性能优化策略

1. 并发优化

#### 动态并发控制

# 根据系统负载动态调整并发数
class DynamicThreadPool:
    def __init__(self, min_workers=5, max_workers=50):
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.current_workers = min_workers
        self.executor = ThreadPoolExecutor(max_workers=self.current_workers)
    
    def adjust_pool_size(self, queue_size):
        """根据队列大小调整线程池"""
        if queue_size > 100 and self.current_workers < self.max_workers:
            self.current_workers = min(self.current_workers + 5, self.max_workers)
            self.executor._max_workers = self.current_workers
        elif queue_size < 10 and self.current_workers > self.min_workers:
            self.current_workers = max(self.current_workers - 5, self.min_workers)
            self.executor._max_workers = self.current_workers

#### 异步IO优化

# 使用aiohttp替代requests进行异步HTTP请求
import aiohttp

class AsyncWebBrowser:
    async def search(self, query: str):
        async with aiohttp.ClientSession() as session:
            async with session.get(
                self.search_url, 
                params={"q": query},
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                return await response.json()

2. 内存优化

#### 消息过滤与压缩

def _postprocess_agent_message(message: dict) -> dict:
    """过滤不必要的信息,减少传输数据量"""
    content, fmt = message["content"], message["formatted"]
    current_node = content["current_node"] if isinstance(content, dict) else None
    
    if current_node:
        # 只返回当前节点的信息
        message["content"] = None
        for key in ["ref2url"]:
            fmt.pop(key, None)
        graph = fmt["node"]
        for key in graph.copy():
            if key != current_node:
                graph.pop(key)  # 删除非当前节点
    
    return dict(current_node=current_node, response=message)

#### 记忆清理策略

# 定期清理过期会话
class MemoryManager:
    def __init__(self, max_sessions=1000, ttl=3600):
        self.max_sessions = max_sessions
        self.ttl = ttl
        self.sessions = {}
        self.timestamps = {}
    
    def cleanup(self):
        """清理过期会话"""
        current_time = time.time()
        expired = [
            sid for sid, ts in self.timestamps.items() 
            if current_time - ts > self.ttl
        ]
        
        for sid in expired:
            self.sessions.pop(sid, None)
            self.timestamps.pop(sid, None)

3. 网络优化

#### SSE连接管理

# 前端SSE重连机制
class SSEManager {
    constructor(url, options) {
        this.url = url;
        this.options = options;
        this.controller = new AbortController();
        this.retryCount = 0;
        this.maxRetries = 5;
    }
    
    async connect() {
        try {
            await fetchEventSource(this.url, {
                ...this.options,
                signal: this.controller.signal,
                onerror: (err) => {
                    if (this.retryCount < this.maxRetries) {
                        this.retryCount++;
                        setTimeout(() => this.connect(), 1000 * this.retryCount);
                    }
                }
            });
        } catch (err) {
            console.error('SSE connection failed:', err);
        }
    }
}

#### 数据压缩

# 启用Gzip压缩
from fastapi.middleware.gzip import GZipMiddleware

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)

4. 缓存策略

#### 搜索结果缓存

class SearchCache:
    def __init__(self, max_size=1000, ttl=1800):
        self.cache = {}
        self.timestamps = {}
        self.max_size = max_size
        self.ttl = ttl
    
    def get(self, query: str):
        if query in self.cache:
            if time.time() - self.timestamps[query] < self.ttl:
                return self.cache[query]
            else:
                self.remove(query)
        return None
    
    def put(self, query: str, result):
        if len(self.cache) >= self.max_size:
            # LRU淘汰
            oldest = min(self.timestamps, key=self.timestamps.get)
            self.remove(oldest)
        
        self.cache[query] = result
        self.timestamps[query] = time.time()

扩展性设计

1. 插件化架构

#### 搜索引擎插件

# 搜索引擎基类
class BaseSearchEngine(ABC):
    @abstractmethod
    async def search(self, query: str, topk: int = 10) -> List[Dict]:
        pass
    
    @abstractmethod
    async def select(self, indices: List[int]) -> List[Dict]:
        pass

# 具体实现
class BingSearchEngine(BaseSearchEngine):
    async def search(self, query: str, topk: int = 10):
        # Bing搜索实现
        pass

class GoogleSearchEngine(BaseSearchEngine):
    async def search(self, query: str, topk: int = 10):
        # Google搜索实现
        pass

#### LLM插件

# LLM基类
class BaseLLM(ABC):
    @abstractmethod
    async def stream_chat(self, messages: List[Dict], **kwargs):
        pass
    
    @abstractmethod
    def parse_response(self, response: str) -> Dict:
        pass

# 具体实现
class InternLMLLM(BaseLLM):
    async def stream_chat(self, messages: List[Dict], **kwargs):
        # InternLM实现
        pass

class GPTLLM(BaseLLM):
    async def stream_chat(self, messages: List[Dict], **kwargs):
        # GPT实现
        pass

2. 配置驱动开发

#### 模型配置

# models.py
internlm_server = dict(
    type=LMDeployServer,
    path="internlm/internlm2_5-7b-chat",
    model_name="internlm2_5-7b-chat",
    meta_template=INTERNLM2_META,
    top_p=0.8,
    top_k=1,
    temperature=0,
    max_new_tokens=8192,
    repetition_penalty=1.02,
    stop_words=["<|im_end|>"],
)

gpt4 = dict(
    type=GPTAPI,
    model_type="gpt-4-turbo",
    key=os.environ.get("OPENAI_API_KEY"),
    api_base=os.environ.get("OPENAI_API_BASE"),
)

#### Agent配置

# __init__.py
def init_agent(lang="cn", model_format="internlm_server", search_engine="BingSearch", use_async=False):
    # 动态创建LLM实例
    llm_cfg = deepcopy(getattr(llm_factory, model_format))
    if use_async:
        cls_name = llm_cfg["type"].split(".")[-1]
        llm_cfg["type"] = f"lagent.llms.Async{cls_name}"
    llm = create_object(llm_cfg)
    
    # 动态创建搜索引擎
    plugins = [dict(
        type=AsyncWebBrowser if use_async else WebBrowser,
        searcher_type=search_engine,
        topk=6,
        api_key=os.getenv("WEB_SEARCH_API_KEY"),
    )]
    
    # 创建Agent
    agent = (AsyncMindSearchAgent if use_async else MindSearchAgent)(
        llm=llm,
        searcher_cfg=dict(
            llm=llm,
            plugins=plugins,
            # ...其他配置
        ),
        summary_prompt=FINAL_RESPONSE_CN if lang == "cn" else FINAL_RESPONSE_EN,
        max_turn=10,
    )
    return agent

3. 模块化设计

#### 前端模块化

// 组件结构
src/
├── components/
│   ├── answer/          # 答案展示组件
│   ├── chat-right/      # 右侧聊天面板
│   ├── mind-map/        # 思维导图
│   ├── loading/         # 加载动画
│   └── ...
├── provider/            # 状态管理
├── utils/               # 工具函数
└── pages/
    └── mindsearch/      # 主页面

// 独立组件示例
interface INodeComponentProps {
    nodeInfo: INodeInfo;
    onClick: (node: string) => void;
}

const NodeComponent: React.FC<INodeComponentProps> = ({ nodeInfo, onClick }) => {
    // 组件实现
};

#### 后端模块化

# 清晰的模块划分
mindsearch/
├── __init__.py
├── app.py              # FastAPI服务
├── terminal.py         # 终端界面
└── agent/
    ├── __init__.py     # Agent工厂
    ├── graph.py        # 图搜索实现
    ├── mindsearch_agent.py  # 主Agent
    ├── models.py       # 模型配置
    ├── streaming.py    # 流式处理
    └── mindsearch_prompt.py   # 提示词模板

4. 测试和调试支持

#### 单元测试

# 测试WebSearchGraph
def test_web_search_graph():
    graph = WebSearchGraph()
    graph.add_root_node("测试问题")
    graph.add_node("子问题1", "子问题内容1")
    graph.add_node("子问题2", "子问题内容2")
    graph.add_edge("root", "子问题1")
    graph.add_edge("root", "子问题2")
    
    assert "root" in graph.nodes
    assert "子问题1" in graph.nodes
    assert len(graph.adjacency_list["root"]) == 2

# 测试Agent
def test_mind_search_agent():
    agent = init_agent(lang="cn", model_format="internlm_server")
    response = list(agent("测试问题"))
    assert len(response) > 0
    assert response[-1].stream_state == AgentStatusCode.END

#### 调试工具

# 终端调试
python -m mindsearch.terminal

# 日志配置
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('mindsearch.log'),
        logging.StreamHandler()
    ]
)

# 性能分析
from functools import wraps
import time

def timing_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        logging.info(f"{func.__name__} took {end - start:.2f} seconds")
        return result
    return wrapper

系统架构图

1. 整体架构图

graph TB
    subgraph 客户端层
        User[用户]
        Browser[浏览器]
        Mobile[移动设备]
        Terminal[终端]
    end
    
    subgraph 接口层
        ReactUI[React前端]
        GradioUI[Gradio界面]
        StreamlitUI[Streamlit界面]
        FastAPI[FastAPI网关]
    end
    
    subgraph 核心服务层
        MindSearchAgent[MindSearchAgent<br/>主协调器]
        AsyncMindSearchAgent[AsyncMindSearchAgent<br/>异步协调器]
        WebSearchGraph[WebSearchGraph<br/>图管理器]
        SearcherAgent[SearcherAgent<br/>搜索执行器]
    end
    
    subgraph 工具层
        WebBrowser[WebBrowser<br/>网页浏览器]
        AsyncWebBrowser[AsyncWebBrowser<br/>异步浏览器]
        ExecutionAction[ExecutionAction<br/>代码执行器]
    end
    
    subgraph 模型层
        InternLM[InternLM2.5-7B]
        GPT4[GPT-4]
        Qwen[Qwen-Max]
        Claude[Claude]
    end
    
    subgraph 搜索层
        Bing[Bing搜索]
        Google[Google搜索]
        DuckDuckGo[DuckDuckGo]
        Brave[Brave搜索]
        Tencent[腾讯搜索]
    end
    
    User --> Browser
    User --> Mobile
    User --> Terminal
    
    Browser --> ReactUI
    Mobile --> ReactUI
    Terminal --> MindSearchAgent
    
    ReactUI --> FastAPI
    GradioUI --> FastAPI
    StreamlitUI --> FastAPI
    
    FastAPI --> MindSearchAgent
    FastAPI --> AsyncMindSearchAgent
    
    MindSearchAgent --> WebSearchGraph
    AsyncMindSearchAgent --> WebSearchGraph
    WebSearchGraph --> SearcherAgent
    
    SearcherAgent --> WebBrowser
    SearcherAgent --> AsyncWebBrowser
    MindSearchAgent --> ExecutionAction
    
    WebBrowser --> Bing
    WebBrowser --> Google
    WebBrowser --> DuckDuckGo
    WebBrowser --> Brave
    WebBrowser --> Tencent
    
    AsyncWebBrowser --> Bing
    AsyncWebBrowser --> Google
    AsyncWebBrowser --> DuckDuckGo
    AsyncWebBrowser --> Brave
    AsyncWebBrowser --> Tencent
    
    MindSearchAgent --> InternLM
    MindSearchAgent --> GPT4
    MindSearchAgent --> Qwen
    MindSearchAgent --> Claude
    
    SearcherAgent --> InternLM
    SearcherAgent --> GPT4
    SearcherAgent --> Qwen
    SearcherAgent --> Claude

2. 数据流图

graph LR
    subgraph 输入阶段
        UserQuery[用户查询]
        QueryAnalysis[查询分析]
        GraphConstruction[图构建]
    end
    
    subgraph 处理阶段
        NodeGeneration[节点生成]
        ConcurrentSearch[并发搜索]
        ResultAnalysis[结果分析]
        GraphUpdate[图更新]
    end
    
    subgraph 输出阶段
        AnswerGeneration[答案生成]
        ReferenceManagement[引用管理]
        StreamResponse[流式响应]
        UserDisplay[用户展示]
    end
    
    UserQuery --> QueryAnalysis
    QueryAnalysis --> GraphConstruction
    
    GraphConstruction --> NodeGeneration
    NodeGeneration --> ConcurrentSearch
    ConcurrentSearch --> ResultAnalysis
    ResultAnalysis --> GraphUpdate
    
    GraphUpdate --> AnswerGeneration
    AnswerGeneration --> ReferenceManagement
    ReferenceManagement --> StreamResponse
    StreamResponse --> UserDisplay
    
    style UserQuery fill:#e1f5ff
    style UserDisplay fill:#e8f5e9

3. 并发处理架构

graph TB
    subgraph 主线程
        MainThread[主线程<br/>MindSearchAgent]
        GraphBuilder[图构建器]
        ResponseCollector[响应收集器]
    end
    
    subgraph 工作线程池
        Thread1[工作线程1<br/>SearcherAgent]
        Thread2[工作线程2<br/>SearcherAgent]
        Thread3[工作线程3<br/>SearcherAgent]
        ThreadN[工作线程N<br/>SearcherAgent]
    end
    
    subgraph 异步事件循环
        EventLoop1[事件循环1]
        EventLoop2[事件循环2]
        EventLoop3[事件循环3]
        EventLoopM[事件循环M]
    end
    
    subgraph 外部服务
        LLM1[LLM服务]
        LLM2[LLM服务]
        Search1[搜索引擎]
        Search2[搜索引擎]
    end
    
    MainThread --> GraphBuilder
    GraphBuilder --> ResponseCollector
    
    MainThread --> Thread1
    MainThread --> Thread2
    MainThread --> Thread3
    MainThread --> ThreadN
    
    Thread1 --> EventLoop1
    Thread2 --> EventLoop2
    Thread3 --> EventLoop3
    ThreadN --> EventLoopM
    
    EventLoop1 --> LLM1
    EventLoop1 --> Search1
    EventLoop2 --> LLM1
    EventLoop2 --> Search2
    EventLoop3 --> LLM2
    EventLoop3 --> Search1
    EventLoopM --> LLM2
    EventLoopM --> Search2
    
    Thread1 --> ResponseCollector
    Thread2 --> ResponseCollector
    Thread3 --> ResponseCollector
    ThreadN --> ResponseCollector
    
    ResponseCollector --> MainThread

4. 状态转换图

stateDiagram-v2
    [*] --> SESSION_READY: 初始化
    
    SESSION_READY --> STREAM_ING: 开始生成
    STREAM_ING --> PLUGIN_START: 调用插件
    STREAM_ING --> CODING: 生成代码
    STREAM_ING --> END: 生成完成
    
    PLUGIN_START --> STREAM_ING: 插件返回
    CODING --> STREAM_ING: 代码执行完成
    
    END --> [*]: 会话结束
    
    note right of SESSION_READY: "会话准备就绪"
    note right of STREAM_ING: "流式响应中"
    note right of PLUGIN_START: "插件调用开始"
    note right of CODING: "代码生成中"
    note right of END: "响应结束"

总结

MindSearch 的架构设计体现了以下核心优势:

1. 模块化设计

  • 清晰的层次划分,各模块职责明确
  • 插件化架构,易于扩展新的模型和搜索引擎
  • 配置驱动,灵活适应不同场景

2. 高性能并发

  • 线程池和事件循环的混合使用
  • 动态并发控制,资源利用率高
  • 异步IO优化,减少等待时间

3. 智能任务分解

  • 基于图的动态任务分解
  • 并行搜索,大幅缩短响应时间
  • 智能结果合并和引用管理

4. 优秀的用户体验

  • SSE流式响应,实时展示思考过程
  • 透明的解决方案路径
  • 丰富的交互界面选择

5. 强大的可扩展性

  • 支持多种LLM和搜索引擎
  • 灵活的配置系统
  • 完善的测试和调试支持
这种架构设计使 MindSearch 能够在保证回答质量的同时,提供快速的响应速度和优秀的用户体验,为AI搜索引擎的发展提供了优秀的参考实现。