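Contents
1. Project Overview
2. Overall System Architecture
3. Core Component Architecture
4. Technical Implementation Details
5. Performance Optimization Strategies
6. Extensibility Design
7. System Architecture Diagrams

Project Overview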
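Background and Goals
MindSearch is an open-source AI search engine framework designed to mimic the human thought process during in-depth information retrieval. Its core goal is an intelligent system that can understand complex questions, break them into subtasks automatically, search for information in parallel, and generate high-quality answers.
Key Features
- In-depth knowledge exploration: answers draw on hundreds of searched web pages, giving both breadth and depth
- Transparent solution path: the full reasoning path, search keywords, and related details are shown to the user
- Dynamic graph construction: the user query is decomposed into sub-question nodes in a graph that is expanded step by step
- Multiple user interfaces: React, Gradio, Streamlit, and other front ends are supported
- Multiple model support: compatible with InternLM, GPT-4, Qwen, and other large language models
- Multiple search engine integrations: DuckDuckGo, Bing, Google, Brave, and other search engines are supported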
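Overall System Architecture
Layered Architecture
MindSearch uses a clearly layered architecture. Each layer has a well-defined responsibility: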
graph TB
    subgraph User Interface Layer
        ReactUI[React front end]
        GradioUI[Gradio UI]
        StreamlitUI[Streamlit UI]
        TerminalUI[Terminal UI]
    end
    subgraph API Gateway Layer
        FastAPI[FastAPI service]
        CORS[CORS middleware]
        SSE[SSE streaming responses]
    end
    subgraph Core Logic Layer
        MindSearchAgent[MindSearchAgent]
        AsyncMindSearchAgent[AsyncMindSearchAgent]
        WebSearchGraph[WebSearchGraph]
        SearcherAgent[SearcherAgent]
    end
    subgraph Infrastructure Layer
        LLM[Large language model]
        SearchEngine[Search engine]
        Memory[Memory management]
        Streaming[Stream processing]
    end
ReactUI --> FastAPI
GradioUI --> FastAPI
StreamlitUI --> FastAPI
TerminalUI --> MindSearchAgent
FastAPI --> MindSearchAgent
FastAPI --> AsyncMindSearchAgent
MindSearchAgent --> WebSearchGraph
AsyncMindSearchAgent --> WebSearchGraph
WebSearchGraph --> SearcherAgent
SearcherAgent --> LLM
SearcherAgent --> SearchEngine
MindSearchAgent --> LLM
LLM --> Memory
SearcherAgent --> Memory
Core Workflow
sequenceDiagram
    participant User as User
    participant Frontend as Front end
    participant FastAPI as FastAPI service
    participant Agent as MindSearchAgent
    participant Graph as WebSearchGraph
    participant Searcher as SearcherAgent
    participant LLM as LLM
    participant Search as Search engine
    User->>Frontend: Enter a question
    Frontend->>FastAPI: POST /solve (SSE)
    FastAPI->>Agent: Initialize the agent
    Agent->>LLM: Generate graph-building code
    LLM-->>Agent: Return Python code
    loop For each sub-question
        Agent->>Graph: Add node
        Graph->>Searcher: Create search task
        Searcher->>LLM: Generate search keywords
        LLM-->>Searcher: Return keywords
        Searcher->>Search: Run search
        Search-->>Searcher: Return search results
        Searcher->>LLM: Analyze search results
        LLM-->>Searcher: Return analysis
        Searcher-->>Graph: Store node result
        Graph-->>Agent: Return graph state
        Agent->>Frontend: Stream update over SSE
    end
    Agent->>LLM: Generate final answer
    LLM-->>Agent: Return summary
    Agent->>Frontend: Final response
    Frontend->>User: Display the complete answer
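Core Component Architecture
1. MindSearchAgent Architecture
MindSearchAgent is the central coordinator of the system. It manages construction of the search graph and generation of the final answer.
#### Class Structure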
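# Simplified excerpt. StreamingAgentForInternLM, AgentMessage, ExecutionAction and
# WebSearchGraph are defined in the MindSearch / lagent codebase.
class MindSearchAgent(StreamingAgentForInternLM):
    def __init__(
        self,
        searcher_cfg: dict,  # configuration for the per-node SearcherAgent
        summary_prompt: str,  # prompt used to write the final answer
        finish_condition=lambda m: "add_response_node" in m.content,
        max_turn: int = 10,  # maximum number of planning rounds
        **kwargs,
    ):
        # WebSearchGraph reads the searcher configuration from a class attribute
        WebSearchGraph.SEARCHER_CONFIG = searcher_cfg
        super().__init__(finish_condition=finish_condition, max_turn=max_turn, **kwargs)
        self.summary_prompt = summary_prompt
        self.action = ExecutionAction()  # executes the code generated by the LLM

    def forward(self, message: AgentMessage, session_id=0, **kwargs):
        # Main loop, at most max_turn rounds. Each round:
        for _ in range(self.max_turn):
            # 1. Ask the LLM for graph-building code
            # 2. Execute the generated code
            # 3. Update the graph state
            # 4. Check the finish condition
            # 5. Generate the final answer, or continue with the next round
            ...  # body omitted in this excerpt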
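#### Core Workflow
1. Initialization: receive the user's question and initialize the search graph
2. Graph construction: call the LLM to generate Python code that builds the search graph dynamically
3. Concurrent search: run multiple search tasks in parallel
4. Result collection: gather all search results and update the graph state
5. Final generation: write a complete answer from the collected information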
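The five stages can be illustrated with a toy loop. This is a minimal sketch, not MindSearch's code. `run_planner_loop`, `plan_step`, `search` and `summarize` are hypothetical names. The planner here returns plain sub-question strings, whereas the real LLM emits Python code that calls WebSearchGraph.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def run_planner_loop(
    question: str,
    plan_step: Callable[[str, Dict[str, str]], List[str]],
    search: Callable[[str], str],
    summarize: Callable[[str, Dict[str, str]], str],
    max_turn: int = 10,
    max_workers: int = 4,
) -> str:
    """Hypothetical planner loop: plan, search in parallel, collect, repeat, summarize."""
    results: Dict[str, str] = {}  # stage 1: empty graph state (sub-question -> answer)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for _ in range(max_turn):
            # Stage 2: the planner sees the results so far and proposes new sub-questions
            sub_questions = plan_step(question, results)
            if not sub_questions:  # finish condition: nothing left to ask
                break
            # Stage 3: search all new sub-questions concurrently (map keeps input order)
            answers = pool.map(search, sub_questions)
            # Stage 4: collect the answers into the shared state
            results.update(zip(sub_questions, answers))
    # Stage 5: write the final answer from everything collected
    return summarize(question, results)
```

`plan_step` sees the accumulated results, so later rounds can ask follow-up questions that depend on earlier answers. This is the benefit of building the graph incrementally rather than in one shot.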
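2. WebSearchGraph Architecture
WebSearchGraph is the system's core data structure. It implements dynamic graph construction and manages concurrent searches.
#### Data Structure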
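import queue
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

class WebSearchGraph:
    def __init__(self):
        self.nodes: Dict[str, Dict[str, str]] = {}  # node name -> node data
        self.adjacency_list: Dict[str, List[dict]] = defaultdict(list)  # outgoing edges per node
        self.future_to_query = dict()  # Future -> "name-content" of the search it runs
        self.searcher_resp_queue = queue.Queue()  # streamed searcher responses
        self.executor = ThreadPoolExecutor(max_workers=10)  # thread pool used in sync mode
        self.n_active_tasks = 0  # number of searches still running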
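#### Core Methods
1. add_root_node: add the root node (the user's question)
2. add_node: add a search sub-question node
3. add_response_node: add the response node
4. add_edge: add an edge between two nodes
5. node: retrieve a node's information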
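A data-structure-only sketch of these five methods shows how the node table and the adjacency list relate. `MiniSearchGraph` and its `parents` helper are hypothetical. Unlike WebSearchGraph, this version launches no search when a node is added, and it stores edges as plain node names instead of dicts.

```python
from collections import defaultdict
from typing import Dict, List


class MiniSearchGraph:
    """Hypothetical, search-free sketch of the WebSearchGraph node/edge API."""

    def __init__(self) -> None:
        self.nodes: Dict[str, Dict[str, str]] = {}
        self.adjacency_list: Dict[str, List[str]] = defaultdict(list)

    def add_root_node(self, node_content: str, node_name: str = "root") -> None:
        self.nodes[node_name] = {"content": node_content, "type": "root"}

    def add_node(self, node_name: str, node_content: str) -> None:
        # WebSearchGraph would start a SearcherAgent task at this point
        self.nodes[node_name] = {"content": node_content, "type": "searcher"}

    def add_response_node(self, node_name: str = "response") -> None:
        self.nodes[node_name] = {"content": "", "type": "response"}

    def add_edge(self, start_node: str, end_node: str) -> None:
        self.adjacency_list[start_node].append(end_node)

    def node(self, node_name: str) -> Dict[str, str]:
        return dict(self.nodes[node_name])  # a copy, so callers cannot mutate the graph

    def parents(self, node_name: str) -> List[str]:
        # Parents are the nodes with an edge pointing at node_name
        return [start for start, ends in self.adjacency_list.items() if node_name in ends]


# Usage: a question split into two sub-questions that both feed the response node
g = MiniSearchGraph()
g.add_root_node("Compare A and B")
g.add_node("a", "What is A?")
g.add_node("b", "What is B?")
g.add_edge("root", "a")
g.add_edge("root", "b")
g.add_response_node()
g.add_edge("a", "response")
g.add_edge("b", "response")
```

The `parents` lookup corresponds to the `_find_parent_nodes` step in `add_node`. Parent results are what a sub-question's searcher can receive as context.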
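#### Concurrency Mechanism
def add_node(self, node_name: str, node_content: str):
    # 1. Add the node to the graph
    self.nodes[node_name] = dict(content=node_content, type="searcher")
    # 2. Find its parent nodes (their results become context for this search)
    parent_nodes = self._find_parent_nodes(node_name)
    # ... the nested functions _search_node_stream / _async_search_node_stream,
    # which run a SearcherAgent for this node, are defined here (omitted)
    # 3. Launch the search task
    if self.is_async:
        # Async mode: schedule the coroutine on one of the shared background event loops
        asyncio.run_coroutine_threadsafe(
            _async_search_node_stream(),
            random.choice(self._SEARCHER_LOOP),
        )
    else:
        # Sync mode: submit the search to the thread pool
        self.executor.submit(_search_node_stream)
    self.n_active_tasks += 1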
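3. SearcherAgent Architecture
SearcherAgent executes individual search tasks. This covers generating keywords, running the search, and analyzing the results.
#### Class Structure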
from typing import List, Optional

# Simplified excerpt; StreamingAgentForInternLM comes from the MindSearch codebase.
class SearcherAgent(StreamingAgentForInternLM):
    def __init__(
        self,
        user_input_template: str = "{question}",
        user_context_template: Optional[str] = None,
        **kwargs,
    ):
        self.user_input_template = user_input_template
        self.user_context_template = user_context_template
        super().__init__(**kwargs)

    def forward(
        self,
        question: str,
        topic: str,
        history: Optional[List[dict]] = None,
        session_id=0,
        **kwargs,
    ):
        # 1. Build the input message: parent-node results (history) first, then the question
        message = [self.user_input_template.format(question=question, topic=topic)]
        if history and self.user_context_template:
            message = [self.user_context_template.format_map(item) for item in history] + message
        message = "\n".join(message)
        # 2. Let the LLM drive the search-and-analyze loop
        return super().forward(message, session_id=session_id, **kwargs)
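#### Search Flow
1. Keyword generation: the LLM generates search keywords from the question
2. Search engine call: the configured search engine API is queried
3. Result selection: the LLM picks the most relevant search results
4. Content analysis: the selected web pages are analyzed
5. Answer generation: an answer is written from the analysis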
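The five steps can be pictured as a fixed pipeline with each LLM or search call injected as a function. This is only an illustrative sketch. `searcher_pipeline` and its parameters are hypothetical, and in MindSearch these steps are driven by the LLM's tool calls through the browser plugin rather than by a fixed function. Deduplicating hits by URL and ignoring out-of-range indices are assumptions of this sketch.

```python
from typing import Callable, Dict, List

Hit = Dict[str, str]  # e.g. {"url": ..., "text": ...}; field names are assumptions


def searcher_pipeline(
    question: str,
    gen_keywords: Callable[[str], List[str]],
    search: Callable[[str], List[Hit]],
    select: Callable[[str, List[Hit]], List[int]],
    answer: Callable[[str, List[Hit]], str],
) -> str:
    # 1. Keyword generation (an LLM call in the real system)
    keywords = gen_keywords(question)
    # 2. Query the search engine for every keyword, pooling hits and dropping duplicate URLs
    hits: List[Hit] = []
    seen = set()
    for kw in keywords:
        for hit in search(kw):
            if hit["url"] not in seen:
                seen.add(hit["url"])
                hits.append(hit)
    # 3. Result selection: the selector returns indices into `hits`; invalid ones are ignored
    chosen = [hits[i] for i in select(question, hits) if 0 <= i < len(hits)]
    # 4-5. Analyze the chosen pages and write the answer
    return answer(question, chosen)
```

Injecting the steps as callables also makes the flow testable with stubs, without a model server or search API key.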
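4. Streaming Response Architecture
#### SSE Implementation
import asyncio
import json

import janus
from fastapi import Request
from sse_starlette.sse import EventSourceResponse

# FastAPI endpoint (app, GenerationParams, init_agent and _postprocess_agent_message
# are defined elsewhere in app.py)
@app.post("/solve")
async def run(request: GenerationParams, _request: Request):
    inputs = request.inputs  # assumed field names on GenerationParams
    session_id = request.session_id

    async def generate():
        # 1. Initialize the agent
        agent = init_agent(...)
        # 2. Create a sync/async queue and a completion event
        queue = janus.Queue()
        stop_event = asyncio.Event()

        # 3. Run the synchronous agent generator in a worker thread ...
        def sync_generator_wrapper():
            try:
                for response in agent(inputs, session_id=session_id):
                    queue.sync_q.put(response)
            finally:
                queue.sync_q.put(None)  # end-of-stream marker, sent even on error

        # ... and consume its output asynchronously
        async def async_generator_wrapper():
            loop = asyncio.get_running_loop()
            loop.run_in_executor(None, sync_generator_wrapper)
            while True:
                response = await queue.async_q.get()
                if response is None:
                    break
                yield response
            stop_event.set()  # every response has been consumed

        # 4. Stream each message to the client as an SSE event
        async for message in async_generator_wrapper():
            response_json = json.dumps(
                _postprocess_agent_message(message.model_dump()),
                ensure_ascii=False,
            )
            yield {"data": response_json}

    return EventSourceResponse(generate())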
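The janus queue bridges a synchronous generator in a worker thread and an async consumer. The same pattern can be written with the standard library alone, using `loop.call_soon_threadsafe` to hand each item to an `asyncio.Queue`. This is a minimal sketch; `iterate_in_thread` is a hypothetical helper, not part of MindSearch.

```python
import asyncio
from typing import AsyncIterator, Callable, Iterable, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel: the producer has finished


async def iterate_in_thread(make_iter: Callable[[], Iterable[T]]) -> AsyncIterator[T]:
    """Run a blocking iterator in the default executor and yield its items asynchronously."""
    loop = asyncio.get_running_loop()
    queue: "asyncio.Queue[object]" = asyncio.Queue()

    def produce() -> None:
        try:
            for item in make_iter():
                # asyncio.Queue is not thread-safe, so each put runs on the loop's own thread
                loop.call_soon_threadsafe(queue.put_nowait, item)
        finally:
            loop.call_soon_threadsafe(queue.put_nowait, _DONE)  # sent even on error

    producer = loop.run_in_executor(None, produce)
    while True:
        item = await queue.get()
        if item is _DONE:
            break
        yield item
    await producer  # re-raises any exception raised by the blocking iterator
```

In an endpoint like the one above, this helper could take the place of both wrappers, e.g. `async for message in iterate_in_thread(lambda: agent(inputs, session_id=session_id))`. Awaiting the executor future at the end surfaces agent errors to the caller instead of silently ending the stream.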
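#### State Management
The system uses AgentStatusCode to track the state of each response:
- SESSION_READY: the session is ready
- STREAM_ING: a response is being streamed
- PLUGIN_START: a plugin (tool) call has started
- CODING: code is being generated
- END: the response is complete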
Technical Implementation Details
1. Reference Management
#### Reference Generation
import json
from typing import Dict, Tuple

def _generate_references_from_graph(graph: Dict[str, dict]) -> Tuple[str, Dict[int, dict]]:
    ptr, references, references_url = 0, [], {}
    for name, data_item in graph.items():
        if name in ["root", "response"]:
            continue
        # The third message in the searcher's memory (the search results) is parsed
        # as JSON mapping citation index -> source
        ref2url = {
            int(k): v for k, v in json.loads(
                data_item["memory"]["agent.memory"][2]["content"]
            ).items()
        }
        # Renumber this node's citations so they continue after the previous nodes'
        updated_ref, ref2url, added_ptr = _update_ref(
            data_item["response"]["content"], ref2url, ptr
        )
        ptr += added_ptr
        # One section per sub-question
        references.append(f'## {data_item["content"]}\n\n{updated_ref}')
        references_url.update(ref2url)
    return "\n\n".join(references), references_url
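#### Reference Renumbering
import re
from typing import Dict, Tuple

def _update_ref(ref: str, ref2url: Dict[int, dict], ptr: int) -> Tuple[str, Dict[int, dict], int]:
    # Collect the distinct citation numbers [[n]] in this text, in ascending order
    numbers = sorted({int(n) for n in re.findall(r"\[\[(\d+)\]\]", ref)})
    # Map each old number to a local 1-based index
    numbers = {n: idx + 1 for idx, n in enumerate(numbers)}
    # Rewrite the citations, shifted by the global offset ptr
    updated_ref = re.sub(
        r"\[\[(\d+)\]\]",
        lambda match: f"[[{numbers[int(match.group(1))] + ptr}]]",
        ref,
    )
    # Remap the URL table to the new numbers
    updated_ref2url = {
        numbers[idx] + ptr: ref2url[idx]
        for idx in numbers if idx in ref2url
    }
    # Offset for the next section: it starts right after the last number used here
    return updated_ref, updated_ref2url, len(numbers)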
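Here is a worked example of renumbering across two sub-question sections. This self-contained sketch follows the same idea as `_update_ref`. It sorts the citation numbers and advances the offset by exactly the number of citations a section used, so the merged numbering has no gaps. `renumber` and `merge_sections` are hypothetical names, and the URLs are placeholders.

```python
import re
from typing import Dict, List, Tuple

CITATION = re.compile(r"\[\[(\d+)\]\]")


def renumber(text: str, ref2url: Dict[int, str], ptr: int) -> Tuple[str, Dict[int, str], int]:
    """Renumber the [[n]] citations in text to ptr+1, ptr+2, ... in ascending order of n."""
    local = {n: i + 1 for i, n in enumerate(sorted({int(m) for m in CITATION.findall(text)}))}
    new_text = CITATION.sub(lambda m: f"[[{local[int(m.group(1))] + ptr}]]", text)
    new_urls = {local[n] + ptr: ref2url[n] for n in local if n in ref2url}
    return new_text, new_urls, len(local)


def merge_sections(sections: List[Tuple[str, Dict[int, str]]]) -> Tuple[str, Dict[int, str]]:
    """Give every section's citations globally unique numbers."""
    ptr, texts, urls = 0, [], {}
    for text, ref2url in sections:
        new_text, new_urls, used = renumber(text, ref2url, ptr)
        ptr += used
        texts.append(new_text)
        urls.update(new_urls)
    return "\n\n".join(texts), urls


merged_text, merged_urls = merge_sections([
    ("A is fast [[3]] and cheap [[5]].", {3: "https://example.com/a1", 5: "https://example.com/a2"}),
    ("B is slow [[1]].", {1: "https://example.com/b1"}),
])
# merged_text == "A is fast [[1]] and cheap [[2]].\n\nB is slow [[3]]."
```

Each searcher numbers its citations independently starting from 1, so without this pass two sections could both cite `[[1]]` for different pages.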
2. Memory Management
#### Session Memory Structure
# Agent memory: one message list per session
memory = {
    session_id: [
        AgentMessage(...),  # user message
        AgentMessage(...),  # assistant message
        AgentMessage(...),  # tool-call result
        ...
    ]
}
# Memory of one WebSearchGraph node
node_memory = {
    "content": "question text",
    "type": "searcher",
    "response": AgentMessage(...),
    "memory": {
        "agent.memory": [
            AgentMessage(...),  # system prompt
            AgentMessage(...),  # user input
            AgentMessage(...),  # search results
            AgentMessage(...),  # analysis result
        ]
    },
    "session_id": 12345,
}
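#### Memory Cleanup
# Tail of generate() in the /solve endpoint: clean up when the session ends
try:
    ...  # stream the responses as shown above
finally:
    await stop_event.wait()  # wait until every response has been consumed
    queue.close()
    await queue.wait_closed()
    agent.agent.memory.memory_map.pop(session_id, None)  # drop this session's memory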
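3. Concurrency Control
#### Thread Pool Management
from concurrent.futures import ThreadPoolExecutor

class WebSearchGraph:
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=10)  # caps concurrent searches
        self.future_to_query = {}  # Future -> the query it runs
        self.n_active_tasks = 0  # tracks running tasks

    def add_node(self, node_name: str, node_content: str):
        # Submit the search to the thread pool and remember which query each Future runs
        self.future_to_query[
            self.executor.submit(_search_node_stream)
        ] = f"{node_name}-{node_content}"
        self.n_active_tasks += 1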
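#### Async Event Loop Management
import asyncio
from threading import Event, Thread

class WebSearchGraph:
    _SEARCHER_LOOP = []  # event loops shared by all graph instances
    _SEARCHER_THREAD = []  # daemon threads, one per event loop

    @classmethod
    def start_loop(cls, n: int = 32):
        """Start n background threads, each running its own event loop forever."""
        while len(cls._SEARCHER_THREAD) < n:
            ready = Event()

            def _start_loop(ready=ready):
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                cls._SEARCHER_LOOP.append(loop)
                ready.set()  # the loop is registered; callers may now schedule onto it
                loop.run_forever()

            thread = Thread(target=_start_loop, daemon=True)
            thread.start()
            ready.wait()  # prevents random.choice() on a still-empty loop list
            cls._SEARCHER_THREAD.append(thread)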
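The following sketch shows how `asyncio.run_coroutine_threadsafe` hands coroutines to loops running in background threads, which is what `add_node` does in async mode. `start_loops`, `fake_search`, `search_all` and `stop_loops` are hypothetical helpers. Unlike the class above, this version creates each loop before starting its thread, so the loop list is complete as soon as the function returns.

```python
import asyncio
import random
import threading
from typing import List


def start_loops(n: int) -> List[asyncio.AbstractEventLoop]:
    """Start n daemon threads, each running its own event loop forever."""
    loops = []
    for _ in range(n):
        loop = asyncio.new_event_loop()
        threading.Thread(target=loop.run_forever, daemon=True).start()
        loops.append(loop)
    return loops


async def fake_search(query: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for network I/O
    return f"results for {query}"


def search_all(queries: List[str], loops: List[asyncio.AbstractEventLoop]) -> List[str]:
    # run_coroutine_threadsafe returns a concurrent.futures.Future, so the calling
    # thread can block on .result() without running an event loop itself
    futures = [
        asyncio.run_coroutine_threadsafe(fake_search(q), random.choice(loops))
        for q in queries
    ]
    return [f.result(timeout=5) for f in futures]


def stop_loops(loops: List[asyncio.AbstractEventLoop]) -> None:
    for loop in loops:
        loop.call_soon_threadsafe(loop.stop)  # stop() must run on the loop's own thread
```

Spreading tasks over several loops, as MindSearch does with `random.choice`, keeps one slow coroutine from monopolizing a single loop's thread when tasks also do blocking work.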
4. Error Handling
#### Exception Capture and Recovery
import json
import logging

async def generate():
    try:
        async for message in async_generator_wrapper():
            response_json = json.dumps(
                _postprocess_agent_message(message.model_dump()),
                ensure_ascii=False,
            )
            yield {"data": response_json}
    except Exception as exc:
        # Log the traceback, then send the error to the client as a final SSE event
        msg = "An error occurred while generating the response."
        logging.exception(msg)
        response_json = json.dumps(
            dict(error=dict(msg=msg, details=str(exc))),
            ensure_ascii=False,
        )
        yield {"data": response_json}
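#### Task Failure Handling
# Nested inside WebSearchGraph.add_node (self, node_name and agent come from the enclosing scope)
def _search_node_stream():
    try:
        for searcher_message in agent(...):
            # Store the latest partial response and publish it to the consumer
            self.nodes[node_name]["response"] = searcher_message.model_dump()
            self.searcher_resp_queue.put((node_name, self.nodes[node_name], []))
        self.searcher_resp_queue.put((None, None, None))  # end marker: task finished
    except Exception as exc:
        self.searcher_resp_queue.put((exc, None, None))  # hand the exception to the consumer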
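On the consuming side, the main thread drains the response queue until every task has sent its end marker. It re-raises any exception a worker handed over. This is a minimal standalone sketch of that loop. `run_and_collect` and the task-dictionary format are hypothetical. This sketch only gathers results, whereas the agent streams each update onward as it arrives.

```python
import queue
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Iterator, List, Tuple


def run_and_collect(
    tasks: Dict[str, Callable[[], Iterator[str]]], max_workers: int = 4
) -> List[Tuple[str, str]]:
    """Run streaming tasks in a pool and gather (name, chunk) pairs from a shared queue."""
    resp_queue: "queue.Queue[tuple]" = queue.Queue()

    def worker(name: str, make_stream: Callable[[], Iterator[str]]) -> None:
        try:
            for chunk in make_stream():
                resp_queue.put((name, chunk, None))  # partial result for this node
            resp_queue.put((None, None, None))  # end marker: task finished normally
        except Exception as exc:
            resp_queue.put((exc, None, None))  # hand the exception to the consumer

    collected: List[Tuple[str, str]] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for name, make_stream in tasks.items():
            pool.submit(worker, name, make_stream)
        active = len(tasks)  # plays the role of n_active_tasks
        while active:
            name, chunk, _ = resp_queue.get()
            if name is None:
                active -= 1
            elif isinstance(name, Exception):
                raise name  # a failed search aborts the collection
            else:
                collected.append((name, chunk))
    return collected
```

Counting end markers rather than polling Futures lets the consumer process partial results in the order they are produced, across all tasks.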
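Performance Optimization Strategies
1. Concurrency Optimization
#### Dynamic Concurrency Control
from concurrent.futures import ThreadPoolExecutor

# Adjust the number of workers to the system load
class DynamicThreadPool:
    def __init__(self, min_workers=5, max_workers=50):
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.current_workers = min_workers
        self.executor = ThreadPoolExecutor(max_workers=self.current_workers)

    def adjust_pool_size(self, queue_size):
        """Grow or shrink the pool according to the number of pending tasks."""
        # Caution: _max_workers is a private CPython attribute. Raising it lets the
        # executor start more threads on later submits; lowering it does not stop
        # threads that already exist, it only prevents new ones from being created.
        if queue_size > 100 and self.current_workers < self.max_workers:
            self.current_workers = min(self.current_workers + 5, self.max_workers)
            self.executor._max_workers = self.current_workers
        elif queue_size < 10 and self.current_workers > self.min_workers:
            self.current_workers = max(self.current_workers - 5, self.min_workers)
            self.executor._max_workers = self.current_workers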
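#### Async I/O Optimization
import aiohttp

# Use aiohttp instead of requests for non-blocking HTTP (simplified illustration)
class AsyncWebBrowser:
    def __init__(self, search_url: str):
        self.search_url = search_url  # search API endpoint

    async def search(self, query: str):
        # Reusing one long-lived ClientSession across requests avoids reconnect
        # overhead; a session per call keeps this example short.
        async with aiohttp.ClientSession() as session:
            async with session.get(
                self.search_url,
                params={"q": query},
                timeout=aiohttp.ClientTimeout(total=30),
            ) as response:
                response.raise_for_status()
                return await response.json()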
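Firing many requests at once can overload a search API or trip its rate limit. A common complement to non-blocking I/O is to cap the number of in-flight requests with `asyncio.Semaphore`. In this minimal sketch, `bounded_gather` is a hypothetical helper and `PeakTracker.fake_fetch` stands in for a real aiohttp call. It records the peak concurrency so the limit can be checked.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


async def bounded_gather(
    func: Callable[[str], Awaitable[T]], queries: List[str], limit: int
) -> List[T]:
    """Run func over all queries with at most `limit` calls in flight; keeps input order."""
    sem = asyncio.Semaphore(limit)

    async def run_one(q: str) -> T:
        async with sem:  # waits here while `limit` calls are already running
            return await func(q)

    return await asyncio.gather(*(run_one(q) for q in queries))


class PeakTracker:
    """Stand-in fetcher that records how many calls overlap."""

    def __init__(self) -> None:
        self.current = 0
        self.peak = 0

    async def fake_fetch(self, query: str) -> str:
        self.current += 1
        self.peak = max(self.peak, self.current)
        await asyncio.sleep(0.01)  # stand-in for an HTTP round trip
        self.current -= 1
        return query.upper()
```

`asyncio.gather` returns results in the order the awaitables were passed, so callers can zip the results back to their queries.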
2. Memory Optimization
#### Message Filtering and Trimming
def _postprocess_agent_message(message: dict) -> dict:
    """Drop information the client does not need, to reduce the payload size."""
    content, fmt = message["content"], message["formatted"]
    current_node = content["current_node"] if isinstance(content, dict) else None
    if current_node:
        # Send only the node currently being updated
        message["content"] = None
        for key in ["ref2url"]:
            fmt.pop(key, None)
        graph = fmt["node"]
        for key in graph.copy():
            if key != current_node:
                graph.pop(key)  # remove every other node
    return dict(current_node=current_node, response=message)
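#### Session Expiry Strategy
import time

# Periodically remove expired sessions
class MemoryManager:
    def __init__(self, max_sessions=1000, ttl=3600):
        self.max_sessions = max_sessions
        self.ttl = ttl  # seconds a session may stay idle
        self.sessions = {}
        self.timestamps = {}

    def touch(self, sid, data):
        """Store or refresh a session (illustrative helper); evict the stalest when full."""
        if sid not in self.sessions and len(self.sessions) >= self.max_sessions:
            oldest = min(self.timestamps, key=self.timestamps.get)
            self.sessions.pop(oldest, None)
            self.timestamps.pop(oldest, None)
        self.sessions[sid] = data
        self.timestamps[sid] = time.time()

    def cleanup(self):
        """Remove sessions idle for longer than ttl."""
        current_time = time.time()
        expired = [
            sid for sid, ts in self.timestamps.items()
            if current_time - ts > self.ttl
        ]
        for sid in expired:
            self.sessions.pop(sid, None)
            self.timestamps.pop(sid, None)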
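3. Network Optimization
#### SSE Connection Management
// Front-end SSE reconnection
import { fetchEventSource } from '@microsoft/fetch-event-source';

class SSEManager {
  constructor(url, options) {
    this.url = url;
    this.options = options;
    this.controller = new AbortController();
    this.retryCount = 0;
    this.maxRetries = 5;
  }

  async connect() {
    try {
      await fetchEventSource(this.url, {
        ...this.options,
        signal: this.controller.signal,
        onerror: (err) => {
          // fetchEventSource retries on its own: returning a number sets the delay (ms)
          // before the next attempt, and throwing stops retrying altogether
          if (this.retryCount >= this.maxRetries) {
            throw err;
          }
          this.retryCount++;
          return 1000 * this.retryCount; // linear backoff
        },
      });
    } catch (err) {
      console.error('SSE connection failed:', err);
    }
  }

  close() {
    this.controller.abort(); // cancel the in-flight request
  }
}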
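To debug the backend without the React client, it helps to see the SSE stream as it appears on the wire. Each `yield {"data": ...}` becomes a `data:` line followed by a blank line. The sketch below is a simplified parser for that framing. It handles only `data:` fields and `:` comment lines such as keep-alive pings, and it ignores `event:`, `id:` and `retry:`. `parse_sse` is a hypothetical helper.

```python
import json
from typing import Iterable, Iterator, List


def parse_sse(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each complete SSE event from an iterable of text lines."""
    data: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":  # a blank line dispatches the current event
            if data:
                yield "\n".join(data)
                data = []
        elif line.startswith(":"):  # comment line, e.g. a keep-alive ping
            continue
        elif line.startswith("data:"):
            value = line[len("data:"):]
            data.append(value[1:] if value.startswith(" ") else value)  # drop one leading space
        # other fields (event:, id:, retry:) are ignored in this sketch
    # an event not terminated by a blank line is discarded, as the SSE format requires


stream = [
    'data: {"current_node": "a"}\n',
    "\n",
    ": ping\n",
    'data: {"current_node": "b"}\n',
    "\n",
]
events = [json.loads(payload) for payload in parse_sse(stream)]
```

Multiple `data:` lines in one event are joined with newlines. Payloads that are single-line JSON, as produced by `json.dumps`, come through unchanged.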
#### Data Compression
# Enable gzip compression for responses of at least 1000 bytes
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)
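4. Caching Strategy
#### Search Result Cache
import time
from collections import OrderedDict

class SearchCache:
    """Search-result cache with a TTL and least-recently-used (LRU) eviction."""

    def __init__(self, max_size=1000, ttl=1800):
        self.cache = OrderedDict()  # query -> (timestamp, result); order tracks recency
        self.max_size = max_size
        self.ttl = ttl

    def get(self, query: str):
        entry = self.cache.get(query)
        if entry is None:
            return None
        ts, result = entry
        if time.time() - ts >= self.ttl:
            self.remove(query)  # expired
            return None
        self.cache.move_to_end(query)  # mark as most recently used
        return result

    def put(self, query: str, result):
        if query in self.cache:
            self.cache.move_to_end(query)
        elif len(self.cache) >= self.max_size:
            self.cache.popitem(last=False)  # LRU eviction: drop the least recently used entry
        self.cache[query] = (time.time(), result)

    def remove(self, query: str):
        self.cache.pop(query, None)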
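Extensibility Design
1. Plugin Architecture
#### Search Engine Plugins
from abc import ABC, abstractmethod
from typing import Dict, List

# Base class for search engines
class BaseSearchEngine(ABC):
    @abstractmethod
    async def search(self, query: str, topk: int = 10) -> List[Dict]:
        """Return up to topk results for query."""

    @abstractmethod
    async def select(self, indices: List[int]) -> List[Dict]:
        """Fetch the full content of the results chosen by index."""

# Concrete implementations (bodies omitted). Every abstract method must be
# overridden, otherwise the class cannot be instantiated.
class BingSearchEngine(BaseSearchEngine):
    async def search(self, query: str, topk: int = 10):
        ...  # Bing search

    async def select(self, indices: List[int]):
        ...  # fetch the selected Bing results

class GoogleSearchEngine(BaseSearchEngine):
    async def search(self, query: str, topk: int = 10):
        ...  # Google search

    async def select(self, indices: List[int]):
        ...  # fetch the selected Google results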
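#### LLM Plugins
from abc import ABC, abstractmethod
from typing import Dict, List

# Base class for LLMs
class BaseLLM(ABC):
    @abstractmethod
    async def stream_chat(self, messages: List[Dict], **kwargs):
        """Stream the model's reply to a chat history."""

    @abstractmethod
    def parse_response(self, response: str) -> Dict:
        """Parse raw model output into a structured message."""

# Concrete implementations (bodies omitted)
class InternLMLLM(BaseLLM):
    async def stream_chat(self, messages: List[Dict], **kwargs):
        ...  # InternLM implementation

    def parse_response(self, response: str) -> Dict:
        ...  # InternLM output parsing

class GPTLLM(BaseLLM):
    async def stream_chat(self, messages: List[Dict], **kwargs):
        ...  # GPT implementation

    def parse_response(self, response: str) -> Dict:
        ...  # GPT output parsing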
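Config entries such as `searcher_type="BingSearch"` select an implementation by name. A common way to connect a name to a plugin class is a registry filled by a class decorator. This is a minimal sketch. `register_engine`, `ENGINE_REGISTRY`, `build_engine` and `DemoSearch` are hypothetical names, not lagent's API, and the engine returns placeholder results.

```python
from abc import ABC, abstractmethod
from typing import Callable, Dict, List, Type

ENGINE_REGISTRY: Dict[str, Type["SearchEngine"]] = {}


def register_engine(name: str) -> Callable[[Type["SearchEngine"]], Type["SearchEngine"]]:
    """Class decorator that records a search-engine class under a config name."""
    def decorator(cls: Type["SearchEngine"]) -> Type["SearchEngine"]:
        if name in ENGINE_REGISTRY:
            raise ValueError(f"duplicate engine name: {name}")
        ENGINE_REGISTRY[name] = cls
        return cls
    return decorator


class SearchEngine(ABC):
    def __init__(self, topk: int = 10) -> None:
        self.topk = topk

    @abstractmethod
    def search(self, query: str) -> List[str]: ...


@register_engine("DemoSearch")
class DemoSearch(SearchEngine):
    def search(self, query: str) -> List[str]:
        # Placeholder results instead of a real API call
        return [f"{query} result {i}" for i in range(self.topk)]


def build_engine(name: str, **kwargs) -> SearchEngine:
    """Look up an engine class by its config name and instantiate it."""
    try:
        cls = ENGINE_REGISTRY[name]
    except KeyError:
        raise ValueError(f"unknown search engine {name!r}; known: {sorted(ENGINE_REGISTRY)}") from None
    return cls(**kwargs)
```

Adding a new engine then only requires decorating its class. The config-driven factory code never changes, which is the extensibility property this section describes.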
2. Configuration-Driven Development
#### Model Configuration
# models.py
import os

from lagent.llms import GPTAPI, INTERNLM2_META, LMDeployServer

internlm_server = dict(
    type=LMDeployServer,
    path="internlm/internlm2_5-7b-chat",
    model_name="internlm2_5-7b-chat",
    meta_template=INTERNLM2_META,
    top_p=0.8,
    top_k=1,
    temperature=0,
    max_new_tokens=8192,
    repetition_penalty=1.02,
    stop_words=["<|im_end|>"],
)
gpt4 = dict(
    type=GPTAPI,
    model_type="gpt-4-turbo",
    key=os.environ.get("OPENAI_API_KEY"),
    api_base=os.environ.get("OPENAI_API_BASE"),
)
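#### Agent Configuration
# __init__.py (llm_factory, create_object, WebBrowser, AsyncWebBrowser, the agent
# classes and the FINAL_RESPONSE_* prompts are imported from lagent / MindSearch)
import os
from copy import deepcopy

def init_agent(lang="cn", model_format="internlm_server", search_engine="BingSearch", use_async=False):
    # Build the LLM from its config entry
    llm_cfg = deepcopy(getattr(llm_factory, model_format))
    if use_async:
        # Swap in the async variant; "type" may be a class or a dotted import path
        llm_type = llm_cfg["type"]
        cls_name = llm_type.split(".")[-1] if isinstance(llm_type, str) else llm_type.__name__
        llm_cfg["type"] = f"lagent.llms.Async{cls_name}"
    llm = create_object(llm_cfg)
    # Build the search-engine plugin
    plugins = [dict(
        type=AsyncWebBrowser if use_async else WebBrowser,
        searcher_type=search_engine,
        topk=6,
        api_key=os.getenv("WEB_SEARCH_API_KEY"),
    )]
    # Create the agent
    agent = (AsyncMindSearchAgent if use_async else MindSearchAgent)(
        llm=llm,
        searcher_cfg=dict(
            llm=llm,
            plugins=plugins,
            # ... other settings
        ),
        summary_prompt=FINAL_RESPONSE_CN if lang == "cn" else FINAL_RESPONSE_EN,
        max_turn=10,
    )
    return agent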
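`create_object(llm_cfg)` turns a config dict into an object. The sketch below illustrates the general idea under stated assumptions. `resolve_type`, `build_from_cfg` and `async_variant` are hypothetical helpers, not lagent's `create_object`. It assumes `type` is either a class or a dotted import path, and it passes every other key as a constructor keyword argument.

```python
import importlib
from copy import deepcopy
from typing import Any, Dict


def resolve_type(type_: Any) -> Any:
    """Accept a class/callable, or a dotted path such as 'fractions.Fraction'."""
    if isinstance(type_, str):
        module_name, _, attr = type_.rpartition(".")
        return getattr(importlib.import_module(module_name), attr)
    return type_


def build_from_cfg(cfg: Dict[str, Any]) -> Any:
    """Instantiate cfg['type'] with every other key as a keyword argument."""
    cfg = deepcopy(cfg)  # never mutate the shared config entry
    factory = resolve_type(cfg.pop("type"))
    return factory(**cfg)


def async_variant(type_: Any, package: str = "lagent.llms") -> str:
    """Naming rule used by init_agent: GPTAPI -> lagent.llms.AsyncGPTAPI."""
    name = type_.split(".")[-1] if isinstance(type_, str) else type_.__name__
    return f"{package}.Async{name}"
```

Because the config is plain data, switching from InternLM to GPT-4 is a matter of choosing a different dict. The `deepcopy` mirrors `init_agent`, so edits such as the async rename never leak back into the module-level config.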
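3. Modular Design
#### Frontend Modularity
// Component layout
src/
├── components/
│   ├── answer/        # answer display
│   ├── chat-right/    # right-hand chat panel
│   ├── mind-map/      # mind map
│   ├── loading/       # loading animation
│   └── ...
├── provider/          # state management
├── utils/             # utility functions
└── pages/
    └── mindsearch/    # main page
// Self-contained component example (INodeInfo is defined elsewhere in the front end)
interface INodeComponentProps {
  nodeInfo: INodeInfo;
  onClick: (node: string) => void;
}
const NodeComponent: React.FC<INodeComponentProps> = ({ nodeInfo, onClick }) => {
  // component implementation omitted
  return null;
};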
#### Backend Modularity
# Module layout
mindsearch/
├── __init__.py
├── app.py                   # FastAPI service
├── terminal.py              # terminal interface
└── agent/
    ├── __init__.py          # agent factory (init_agent)
    ├── graph.py             # graph search implementation
    ├── mindsearch_agent.py  # main agent
    ├── models.py            # model configurations
    ├── streaming.py         # streaming support
    └── mindsearch_prompt.py # prompt templates
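4. Testing and Debugging Support
#### Unit Tests
# Test WebSearchGraph. add_node starts a real search task, so this test needs
# WebSearchGraph.SEARCHER_CONFIG to point at a working (or mocked) searcher.
def test_web_search_graph():
    graph = WebSearchGraph()
    graph.add_root_node("test question")
    graph.add_node("sub-question 1", "content of sub-question 1")
    graph.add_node("sub-question 2", "content of sub-question 2")
    graph.add_edge("root", "sub-question 1")
    graph.add_edge("root", "sub-question 2")
    assert "root" in graph.nodes
    assert "sub-question 1" in graph.nodes
    assert len(graph.adjacency_list["root"]) == 2

# Test the agent end to end (requires a reachable model server and search API)
def test_mind_search_agent():
    agent = init_agent(lang="cn", model_format="internlm_server")
    response = list(agent("test question"))
    assert len(response) > 0
    assert response[-1].stream_state == AgentStatusCode.END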
#### Debugging Tools
# Terminal debugging (run from a shell)
python -m mindsearch.terminal

# Logging configuration
import logging
import time
from functools import wraps

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('mindsearch.log'),
        logging.StreamHandler(),
    ],
)

# Performance profiling: log how long each call to the decorated function takes
def timing_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()  # monotonic, high-resolution clock
        result = func(*args, **kwargs)
        end = time.perf_counter()
        logging.info(f"{func.__name__} took {end - start:.2f} seconds")
        return result
    return wrapper
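When applied to a coroutine function, `timing_decorator` only measures how long it takes to create the coroutine object, not the awaited work. Much of the async code path is coroutine-based, so a variant that detects coroutine functions may be useful. This is a sketch; `timed` is a hypothetical name.

```python
import functools
import inspect
import logging
import time


def timed(func):
    """Log the wall-clock duration of sync functions and of awaited coroutine functions."""
    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)  # time the awaited work, not creation
            finally:
                logging.info("%s took %.2f seconds", func.__name__, time.perf_counter() - start)
        return async_wrapper

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            logging.info("%s took %.2f seconds", func.__name__, time.perf_counter() - start)
    return wrapper
```

The `try`/`finally` also logs the duration of calls that raise. Failed calls are often the slow ones worth investigating.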
System Architecture Diagrams
1. Overall Architecture
graph TB
    subgraph Client Layer
        User[User]
        Browser[Browser]
        Mobile[Mobile device]
        Terminal[Terminal]
    end
    subgraph Interface Layer
        ReactUI[React front end]
        GradioUI[Gradio UI]
        StreamlitUI[Streamlit UI]
        FastAPI[FastAPI gateway]
    end
    subgraph Core Service Layer
        MindSearchAgent[MindSearchAgent<br/>main coordinator]
        AsyncMindSearchAgent[AsyncMindSearchAgent<br/>async coordinator]
        WebSearchGraph[WebSearchGraph<br/>graph manager]
        SearcherAgent[SearcherAgent<br/>search executor]
    end
    subgraph Tool Layer
        WebBrowser[WebBrowser<br/>web browser]
        AsyncWebBrowser[AsyncWebBrowser<br/>async browser]
        ExecutionAction[ExecutionAction<br/>code executor]
    end
    subgraph Model Layer
        InternLM[InternLM2.5-7B]
        GPT4[GPT-4]
        Qwen[Qwen-Max]
        Claude[Claude]
    end
    subgraph Search Layer
        Bing[Bing Search]
        Google[Google Search]
        DuckDuckGo[DuckDuckGo]
        Brave[Brave Search]
        Tencent[Tencent Search]
    end
User --> Browser
User --> Mobile
User --> Terminal
Browser --> ReactUI
Mobile --> ReactUI
Terminal --> MindSearchAgent
ReactUI --> FastAPI
GradioUI --> FastAPI
StreamlitUI --> FastAPI
FastAPI --> MindSearchAgent
FastAPI --> AsyncMindSearchAgent
MindSearchAgent --> WebSearchGraph
AsyncMindSearchAgent --> WebSearchGraph
WebSearchGraph --> SearcherAgent
SearcherAgent --> WebBrowser
SearcherAgent --> AsyncWebBrowser
MindSearchAgent --> ExecutionAction
WebBrowser --> Bing
WebBrowser --> Google
WebBrowser --> DuckDuckGo
WebBrowser --> Brave
WebBrowser --> Tencent
AsyncWebBrowser --> Bing
AsyncWebBrowser --> Google
AsyncWebBrowser --> DuckDuckGo
AsyncWebBrowser --> Brave
AsyncWebBrowser --> Tencent
MindSearchAgent --> InternLM
MindSearchAgent --> GPT4
MindSearchAgent --> Qwen
MindSearchAgent --> Claude
SearcherAgent --> InternLM
SearcherAgent --> GPT4
SearcherAgent --> Qwen
SearcherAgent --> Claude
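2. Data Flow
graph LR
    subgraph Input Stage
        UserQuery[User query]
        QueryAnalysis[Query analysis]
        GraphConstruction[Graph construction]
    end
    subgraph Processing Stage
        NodeGeneration[Node generation]
        ConcurrentSearch[Concurrent search]
        ResultAnalysis[Result analysis]
        GraphUpdate[Graph update]
    end
    subgraph Output Stage
        AnswerGeneration[Answer generation]
        ReferenceManagement[Reference management]
        StreamResponse[Streaming response]
        UserDisplay[Display to user]
    end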
UserQuery --> QueryAnalysis
QueryAnalysis --> GraphConstruction
GraphConstruction --> NodeGeneration
NodeGeneration --> ConcurrentSearch
ConcurrentSearch --> ResultAnalysis
ResultAnalysis --> GraphUpdate
GraphUpdate --> AnswerGeneration
AnswerGeneration --> ReferenceManagement
ReferenceManagement --> StreamResponse
StreamResponse --> UserDisplay
style UserQuery fill:#e1f5ff
style UserDisplay fill:#e8f5e9
3. Concurrency Architecture
graph TB
    subgraph Main Thread
        MainThread[Main thread<br/>MindSearchAgent]
        GraphBuilder[Graph builder]
        ResponseCollector[Response collector]
    end
    subgraph Worker Thread Pool
        Thread1[Worker thread 1<br/>SearcherAgent]
        Thread2[Worker thread 2<br/>SearcherAgent]
        Thread3[Worker thread 3<br/>SearcherAgent]
        ThreadN[Worker thread N<br/>SearcherAgent]
    end
    subgraph Async Event Loops
        EventLoop1[Event loop 1]
        EventLoop2[Event loop 2]
        EventLoop3[Event loop 3]
        EventLoopM[Event loop M]
    end
    subgraph External Services
        LLM1[LLM service]
        LLM2[LLM service]
        Search1[Search engine]
        Search2[Search engine]
    end
MainThread --> GraphBuilder
GraphBuilder --> ResponseCollector
MainThread --> Thread1
MainThread --> Thread2
MainThread --> Thread3
MainThread --> ThreadN
Thread1 --> EventLoop1
Thread2 --> EventLoop2
Thread3 --> EventLoop3
ThreadN --> EventLoopM
EventLoop1 --> LLM1
EventLoop1 --> Search1
EventLoop2 --> LLM1
EventLoop2 --> Search2
EventLoop3 --> LLM2
EventLoop3 --> Search1
EventLoopM --> LLM2
EventLoopM --> Search2
Thread1 --> ResponseCollector
Thread2 --> ResponseCollector
Thread3 --> ResponseCollector
ThreadN --> ResponseCollector
ResponseCollector --> MainThread
4. State Transitions
stateDiagram-v2
    [*] --> SESSION_READY: initialize
    SESSION_READY --> STREAM_ING: start generating
    STREAM_ING --> PLUGIN_START: call plugin
    STREAM_ING --> CODING: generate code
    STREAM_ING --> END: generation finished
    PLUGIN_START --> STREAM_ING: plugin returns
    CODING --> STREAM_ING: code execution finished
    END --> [*]: session ends
    note right of SESSION_READY: Session ready
    note right of STREAM_ING: Streaming response
    note right of PLUGIN_START: Plugin call started
    note right of CODING: Generating code
    note right of END: Response finished
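The transitions in the diagram can be encoded as a lookup table, e.g. to assert in tests that a recorded sequence of stream states is legal. This is a sketch. `StreamState`, `ALLOWED` and `is_valid_run` are hypothetical and mirror only the edges drawn above; lagent's AgentStatusCode defines its own values. The sketch also assumes a stream may repeat a state across consecutive chunks, as happens while text is streaming.

```python
from enum import Enum, auto
from typing import Dict, Iterable, Set


class StreamState(Enum):
    SESSION_READY = auto()
    STREAM_ING = auto()
    PLUGIN_START = auto()
    CODING = auto()
    END = auto()


S = StreamState
# Edges of the state diagram; the start/end markers are checked separately below
ALLOWED: Dict[StreamState, Set[StreamState]] = {
    S.SESSION_READY: {S.STREAM_ING},
    S.STREAM_ING: {S.PLUGIN_START, S.CODING, S.END},
    S.PLUGIN_START: {S.STREAM_ING},
    S.CODING: {S.STREAM_ING},
    S.END: set(),
}


def is_valid_run(states: Iterable[StreamState]) -> bool:
    """True if the run starts at SESSION_READY, ends at END, and only follows drawn edges."""
    seq = list(states)
    if not seq or seq[0] is not S.SESSION_READY or seq[-1] is not S.END:
        return False
    for prev, cur in zip(seq, seq[1:]):
        # Consecutive chunks may repeat a state; otherwise the edge must exist
        if cur is not prev and cur not in ALLOWED[prev]:
            return False
    return True
```

For example, `SESSION_READY -> CODING` is rejected because code generation is only reachable from an active stream.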
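Summary
The MindSearch architecture has the following core strengths:
1. Modular design
- Clear layering, with a well-defined responsibility for each module
- A plugin architecture that makes it easy to add new models and search engines
- Configuration-driven setup that adapts to different deployment scenarios
2. High-performance concurrency
- A combination of thread pools and event loops
- Dynamic concurrency control that adapts the number of workers to the load
- Asynchronous I/O that reduces time spent waiting on the network
3. Intelligent task decomposition
- Dynamic, graph-based decomposition of the question into sub-tasks
- Parallel searching, which shortens response time compared with answering sub-questions one after another
- Merging of results with consistent, globally numbered citations
4. Good user experience
- SSE streaming that shows the reasoning process in real time
- A transparent solution path
- A choice of interactive interfaces
5. Strong extensibility
- Support for multiple LLMs and search engines
- A flexible configuration system
- Support for testing and debugging
With this design, MindSearch maintains answer quality while responding quickly and providing a good user experience. This makes it a useful reference implementation for AI search engines.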