目录
- 项目概述
- 系统整体架构
- 核心组件架构
- 技术实现细节
- 性能优化策略
- 扩展性设计
- 系统架构图
项目概述
背景与目标
MindSearch 是一个开源的 AI 搜索引擎框架,旨在模仿人类思维过程进行深度信息检索。项目的核心目标是构建一个能够理解复杂问题、自动分解任务、并行搜索信息并生成高质量回答的智能系统。
核心特性
- 深度知识探索:通过搜索数百个网页,提供广泛且深层次的答案
- 透明的解决方案路径:提供完整的思考路径、搜索关键词等信息
- 动态图构建过程:将用户查询分解为图中的子问题节点,逐步扩展
- 多种用户界面:支持 React、Gradio、Streamlit 等多种前端界面
- 多模型支持:兼容 InternLM、GPT-4、Qwen 等多种大语言模型
- 多搜索引擎集成:支持 DuckDuckGo、Bing、Google、Brave 等搜索引擎
系统整体架构
分层架构设计
MindSearch 采用清晰的分层架构,各层职责明确:
核心工作流程
核心组件架构
1. MindSearchAgent 架构
MindSearchAgent 是整个系统的核心协调器,负责管理搜索图的构建和最终答案的生成。
类结构
class MindSearchAgent(StreamingAgentForInternLM):
def __init__(
self,
searcher_cfg: dict, # 搜索器配置
summary_prompt: str, # 总结提示词
finish_condition=lambda m: "add_response_node" in m.content,
max_turn: int = 10, # 最大轮次
**kwargs
):
# 配置WebSearchGraph
WebSearchGraph.SEARCHER_CONFIG = searcher_cfg
super().__init__(finish_condition=finish_condition, max_turn=max_turn, **kwargs)
self.summary_prompt = summary_prompt
self.action = ExecutionAction() # 代码执行器
def forward(self, message: AgentMessage, session_id=0, **kwargs):
# 主循环,最多max_turn次
for _ in range(self.max_turn):
# 1. 调用LLM生成图构建代码
# 2. 执行生成的代码
# 3. 更新图状态
# 4. 检查是否完成
# 5. 生成最终回答或继续下一轮
核心工作流程
- 初始化阶段:接收用户问题,初始化搜索图
- 图构建阶段:调用LLM生成Python代码,动态构建搜索图
- 并发搜索阶段:并行执行多个搜索任务
- 结果收集阶段:收集所有搜索结果,更新图状态
- 最终生成阶段:基于收集的信息生成完整回答
2. WebSearchGraph 架构
WebSearchGraph 是系统的核心数据结构,实现了动态图构建和并发搜索管理。
数据结构
class WebSearchGraph:
def __init__(self):
self.nodes: Dict[str, Dict[str, str]] = {} # 节点存储
self.adjacency_list: Dict[str, List[dict]] = defaultdict(list) # 邻接表
self.future_to_query = dict() # 任务映射
self.searcher_resp_queue = queue.Queue() # 响应队列
self.executor = ThreadPoolExecutor(max_workers=10) # 线程池
self.n_active_tasks = 0 # 活跃任务数
核心方法
- addrootnode:添加根节点(用户问题)
- addnode:添加搜索子问题节点
- addresponsenode:添加响应节点
- addedge:添加节点间的边
- node:获取节点信息
并发处理机制
def add_node(self, node_name: str, node_content: str):
# 1. 添加节点到图
self.nodes[node_name] = dict(content=node_content, type="searcher")
# 2. 查找父节点
parent_nodes = self._find_parent_nodes(node_name)
# 3. 创建搜索任务
if self.is_async:
# 异步模式:使用事件循环
asyncio.run_coroutine_threadsafe(
_async_search_node_stream(),
random.choice(self._SEARCHER_LOOP)
)
else:
# 同步模式:使用线程池
self.executor.submit(_search_node_stream)
self.n_active_tasks += 1
3. SearcherAgent 架构
SearcherAgent 负责执行具体的搜索任务,包括关键词生成、搜索执行和结果分析。
类结构
class SearcherAgent(StreamingAgentForInternLM):
def __init__(
self,
user_input_template: str = "{question}",
user_context_template: str = None,
**kwargs,
):
self.user_input_template = user_input_template
self.user_context_template = user_context_template
super().__init__(**kwargs)
def forward(
self,
question: str,
topic: str,
history: List[dict] = None,
session_id=0,
**kwargs,
):
# 1. 构建输入消息
message = [self.user_input_template.format(question=question, topic=topic)]
if history and self.user_context_template:
message = [self.user_context_template.format_map(item) for item in history] + message
message = "\n".join(message)
# 2. 调用LLM进行搜索和分析
return super().forward(message, session_id=session_id, **kwargs)
搜索流程
- 关键词生成:LLM根据问题生成搜索关键词
- 搜索引擎调用:调用配置的搜索引擎API
- 结果选择:LLM选择最相关的搜索结果
- 内容分析:分析选中的网页内容
- 答案生成:基于分析结果生成回答
4. 流式响应架构
SSE 实现机制
# FastAPI 端点
@app.post("/solve")
async def run(request: GenerationParams, _request: Request):
async def generate():
# 1. 初始化Agent
agent = init_agent(...)
# 2. 创建队列和事件
queue = janus.Queue()
stop_event = asyncio.Event()
# 3. 包装同步生成器为异步
def sync_generator_wrapper():
for response in agent(inputs, session_id=session_id):
queue.sync_q.put(response)
queue.sync_q.put(None)
async def async_generator_wrapper():
loop.run_in_executor(None, sync_generator_wrapper)
while True:
response = await queue.async_q.get()
if response is None:
break
yield response
# 4. 流式发送响应
async for message in async_generator_wrapper():
response_json = json.dumps(
_postprocess_agent_message(message.model_dump()),
ensure_ascii=False,
)
yield {"data": response_json}
状态管理
系统使用 AgentStatusCode 管理响应状态:
SESSION_READY:会话准备就绪STREAM_ING:流式响应中PLUGIN_START:插件调用开始CODING:代码生成中END:响应结束
技术实现细节
1. 引用管理系统
引用生成流程
def _generate_references_from_graph(graph: Dict[str, dict]) -> Tuple[str, Dict[int, dict]]:
ptr, references, references_url = 0, [], {}
for name, data_item in graph.items():
if name in ["root", "response"]:
continue
# 提取引用信息
ref2url = {
int(k): v for k, v in json.loads(
data_item["memory"]["agent.memory"][2]["content"]
).items()
}
# 更新引用索引
updata_ref, ref2url, added_ptr = _update_ref(
data_item["response"]["content"], ref2url, ptr
)
ptr += added_ptr
# 构建引用文本
references.append(f'## {data_item["content"]}\n\n{updata_ref}')
references_url.update(ref2url)
return "\n\n".join(references), references_url
引用更新机制
def _update_ref(ref: str, ref2url: Dict[str, str], ptr: int) -> str:
# 提取所有引用编号
numbers = list({int(n) for n in re.findall(r"\[\[(\d+)\]\]", ref)})
numbers = {n: idx + 1 for idx, n in enumerate(numbers)}
# 更新引用索引
updated_ref = re.sub(
r"\[\[(\d+)\]\]",
lambda match: f"[[{numbers[int(match.group(1))] + ptr}]]",
ref,
)
# 更新URL映射
updated_ref2url = {
numbers[idx] + ptr: ref2url[idx]
for idx in numbers if idx in ref2url
}
return updated_ref, updated_ref2url, len(numbers) + 1
2. 记忆管理机制
会话记忆结构
# Agent记忆结构
memory = {
session_id: [
AgentMessage(...), # 用户消息
AgentMessage(...), # 助手消息
AgentMessage(...), # 工具调用结果
...
]
}
# WebSearchGraph节点记忆
node_memory = {
"content": "问题内容",
"type": "searcher",
"response": AgentMessage(...),
"memory": {
"agent.memory": [
AgentMessage(...), # 系统提示
AgentMessage(...), # 用户输入
AgentMessage(...), # 搜索结果
AgentMessage(...), # 分析结果
]
},
"session_id": 12345
}
记忆清理策略
# 会话结束时清理记忆
finally:
await stop_event.wait()
queue.close()
await queue.wait_closed()
agent.agent.memory.memory_map.pop(session_id, None) # 清理会话记忆
3. 并发控制机制
线程池管理
class WebSearchGraph:
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=10) # 限制并发数
self.n_active_tasks = 0 # 追踪活跃任务
def add_node(self, node_name: str, node_content: str):
# 提交任务到线程池
self.future_to_query[
self.executor.submit(_search_node_stream)
] = f"{node_name}-{node_content}"
self.n_active_tasks += 1
异步事件循环管理
class WebSearchGraph:
_SEARCHER_LOOP = [] # 事件循环列表
_SEARCHER_THREAD = [] # 线程列表
@classmethod
def start_loop(cls, n: int = 32):
"""启动多个事件循环"""
while len(cls._SEARCHER_THREAD) < n:
def _start_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
cls._SEARCHER_LOOP.append(loop)
loop.run_forever()
thread = Thread(target=_start_loop, daemon=True)
thread.start()
cls._SEARCHER_THREAD.append(thread)
4. 错误处理机制
异常捕获与恢复
async def generate():
try:
async for message in async_generator_wrapper():
response_json = json.dumps(
_postprocess_agent_message(message.model_dump()),
ensure_ascii=False,
)
yield {"data": response_json}
except Exception as exc:
msg = "An error occurred while generating the response."
logging.exception(msg)
response_json = json.dumps(
dict(error=dict(msg=msg, details=str(exc))),
ensure_ascii=False
)
yield {"data": response_json}
任务失败处理
def _search_node_stream():
try:
for searcher_message in agent(...):
self.nodes[node_name]["response"] = searcher_message.model_dump()
self.searcher_resp_queue.put((node_name, self.nodes[node_name], []))
self.searcher_resp_queue.put((None, None, None))
except Exception as exc:
self.searcher_resp_queue.put((exc, None, None)) # 传递异常
性能优化策略
1. 并发优化
动态并发控制
# 根据系统负载动态调整并发数
class DynamicThreadPool:
def __init__(self, min_workers=5, max_workers=50):
self.min_workers = min_workers
self.max_workers = max_workers
self.current_workers = min_workers
self.executor = ThreadPoolExecutor(max_workers=self.current_workers)
def adjust_pool_size(self, queue_size):
"""根据队列大小调整线程池"""
if queue_size > 100 and self.current_workers < self.max_workers:
self.current_workers = min(self.current_workers + 5, self.max_workers)
self.executor._max_workers = self.current_workers
elif queue_size < 10 and self.current_workers > self.min_workers:
self.current_workers = max(self.current_workers - 5, self.min_workers)
self.executor._max_workers = self.current_workers
异步IO优化
# 使用aiohttp替代requests进行异步HTTP请求
import aiohttp
class AsyncWebBrowser:
async def search(self, query: str):
async with aiohttp.ClientSession() as session:
async with session.get(
self.search_url,
params={"q": query},
timeout=aiohttp.ClientTimeout(total=30)
) as response:
return await response.json()
2. 内存优化
消息过滤与压缩
def _postprocess_agent_message(message: dict) -> dict:
"""过滤不必要的信息,减少传输数据量"""
content, fmt = message["content"], message["formatted"]
current_node = content["current_node"] if isinstance(content, dict) else None
if current_node:
# 只返回当前节点的信息
message["content"] = None
for key in ["ref2url"]:
fmt.pop(key, None)
graph = fmt["node"]
for key in graph.copy():
if key != current_node:
graph.pop(key) # 删除非当前节点
return dict(current_node=current_node, response=message)
记忆清理策略
# 定期清理过期会话
class MemoryManager:
def __init__(self, max_sessions=1000, ttl=3600):
self.max_sessions = max_sessions
self.ttl = ttl
self.sessions = {}
self.timestamps = {}
def cleanup(self):
"""清理过期会话"""
current_time = time.time()
expired = [
sid for sid, ts in self.timestamps.items()
if current_time - ts > self.ttl
]
for sid in expired:
self.sessions.pop(sid, None)
self.timestamps.pop(sid, None)
3. 网络优化
SSE连接管理
# 前端SSE重连机制
class SSEManager {
constructor(url, options) {
this.url = url;
this.options = options;
this.controller = new AbortController();
this.retryCount = 0;
this.maxRetries = 5;
}
async connect() {
try {
await fetchEventSource(this.url, {
...this.options,
signal: this.controller.signal,
onerror: (err) => {
if (this.retryCount < this.maxRetries) {
this.retryCount++;
setTimeout(() => this.connect(), 1000 * this.retryCount);
}
}
});
} catch (err) {
console.error('SSE connection failed:', err);
}
}
}
数据压缩
# 启用Gzip压缩
from fastapi.middleware.gzip import GZipMiddleware
app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)
4. 缓存策略
搜索结果缓存
class SearchCache:
def __init__(self, max_size=1000, ttl=1800):
self.cache = {}
self.timestamps = {}
self.max_size = max_size
self.ttl = ttl
def get(self, query: str):
if query in self.cache:
if time.time() - self.timestamps[query] < self.ttl:
return self.cache[query]
else:
self.remove(query)
return None
def put(self, query: str, result):
if len(self.cache) >= self.max_size:
# LRU淘汰
oldest = min(self.timestamps, key=self.timestamps.get)
self.remove(oldest)
self.cache[query] = result
self.timestamps[query] = time.time()
扩展性设计
1. 插件化架构
搜索引擎插件
# 搜索引擎基类
class BaseSearchEngine(ABC):
@abstractmethod
async def search(self, query: str, topk: int = 10) -> List[Dict]:
pass
@abstractmethod
async def select(self, indices: List[int]) -> List[Dict]:
pass
# 具体实现
class BingSearchEngine(BaseSearchEngine):
async def search(self, query: str, topk: int = 10):
# Bing搜索实现
pass
class GoogleSearchEngine(BaseSearchEngine):
async def search(self, query: str, topk: int = 10):
# Google搜索实现
pass
LLM插件
# LLM基类
class BaseLLM(ABC):
@abstractmethod
async def stream_chat(self, messages: List[Dict], **kwargs):
pass
@abstractmethod
def parse_response(self, response: str) -> Dict:
pass
# 具体实现
class InternLMLLM(BaseLLM):
async def stream_chat(self, messages: List[Dict], **kwargs):
# InternLM实现
pass
class GPTLLM(BaseLLM):
async def stream_chat(self, messages: List[Dict], **kwargs):
# GPT实现
pass
2. 配置驱动开发
模型配置
# models.py
internlm_server = dict(
type=LMDeployServer,
path="internlm/internlm2_5-7b-chat",
model_name="internlm2_5-7b-chat",
meta_template=INTERNLM2_META,
top_p=0.8,
top_k=1,
temperature=0,
max_new_tokens=8192,
repetition_penalty=1.02,
stop_words=["<|im_end|>"],
)
gpt4 = dict(
type=GPTAPI,
model_type="gpt-4-turbo",
key=os.environ.get("OPENAI_API_KEY"),
api_base=os.environ.get("OPENAI_API_BASE"),
)
Agent配置
# __init__.py
def init_agent(lang="cn", model_format="internlm_server", search_engine="BingSearch", use_async=False):
# 动态创建LLM实例
llm_cfg = deepcopy(getattr(llm_factory, model_format))
if use_async:
cls_name = llm_cfg["type"].split(".")[-1]
llm_cfg["type"] = f"lagent.llms.Async{cls_name}"
llm = create_object(llm_cfg)
# 动态创建搜索引擎
plugins = [dict(
type=AsyncWebBrowser if use_async else WebBrowser,
searcher_type=search_engine,
topk=6,
api_key=os.getenv("WEB_SEARCH_API_KEY"),
)]
# 创建Agent
agent = (AsyncMindSearchAgent if use_async else MindSearchAgent)(
llm=llm,
searcher_cfg=dict(
llm=llm,
plugins=plugins,
# ...其他配置
),
summary_prompt=FINAL_RESPONSE_CN if lang == "cn" else FINAL_RESPONSE_EN,
max_turn=10,
)
return agent
3. 模块化设计
前端模块化
// 组件结构
src/
├── components/
│ ├── answer/ # 答案展示组件
│ ├── chat-right/ # 右侧聊天面板
│ ├── mind-map/ # 思维导图
│ ├── loading/ # 加载动画
│ └── ...
├── provider/ # 状态管理
├── utils/ # 工具函数
└── pages/
└── mindsearch/ # 主页面
// 独立组件示例
interface INodeComponentProps {
nodeInfo: INodeInfo;
onClick: (node: string) => void;
}
const NodeComponent: React.FC<INodeComponentProps> = ({ nodeInfo, onClick }) => {
// 组件实现
};
后端模块化
# 清晰的模块划分
mindsearch/
├── __init__.py
├── app.py # FastAPI服务
├── terminal.py # 终端界面
└── agent/
├── __init__.py # Agent工厂
├── graph.py # 图搜索实现
├── mindsearch_agent.py # 主Agent
├── models.py # 模型配置
├── streaming.py # 流式处理
└── mindsearch_prompt.py # 提示词模板
4. 测试和调试支持
单元测试
# 测试WebSearchGraph
def test_web_search_graph():
graph = WebSearchGraph()
graph.add_root_node("测试问题")
graph.add_node("子问题1", "子问题内容1")
graph.add_node("子问题2", "子问题内容2")
graph.add_edge("root", "子问题1")
graph.add_edge("root", "子问题2")
assert "root" in graph.nodes
assert "子问题1" in graph.nodes
assert len(graph.adjacency_list["root"]) == 2
# 测试Agent
def test_mind_search_agent():
agent = init_agent(lang="cn", model_format="internlm_server")
response = list(agent("测试问题"))
assert len(response) > 0
assert response[-1].stream_state == AgentStatusCode.END
调试工具
# 终端调试
python -m mindsearch.terminal
# 日志配置
import logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('mindsearch.log'),
logging.StreamHandler()
]
)
# 性能分析
from functools import wraps
import time
def timing_decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
end = time.time()
logging.info(f"{func.__name__} took {end - start:.2f} seconds")
return result
return wrapper
系统架构图
1. 整体架构图
2. 数据流图
3. 并发处理架构
4. 状态转换图
总结
MindSearch 的架构设计体现了以下核心优势:
1. 模块化设计
- 清晰的层次划分,各模块职责明确
- 插件化架构,易于扩展新的模型和搜索引擎
- 配置驱动,灵活适应不同场景
2. 高性能并发
- 线程池和事件循环的混合使用
- 动态并发控制,资源利用率高
- 异步IO优化,减少等待时间
3. 智能任务分解
- 基于图的动态任务分解
- 并行搜索,大幅缩短响应时间
- 智能结果合并和引用管理
4. 优秀的用户体验
- SSE流式响应,实时展示思考过程
- 透明的解决方案路径
- 丰富的交互界面选择
5. 强大的可扩展性
- 支持多种LLM和搜索引擎
- 灵活的配置系统
- 完善的测试和调试支持
这种架构设计使 MindSearch 能够在保证回答质量的同时,提供快速的响应速度和优秀的用户体验,为AI搜索引擎的发展提供了优秀的参考实现。