Loading...
正在加载...
请稍候

MindSearch: 模拟人类思维的人工智能搜索框架 思·索 — 通过多智能体框架实现深度网络信息搜索与整合

QianXun (QianXun) 2025年11月23日 02:16

讨论回复

1 条回复
QianXun (QianXun) #1
2025-11-23 02:34

目录

  1. 项目概述
  2. 系统整体架构
  3. 核心组件架构
  4. 技术实现细节
  5. 性能优化策略
  6. 扩展性设计
  7. 系统架构图

项目概述

背景与目标

MindSearch 是一个开源的 AI 搜索引擎框架,旨在模仿人类思维过程进行深度信息检索。项目的核心目标是构建一个能够理解复杂问题、自动分解任务、并行搜索信息并生成高质量回答的智能系统。

核心特性

  • 深度知识探索:通过搜索数百个网页,提供广泛且深层次的答案
  • 透明的解决方案路径:提供完整的思考路径、搜索关键词等信息
  • 动态图构建过程:将用户查询分解为图中的子问题节点,逐步扩展
  • 多种用户界面:支持 React、Gradio、Streamlit 等多种前端界面
  • 多模型支持:兼容 InternLM、GPT-4、Qwen 等多种大语言模型
  • 多搜索引擎集成:支持 DuckDuckGo、Bing、Google、Brave 等搜索引擎

系统整体架构

分层架构设计

MindSearch 采用清晰的分层架构,各层职责明确:

核心工作流程

核心组件架构

1. MindSearchAgent 架构

MindSearchAgent 是整个系统的核心协调器,负责管理搜索图的构建和最终答案的生成。

类结构

class MindSearchAgent(StreamingAgentForInternLM):
    def __init__(
        self,
        searcher_cfg: dict,          # 搜索器配置
        summary_prompt: str,         # 总结提示词
        finish_condition=lambda m: "add_response_node" in m.content,
        max_turn: int = 10,          # 最大轮次
        **kwargs
    ):
        # 配置WebSearchGraph
        WebSearchGraph.SEARCHER_CONFIG = searcher_cfg
        super().__init__(finish_condition=finish_condition, max_turn=max_turn, **kwargs)
        self.summary_prompt = summary_prompt
        self.action = ExecutionAction()  # 代码执行器
    
    def forward(self, message: AgentMessage, session_id=0, **kwargs):
        # 主循环,最多max_turn次
        for _ in range(self.max_turn):
            # 1. 调用LLM生成图构建代码
            # 2. 执行生成的代码
            # 3. 更新图状态
            # 4. 检查是否完成
            # 5. 生成最终回答或继续下一轮

核心工作流程

  1. 初始化阶段:接收用户问题,初始化搜索图
  2. 图构建阶段:调用LLM生成Python代码,动态构建搜索图
  3. 并发搜索阶段:并行执行多个搜索任务
  4. 结果收集阶段:收集所有搜索结果,更新图状态
  5. 最终生成阶段:基于收集的信息生成完整回答

2. WebSearchGraph 架构

WebSearchGraph 是系统的核心数据结构,实现了动态图构建和并发搜索管理。

数据结构

class WebSearchGraph:
    def __init__(self):
        self.nodes: Dict[str, Dict[str, str]] = {}  # 节点存储
        self.adjacency_list: Dict[str, List[dict]] = defaultdict(list)  # 邻接表
        self.future_to_query = dict()  # 任务映射
        self.searcher_resp_queue = queue.Queue()  # 响应队列
        self.executor = ThreadPoolExecutor(max_workers=10)  # 线程池
        self.n_active_tasks = 0  # 活跃任务数

核心方法

  1. add_root_node:添加根节点(用户问题)
  2. add_node:添加搜索子问题节点
  3. add_response_node:添加响应节点
  4. add_edge:添加节点间的边
  5. node:获取节点信息

并发处理机制

def add_node(self, node_name: str, node_content: str):
    # 1. 添加节点到图
    self.nodes[node_name] = dict(content=node_content, type="searcher")
    
    # 2. 查找父节点
    parent_nodes = self._find_parent_nodes(node_name)
    
    # 3. 创建搜索任务
    if self.is_async:
        # 异步模式:使用事件循环
        asyncio.run_coroutine_threadsafe(
            _async_search_node_stream(), 
            random.choice(self._SEARCHER_LOOP)
        )
    else:
        # 同步模式:使用线程池
        self.executor.submit(_search_node_stream)
    
    self.n_active_tasks += 1

3. SearcherAgent 架构

SearcherAgent 负责执行具体的搜索任务,包括关键词生成、搜索执行和结果分析。

类结构

class SearcherAgent(StreamingAgentForInternLM):
    def __init__(
        self,
        user_input_template: str = "{question}",
        user_context_template: str = None,
        **kwargs,
    ):
        self.user_input_template = user_input_template
        self.user_context_template = user_context_template
        super().__init__(**kwargs)
    
    def forward(
        self,
        question: str,
        topic: str,
        history: List[dict] = None,
        session_id=0,
        **kwargs,
    ):
        # 1. 构建输入消息
        message = [self.user_input_template.format(question=question, topic=topic)]
        if history and self.user_context_template:
            message = [self.user_context_template.format_map(item) for item in history] + message
        message = "\n".join(message)
        
        # 2. 调用LLM进行搜索和分析
        return super().forward(message, session_id=session_id, **kwargs)

搜索流程

  1. 关键词生成:LLM根据问题生成搜索关键词
  2. 搜索引擎调用:调用配置的搜索引擎API
  3. 结果选择:LLM选择最相关的搜索结果
  4. 内容分析:分析选中的网页内容
  5. 答案生成:基于分析结果生成回答

4. 流式响应架构

SSE 实现机制

# FastAPI 端点
@app.post("/solve")
async def run(request: GenerationParams, _request: Request):
    async def generate():
        # 1. 初始化Agent
        agent = init_agent(...)
        
        # 2. 创建队列和事件
        queue = janus.Queue()
        stop_event = asyncio.Event()
        
        # 3. 包装同步生成器为异步
        def sync_generator_wrapper():
            for response in agent(inputs, session_id=session_id):
                queue.sync_q.put(response)
            queue.sync_q.put(None)
        
        async def async_generator_wrapper():
            loop.run_in_executor(None, sync_generator_wrapper)
            while True:
                response = await queue.async_q.get()
                if response is None:
                    break
                yield response
        
        # 4. 流式发送响应
        async for message in async_generator_wrapper():
            response_json = json.dumps(
                _postprocess_agent_message(message.model_dump()),
                ensure_ascii=False,
            )
            yield {"data": response_json}

状态管理

系统使用 AgentStatusCode 管理响应状态:

  • SESSION_READY:会话准备就绪
  • STREAM_ING:流式响应中
  • PLUGIN_START:插件调用开始
  • CODING:代码生成中
  • END:响应结束

技术实现细节

1. 引用管理系统

引用生成流程

def _generate_references_from_graph(graph: Dict[str, dict]) -> Tuple[str, Dict[int, dict]]:
    ptr, references, references_url = 0, [], {}
    
    for name, data_item in graph.items():
        if name in ["root", "response"]:
            continue
        
        # 提取引用信息
        ref2url = {
            int(k): v for k, v in json.loads(
                data_item["memory"]["agent.memory"][2]["content"]
            ).items()
        }
        
        # 更新引用索引
        updata_ref, ref2url, added_ptr = _update_ref(
            data_item["response"]["content"], ref2url, ptr
        )
        ptr += added_ptr
        
        # 构建引用文本
        references.append(f'## {data_item["content"]}\n\n{updata_ref}')
        references_url.update(ref2url)
    
    return "\n\n".join(references), references_url

引用更新机制

def _update_ref(ref: str, ref2url: Dict[str, str], ptr: int) -> str:
    # 提取所有引用编号
    numbers = list({int(n) for n in re.findall(r"
\[\[(\d+)\]
\]"
, ref)}) numbers = {n: idx + 1 for idx, n in enumerate(numbers)} # 更新引用索引 updated_ref = re.sub( r"
\[\[(\d+)\]
\]"
, lambda match: f"[[{numbers[int(match.group(1))] + ptr}]]", ref, ) # 更新URL映射 updated_ref2url = { numbers[idx] + ptr: ref2url[idx] for idx in numbers if idx in ref2url } return updated_ref, updated_ref2url, len(numbers) + 1

2. 记忆管理机制

会话记忆结构

# Agent记忆结构
memory = {
    session_id: [
        AgentMessage(...),  # 用户消息
        AgentMessage(...),  # 助手消息
        AgentMessage(...),  # 工具调用结果
        ...
    ]
}

# WebSearchGraph节点记忆
node_memory = {
    "content": "问题内容",
    "type": "searcher",
    "response": AgentMessage(...),
    "memory": {
        "agent.memory": [
            AgentMessage(...),  # 系统提示
            AgentMessage(...),  # 用户输入
            AgentMessage(...),  # 搜索结果
            AgentMessage(...),  # 分析结果
        ]
    },
    "session_id": 12345
}

记忆清理策略

# 会话结束时清理记忆
finally:
    await stop_event.wait()
    queue.close()
    await queue.wait_closed()
    agent.agent.memory.memory_map.pop(session_id, None)  # 清理会话记忆

3. 并发控制机制

线程池管理

class WebSearchGraph:
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=10)  # 限制并发数
        self.n_active_tasks = 0  # 追踪活跃任务
    
    def add_node(self, node_name: str, node_content: str):
        # 提交任务到线程池
        self.future_to_query[
            self.executor.submit(_search_node_stream)
        ] = f"{node_name}-{node_content}"
        self.n_active_tasks += 1

异步事件循环管理

class WebSearchGraph:
    _SEARCHER_LOOP = []  # 事件循环列表
    _SEARCHER_THREAD = []  # 线程列表
    
    @classmethod
    def start_loop(cls, n: int = 32):
        """启动多个事件循环"""
        while len(cls._SEARCHER_THREAD) < n:
            def _start_loop():
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                cls._SEARCHER_LOOP.append(loop)
                loop.run_forever()
            
            thread = Thread(target=_start_loop, daemon=True)
            thread.start()
            cls._SEARCHER_THREAD.append(thread)

4. 错误处理机制

异常捕获与恢复

async def generate():
    try:
        async for message in async_generator_wrapper():
            response_json = json.dumps(
                _postprocess_agent_message(message.model_dump()),
                ensure_ascii=False,
            )
            yield {"data": response_json}
    except Exception as exc:
        msg = "An error occurred while generating the response."
        logging.exception(msg)
        response_json = json.dumps(
            dict(error=dict(msg=msg, details=str(exc))), 
            ensure_ascii=False
        )
        yield {"data": response_json}

任务失败处理

def _search_node_stream():
    try:
        for searcher_message in agent(...):
            self.nodes[node_name]["response"] = searcher_message.model_dump()
            self.searcher_resp_queue.put((node_name, self.nodes[node_name], []))
        self.searcher_resp_queue.put((None, None, None))
    except Exception as exc:
        self.searcher_resp_queue.put((exc, None, None))  # 传递异常

性能优化策略

1. 并发优化

动态并发控制

# 根据系统负载动态调整并发数
class DynamicThreadPool:
    def __init__(self, min_workers=5, max_workers=50):
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.current_workers = min_workers
        self.executor = ThreadPoolExecutor(max_workers=self.current_workers)
    
    def adjust_pool_size(self, queue_size):
        """根据队列大小调整线程池"""
        if queue_size > 100 and self.current_workers < self.max_workers:
            self.current_workers = min(self.current_workers + 5, self.max_workers)
            self.executor._max_workers = self.current_workers
        elif queue_size < 10 and self.current_workers > self.min_workers:
            self.current_workers = max(self.current_workers - 5, self.min_workers)
            self.executor._max_workers = self.current_workers

异步IO优化

# 使用aiohttp替代requests进行异步HTTP请求
import aiohttp

class AsyncWebBrowser:
    async def search(self, query: str):
        async with aiohttp.ClientSession() as session:
            async with session.get(
                self.search_url, 
                params={"q": query},
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                return await response.json()

2. 内存优化

消息过滤与压缩

def _postprocess_agent_message(message: dict) -> dict:
    """过滤不必要的信息,减少传输数据量"""
    content, fmt = message["content"], message["formatted"]
    current_node = content["current_node"] if isinstance(content, dict) else None
    
    if current_node:
        # 只返回当前节点的信息
        message["content"] = None
        for key in ["ref2url"]:
            fmt.pop(key, None)
        graph = fmt["node"]
        for key in graph.copy():
            if key != current_node:
                graph.pop(key)  # 删除非当前节点
    
    return dict(current_node=current_node, response=message)

记忆清理策略

# 定期清理过期会话
class MemoryManager:
    def __init__(self, max_sessions=1000, ttl=3600):
        self.max_sessions = max_sessions
        self.ttl = ttl
        self.sessions = {}
        self.timestamps = {}
    
    def cleanup(self):
        """清理过期会话"""
        current_time = time.time()
        expired = [
            sid for sid, ts in self.timestamps.items() 
            if current_time - ts > self.ttl
        ]
        
        for sid in expired:
            self.sessions.pop(sid, None)
            self.timestamps.pop(sid, None)

3. 网络优化

SSE连接管理

# 前端SSE重连机制
class SSEManager {
    constructor(url, options) {
        this.url = url;
        this.options = options;
        this.controller = new AbortController();
        this.retryCount = 0;
        this.maxRetries = 5;
    }
    
    async connect() {
        try {
            await fetchEventSource(this.url, {
                ...this.options,
                signal: this.controller.signal,
                onerror: (err) => {
                    if (this.retryCount < this.maxRetries) {
                        this.retryCount++;
                        setTimeout(() => this.connect(), 1000 * this.retryCount);
                    }
                }
            });
        } catch (err) {
            console.error('SSE connection failed:', err);
        }
    }
}

数据压缩

# 启用Gzip压缩
from fastapi.middleware.gzip import GZipMiddleware

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)

4. 缓存策略

搜索结果缓存

class SearchCache:
    def __init__(self, max_size=1000, ttl=1800):
        self.cache = {}
        self.timestamps = {}
        self.max_size = max_size
        self.ttl = ttl
    
    def get(self, query: str):
        if query in self.cache:
            if time.time() - self.timestamps[query] < self.ttl:
                return self.cache[query]
            else:
                self.remove(query)
        return None
    
    def put(self, query: str, result):
        if len(self.cache) >= self.max_size:
            # LRU淘汰
            oldest = min(self.timestamps, key=self.timestamps.get)
            self.remove(oldest)
        
        self.cache[query] = result
        self.timestamps[query] = time.time()

扩展性设计

1. 插件化架构

搜索引擎插件

# 搜索引擎基类
class BaseSearchEngine(ABC):
    @abstractmethod
    async def search(self, query: str, topk: int = 10) -> List[Dict]:
        pass
    
    @abstractmethod
    async def select(self, indices: List[int]) -> List[Dict]:
        pass

# 具体实现
class BingSearchEngine(BaseSearchEngine):
    async def search(self, query: str, topk: int = 10):
        # Bing搜索实现
        pass

class GoogleSearchEngine(BaseSearchEngine):
    async def search(self, query: str, topk: int = 10):
        # Google搜索实现
        pass

LLM插件

# LLM基类
class BaseLLM(ABC):
    @abstractmethod
    async def stream_chat(self, messages: List[Dict], **kwargs):
        pass
    
    @abstractmethod
    def parse_response(self, response: str) -> Dict:
        pass

# 具体实现
class InternLMLLM(BaseLLM):
    async def stream_chat(self, messages: List[Dict], **kwargs):
        # InternLM实现
        pass

class GPTLLM(BaseLLM):
    async def stream_chat(self, messages: List[Dict], **kwargs):
        # GPT实现
        pass

2. 配置驱动开发

模型配置

# models.py
internlm_server = dict(
    type=LMDeployServer,
    path="internlm/internlm2_5-7b-chat",
    model_name="internlm2_5-7b-chat",
    meta_template=INTERNLM2_META,
    top_p=0.8,
    top_k=1,
    temperature=0,
    max_new_tokens=8192,
    repetition_penalty=1.02,
    stop_words=["<|im_end|>"],
)

gpt4 = dict(
    type=GPTAPI,
    model_type="gpt-4-turbo",
    key=os.environ.get("OPENAI_API_KEY"),
    api_base=os.environ.get("OPENAI_API_BASE"),
)

Agent配置

# __init__.py
def init_agent(lang="cn", model_format="internlm_server", search_engine="BingSearch", use_async=False):
    # 动态创建LLM实例
    llm_cfg = deepcopy(getattr(llm_factory, model_format))
    if use_async:
        cls_name = llm_cfg["type"].split(".")[-1]
        llm_cfg["type"] = f"lagent.llms.Async{cls_name}"
    llm = create_object(llm_cfg)
    
    # 动态创建搜索引擎
    plugins = [dict(
        type=AsyncWebBrowser if use_async else WebBrowser,
        searcher_type=search_engine,
        topk=6,
        api_key=os.getenv("WEB_SEARCH_API_KEY"),
    )]
    
    # 创建Agent
    agent = (AsyncMindSearchAgent if use_async else MindSearchAgent)(
        llm=llm,
        searcher_cfg=dict(
            llm=llm,
            plugins=plugins,
            # ...其他配置
        ),
        summary_prompt=FINAL_RESPONSE_CN if lang == "cn" else FINAL_RESPONSE_EN,
        max_turn=10,
    )
    return agent

3. 模块化设计

前端模块化

// 组件结构
src/
├── components/
│   ├── answer/          # 答案展示组件
│   ├── chat-right/      # 右侧聊天面板
│   ├── mind-map/        # 思维导图
│   ├── loading/         # 加载动画
│   └── ...
├── provider/            # 状态管理
├── utils/               # 工具函数
└── pages/
    └── mindsearch/      # 主页面

// 独立组件示例
interface INodeComponentProps {
    nodeInfo: INodeInfo;
    onClick: (node: string) => void;
}

const NodeComponent: React.FC<INodeComponentProps> = ({ nodeInfo, onClick }) => {
    // 组件实现
};

后端模块化

# 清晰的模块划分
mindsearch/
├── __init__.py
├── app.py              # FastAPI服务
├── terminal.py         # 终端界面
└── agent/
    ├── __init__.py     # Agent工厂
    ├── graph.py        # 图搜索实现
    ├── mindsearch_agent.py  # 主Agent
    ├── models.py       # 模型配置
    ├── streaming.py    # 流式处理
    └── mindsearch_prompt.py   # 提示词模板

4. 测试和调试支持

单元测试

# 测试WebSearchGraph
def test_web_search_graph():
    graph = WebSearchGraph()
    graph.add_root_node("测试问题")
    graph.add_node("子问题1", "子问题内容1")
    graph.add_node("子问题2", "子问题内容2")
    graph.add_edge("root", "子问题1")
    graph.add_edge("root", "子问题2")
    
    assert "root" in graph.nodes
    assert "子问题1" in graph.nodes
    assert len(graph.adjacency_list["root"]) == 2

# 测试Agent
def test_mind_search_agent():
    agent = init_agent(lang="cn", model_format="internlm_server")
    response = list(agent("测试问题"))
    assert len(response) > 0
    assert response[-1].stream_state == AgentStatusCode.END

调试工具

# 终端调试
python -m mindsearch.terminal

# 日志配置
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('mindsearch.log'),
        logging.StreamHandler()
    ]
)

# 性能分析
from functools import wraps
import time

def timing_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        logging.info(f"{func.__name__} took {end - start:.2f} seconds")
        return result
    return wrapper

系统架构图

1. 整体架构图

2. 数据流图

3. 并发处理架构

4. 状态转换图

总结

MindSearch 的架构设计体现了以下核心优势:

1. 模块化设计

  • 清晰的层次划分,各模块职责明确
  • 插件化架构,易于扩展新的模型和搜索引擎
  • 配置驱动,灵活适应不同场景

2. 高性能并发

  • 线程池和事件循环的混合使用
  • 动态并发控制,资源利用率高
  • 异步IO优化,减少等待时间

3. 智能任务分解

  • 基于图的动态任务分解
  • 并行搜索,大幅缩短响应时间
  • 智能结果合并和引用管理

4. 优秀的用户体验

  • SSE流式响应,实时展示思考过程
  • 透明的解决方案路径
  • 丰富的交互界面选择

5. 强大的可扩展性

  • 支持多种LLM和搜索引擎
  • 灵活的配置系统
  • 完善的测试和调试支持

这种架构设计使 MindSearch 能够在保证回答质量的同时,提供快速的响应速度和优秀的用户体验,为AI搜索引擎的发展提供了优秀的参考实现。

推荐
智谱 GLM-5 已上线

我正在智谱大模型开放平台 BigModel.cn 上打造 AI 应用,智谱新一代旗舰模型 GLM-5 已上线,在推理、代码、智能体综合能力达到开源模型 SOTA 水平。

领取 2000万 Tokens 通过邀请链接注册即可获得大礼包,期待和你一起在 BigModel 上畅享卓越模型能力
登录