MongoDB作为Agno项目的核心数据库支持,为AI智能体、团队和工作流提供了强大的数据持久化能力。本文将深入探讨MongoDB在Agno架构中的实现原理、功能特性以及实际应用场景。
Agno为MongoDB提供了两套完整的实现方案:
# Implementations referenced by the article:
#   libs/agno/agno/db/mongo/mongo.py (1993 lines, sync) and
#   libs/agno/agno/db/mongo/async_mongo.py (2036 lines, async).
def _ensure_client(self) -> AsyncIOMotorClient:
    """
    Ensure the Motor client is valid for the current event loop.

    Motor's AsyncIOMotorClient is bound to the event loop it was created on.
    If a new event loop is detected, the client must be refreshed.
    """
    try:
        current_loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop: return the existing client, or create one lazily
        # from the provided client / connection URL.
        if self._client is None:
            if self._provided_client is not None:
                self._client = self._provided_client
            elif self.db_url is not None:
                self._client = AsyncIOMotorClient(self.db_url)
                log_debug("在事件循环外部创建AsyncIOMotorClient")
        return self._client
    # Check whether we are now running on a different event loop.
    if self._event_loop is None or self._event_loop is not current_loop:
        # New event loop detected: a fresh client is required.
        if self._provided_client is not None:
            log_debug(
                "检测到新事件循环。使用提供的AsyncIOMotorClient,"
                "如果它是在不同事件循环中创建的,可能会出现问题。"
            )
            self._client = self._provided_client
        elif self.db_url is not None:
            old_loop_id = id(self._event_loop) if self._event_loop else "None"
            new_loop_id = id(current_loop)
            log_debug(f"事件循环从 {old_loop_id} 变为 {new_loop_id},创建新的AsyncIOMotorClient")
            self._client = AsyncIOMotorClient(self.db_url)
        self._event_loop = current_loop
        self._database = None  # reset the cached database handle
        # Drop cached collection handles and init flags when switching loops,
        # since they reference the now-stale client.
        for attr in list(vars(self).keys()):
            if attr.endswith("_collection") or attr.endswith("_initialized"):
                delattr(self, attr)
    return self._client
@property
def database(self) -> Database:
    """Return the database handle, resolving and caching it on first access."""
    handle = self._database
    if handle is None:
        handle = self.db_client[self.db_name]
        self._database = handle
    return handle
Agno的MongoDB集成定义了6个核心集合,每个都有其专门的用途和索引策略:
# Index specs for the sessions collection: unique session_id plus the
# owner/user lookup fields and the timestamps used for sorting.
SESSION_COLLECTION_SCHEMA = [
    {"key": "session_id", "unique": True},
    {"key": "user_id"},
    {"key": "session_type"},
    {"key": "agent_id"},
    {"key": "team_id"},
    {"key": "workflow_id"},
    {"key": "created_at"},
    {"key": "updated_at"},
]
# Index specs for user memories: unique memory_id plus owner and
# content-related lookup fields.
MEMORY_COLLECTION_SCHEMA = [
    {"key": "memory_id", "unique": True},
    {"key": "user_id"},
    {"key": "agent_id"},
    {"key": "team_id"},
    {"key": "topics"},
    {"key": "input"},
    {"key": "feedback"},
    {"key": "created_at"},
    {"key": "updated_at"},
]
# Index specs for aggregated metrics. The final compound index enforces
# one record per (date, aggregation_period) pair.
METRICS_COLLECTION_SCHEMA = [
    {"key": "id", "unique": True},
    {"key": "date"},
    {"key": "aggregation_period"},
    {"key": "created_at"},
    {"key": "updated_at"},
    {"key": [("date", 1), ("aggregation_period", 1)], "unique": True},
]
# Index specs for evaluation runs: unique run_id plus filter fields.
EVAL_COLLECTION_SCHEMA = [
    {"key": "run_id", "unique": True},
    {"key": "eval_type"},
    {"key": "eval_input"},
    {"key": "agent_id"},
    {"key": "team_id"},
    {"key": "workflow_id"},
    {"key": "model_id"},
    {"key": "created_at"},
    {"key": "updated_at"},
]
# Index specs for knowledge content: unique id plus metadata/status fields.
KNOWLEDGE_COLLECTION_SCHEMA = [
    {"key": "id", "unique": True},
    {"key": "name"},
    {"key": "description"},
    {"key": "type"},
    {"key": "status"},
    {"key": "status_message"},
    {"key": "metadata"},
    {"key": "size"},
    {"key": "linked_to"},
    {"key": "access_count"},
    {"key": "created_at"},
    {"key": "updated_at"},
    {"key": "external_id"},
]
# Index specs for cultural knowledge: unique id plus owner lookup fields.
CULTURAL_KNOWLEDGE_COLLECTION_SCHEMA = [
    {"key": "id", "unique": True},
    {"key": "name"},
    {"key": "agent_id"},
    {"key": "team_id"},
    {"key": "created_at"},
    {"key": "updated_at"},
]
Agno实现了自动索引创建机制,确保数据库性能:
def create_collection_indexes(collection: Collection, collection_type: str) -> None:
    """Create every index required for a collection of the given type.

    Failures are logged as warnings rather than raised, so a missing index
    never blocks normal operation.
    """
    try:
        for spec in get_collection_indexes(collection_type):
            key = spec["key"]
            # A list key is already a compound index spec; a plain string
            # denotes a single ascending field.
            index_keys = key if isinstance(key, list) else [(key, 1)]
            collection.create_index(index_keys, unique=spec.get("unique", False))
    except Exception as e:
        log_warning(f"为{collection_type}集合创建索引时出错: {e}")
def apply_sorting(
    query_args: Dict[str, Any], sort_by: Optional[str] = None, sort_order: Optional[str] = None
) -> List[tuple]:
    """Build a MongoDB sort specification for a single field.

    Ascending when sort_order == "asc", descending otherwise; empty when no
    sort field is given. (query_args is accepted for signature parity with
    the other query helpers and is not modified here.)
    """
    if sort_by is None:
        return []
    return [(sort_by, 1 if sort_order == "asc" else -1)]
def apply_pagination(
    query_args: Dict[str, Any], limit: Optional[int] = None, page: Optional[int] = None
) -> Dict[str, Any]:
    """Apply limit/skip pagination settings to a MongoDB query-args dict.

    Args:
        query_args: Keyword arguments destined for a find() call; mutated
            in place.
        limit: Page size. When None, no pagination is applied at all.
        page: 1-based page number; only honored together with limit.

    Returns:
        The same query_args dict, for chaining.
    """
    # Guard "skip" behind limit: computing (page - 1) * None would raise a
    # TypeError when a page is requested without a page size.
    if limit is not None:
        query_args["limit"] = limit
        if page is not None:
            query_args["skip"] = (page - 1) * limit
    return query_args
def get_user_memory_stats(
    self,
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    team_id: Optional[str] = None,
    limit: Optional[int] = None,
    page: Optional[int] = None,
) -> Tuple[List[Dict[str, Any]], int]:
    """Aggregate per-user memory statistics via a MongoDB pipeline.

    Groups memories by user_id (skipping null users), counting memories and
    tracking the latest update per user, sorted most-recently-updated first.
    Returns the (possibly paginated) rows plus the total row count.
    Note: user_id/agent_id/team_id are accepted for interface parity and are
    not used by this implementation.
    """
    try:
        collection = self._get_collection(table_type="memories")
        if collection is None:
            return [], 0
        pipeline = [
            {"$match": {"user_id": {"$ne": None}}},
            {
                "$group": {
                    "_id": "$user_id",
                    "total_memories": {"$sum": 1},
                    "last_memory_updated_at": {"$max": "$updated_at"},
                }
            },
            {"$sort": {"last_memory_updated_at": -1}},
        ]
        # Total count is taken before pagination stages are appended.
        count_docs = list(collection.aggregate(pipeline + [{"$count": "total"}]))
        total_count = count_docs[0]["total"] if count_docs else 0
        # Pagination: skip only makes sense together with a limit.
        if limit is not None:
            if page is not None:
                pipeline.append({"$skip": (page - 1) * limit})
            pipeline.append({"$limit": limit})
        return list(collection.aggregate(pipeline)), total_count
    except Exception as e:
        log_error(f"获取用户内存统计时出错: {e}")
        raise e
def upsert_memories(
    self, memories: List[UserMemory], deserialize: Optional[bool] = True, preserve_updated_at: bool = False
) -> List[Union[UserMemory, Dict[str, Any]]]:
    """
    Bulk-upsert user memories for better performance on large datasets.

    Args:
        memories: The memories to upsert (None entries are skipped).
        deserialize: Return UserMemory objects when True, raw dicts otherwise.
        preserve_updated_at: Keep each memory's existing updated_at timestamp
            (when present) instead of stamping the current time.

    Returns:
        The upserted memories, fetched back from the collection.
    """
    if not memories:
        return []

    def _upsert_one_by_one() -> List[Union[UserMemory, Dict[str, Any]]]:
        # Fallback path: upsert each memory individually, dropping failures.
        return [
            result
            for memory in memories
            if memory is not None
            for result in [self.upsert_user_memory(memory, deserialize=deserialize)]
            if result is not None
        ]

    try:
        collection = self._get_collection(table_type="memories", create_collection_if_not_found=True)
        if collection is None:
            log_info("内存集合不可用,回退到单独插入")
            return _upsert_one_by_one()

        from pymongo import ReplaceOne

        operations = []
        results: List[Union[UserMemory, Dict[str, Any]]] = []
        current_time = int(time.time())
        for memory in memories:
            if memory is None:
                continue
            if memory.memory_id is None:
                memory.memory_id = str(uuid4())
            # Keep the stored updated_at only when the flag is set AND a value
            # exists; otherwise stamp the current time. (Previously the flag
            # alone was checked, so a None timestamp could be persisted.)
            if preserve_updated_at and memory.updated_at is not None:
                updated_at = memory.updated_at
            else:
                updated_at = current_time
            record = {
                "user_id": memory.user_id,
                "agent_id": memory.agent_id,
                "team_id": memory.team_id,
                "memory_id": memory.memory_id,
                "memory": memory.memory,
                "input": memory.input,
                "feedback": memory.feedback,
                "topics": memory.topics,
                "created_at": memory.created_at,
                "updated_at": updated_at,
            }
            operations.append(ReplaceOne(filter={"memory_id": memory.memory_id}, replacement=record, upsert=True))
        if operations:
            # Execute a single bulk write, then read the results back.
            collection.bulk_write(operations)
            memory_ids = [memory.memory_id for memory in memories if memory and memory.memory_id]
            cursor = collection.find({"memory_id": {"$in": memory_ids}})
            for doc in cursor:
                if deserialize:
                    # Strip MongoDB's internal _id before building UserMemory.
                    doc_filtered = {k: v for k, v in doc.items() if k != "_id"}
                    results.append(UserMemory.from_dict(doc_filtered))
                else:
                    results.append(doc)
        return results
    except Exception as e:
        log_error(f"批量内存插入期间出错,回退到单独插入: {e}")
        return _upsert_one_by_one()
def calculate_date_metrics(date_to_process: date, sessions_data: dict) -> dict:
    """Compute the complete metrics record for a single day.

    Args:
        date_to_process: The day being aggregated.
        sessions_data: Mapping of session type ("agent" | "team" | "workflow")
            to that day's list of session dicts. A session's "runs" and
            "session_data" fields may be JSON-encoded strings.

    Returns:
        A "daily" aggregation record with per-type session/run counts,
        distinct user count, summed token metrics and per-model run counts.
    """
    metrics = {
        "users_count": 0,
        "agent_sessions_count": 0,
        "team_sessions_count": 0,
        "workflow_sessions_count": 0,
        "agent_runs_count": 0,
        "team_runs_count": 0,
        "workflow_runs_count": 0,
    }
    token_metrics = {
        "input_tokens": 0,
        "output_tokens": 0,
        "total_tokens": 0,
        "audio_total_tokens": 0,
        "audio_input_tokens": 0,
        "audio_output_tokens": 0,
        "cache_read_tokens": 0,
        "cache_write_tokens": 0,
        "reasoning_tokens": 0,
    }
    model_counts: Dict[str, int] = {}
    session_types = [
        ("agent", "agent_sessions_count", "agent_runs_count"),
        ("team", "team_sessions_count", "team_runs_count"),
        ("workflow", "workflow_sessions_count", "workflow_runs_count"),
    ]
    all_user_ids = set()
    for session_type, sessions_count_key, runs_count_key in session_types:
        sessions = sessions_data.get(session_type, []) or []
        metrics[sessions_count_key] = len(sessions)
        for session in sessions:
            if session.get("user_id"):
                all_user_ids.add(session["user_id"])
            # Decode runs BEFORE counting: the previous implementation called
            # len() on the raw value, so a JSON-encoded string of runs was
            # counted character by character. `or []` also guards an explicit
            # None value, which previously crashed len().
            runs = session.get("runs") or []
            if isinstance(runs, str):
                runs = json.loads(runs)
            metrics[runs_count_key] += len(runs)
            for run in runs:
                if model_id := run.get("model"):
                    model_provider = run.get("model_provider", "")
                    counter_key = f"{model_id}:{model_provider}"
                    model_counts[counter_key] = model_counts.get(counter_key, 0) + 1
            session_data = session.get("session_data", {})
            if isinstance(session_data, str):
                session_data = json.loads(session_data)
            session_metrics = session_data.get("session_metrics", {})
            for field in token_metrics:
                token_metrics[field] += session_metrics.get(field, 0)
    model_metrics = []
    for model, count in model_counts.items():
        # rsplit so a model id that itself contains ":" keeps its prefix.
        model_id, model_provider = model.rsplit(":", 1)
        model_metrics.append({"model_id": model_id, "model_provider": model_provider, "count": count})
    metrics["users_count"] = len(all_user_ids)
    current_time = int(time.time())
    return {
        "id": str(uuid4()),
        "date": date_to_process,
        # A day is "completed" once we are past it (UTC).
        "completed": date_to_process < datetime.now(timezone.utc).date(),
        "token_metrics": token_metrics,
        "model_metrics": model_metrics,
        "created_at": current_time,
        "updated_at": current_time,
        "aggregation_period": "daily",
        **metrics,
    }
"""Agent with MongoDB storage example"""
from agno.agent import Agent
from agno.db.mongo import MongoDb
from agno.tools.duckduckgo import DuckDuckGoTools
# MongoDB连接设置
db_url = "mongodb://mongoadmin:secret@localhost:27017"
# 创建数据库连接
db = MongoDb(db_url=db_url)
# 创建带数据库的Agent
agent = Agent(
db=db,
tools=[DuckDuckGoTools()],
add_history_to_context=True,
)
# 使用Agent
agent.print_response("How many people live in Canada?")
agent.print_response("What is their national anthem called?")
"""AsyncAgent with AsyncMongoDB storage example"""
import asyncio
from agno.agent import Agent
from agno.db.mongo import AsyncMongoDb
from agno.tools.duckduckgo import DuckDuckGoTools
# MongoDB连接设置
db_url = "mongodb://mongoadmin:secret@localhost:27017"
# 创建异步数据库连接
db = AsyncMongoDb(db_url=db_url)
# 创建异步Agent
agent = Agent(
db=db,
tools=[DuckDuckGoTools()],
add_history_to_context=True,
)
# 异步使用Agent
async def main():
await agent.aprint_response("How many people live in Canada?")
await agent.aprint_response("What is their national anthem called?")
asyncio.run(main())
"""Team with MongoDB storage for collaborative AI workflows"""
from typing import List
from agno.agent import Agent
from agno.db.mongo import MongoDb
from agno.models.openai import OpenAIChat
from agno.team import Team
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.tools.hackernews import HackerNewsTools
from pydantic import BaseModel
# 数据模型定义
class Article(BaseModel):
title: str
summary: str
reference_links: List[str]
# MongoDB连接
db_url = "mongodb://mongoadmin:secret@localhost:27017"
db = MongoDb(db_url=db_url)
# 创建专业Agent
hn_researcher = Agent(
name="HackerNews Researcher",
model=OpenAIChat("gpt-4o"),
role="Gets top stories from hackernews.",
tools=[HackerNewsTools()],
)
web_searcher = Agent(
name="Web Searcher",
model=OpenAIChat("gpt-4o"),
role="Searches the web for information on a topic",
tools=[DuckDuckGoTools()],
add_datetime_to_context=True,
)
# 创建协作团队
hn_team = Team(
name="HackerNews Team",
model=OpenAIChat("gpt-4o"),
members=[hn_researcher, web_searcher],
db=db, # 团队级数据库支持
instructions=[
"First, search hackernews for what the user is asking about.",
"Then, ask the web searcher to search for each story to get more information.",
"Finally, provide a thoughtful and engaging summary.",
],
output_schema=Article,
markdown=True,
show_members_responses=True,
add_member_tools_to_context=False,
)
# 执行团队任务
hn_team.print_response("Write an article about the top 2 stories on hackernews")
"""Workflow with MongoDB for long-running processes"""
import asyncio
from agno.agent import Agent
from agno.db.mongo import AsyncMongoDb
from agno.models.openai import OpenAIChat
from agno.team import Team
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.tools.hackernews import HackerNewsTools
from agno.workflow.step import Step
from agno.workflow.workflow import Workflow
# MongoDB连接
db_url = "mongodb://mongoadmin:secret@localhost:27017"
db = AsyncMongoDb(db_url=db_url)
# 创建工作流Agent
hackernews_agent = Agent(
name="Hackernews Agent",
model=OpenAIChat(id="gpt-4o-mini"),
tools=[HackerNewsTools()],
role="Extract key insights and content from Hackernews posts",
)
web_agent = Agent(
name="Web Agent",
model=OpenAIChat(id="gpt-4o-mini"),
tools=[DuckDuckGoTools()],
role="Search the web for the latest news and trends",
)
content_planner = Agent(
name="Content Planner",
model=OpenAIChat(id="gpt-4o"),
instructions=[
"Plan a content schedule over 4 weeks for the provided topic and research content",
"Ensure that I have posts for 3 posts per week",
],
)
# 创建研究团队
research_team = Team(
name="Research Team",
members=[hackernews_agent, web_agent],
instructions="Research tech topics from Hackernews and the web",
)
# 定义工作流步骤
research_step = Step(
name="Research Step",
team=research_team,
)
content_planning_step = Step(
name="Content Planning Step",
agent=content_planner,
)
# 创建完整工作流
content_creation_workflow = Workflow(
name="Content Creation Workflow",
description="Automated content creation from blog posts to social media",
db=db, # 工作流级数据库支持
steps=[research_step, content_planning_step],
)
# 执行异步工作流
async def main():
await content_creation_workflow.aprint_response(
input="AI trends in 2024",
markdown=True,
)
asyncio.run(main())
Agno提供了MongoDB的Docker部署脚本:
# Start a local MongoDB container.
docker run -d \
  --name local-mongo \
  -p 27017:27017 \
  -e MONGO_INITDB_ROOT_USERNAME=mongoadmin \
  -e MONGO_INITDB_ROOT_PASSWORD=secret \
  mongo

# Or use the helper script shipped with the project.
./scripts/run_mongodb.sh

# Sync driver only.
pip install pymongo

# Async driver (Motor) on top of pymongo.
pip install pymongo motor
Agno的MongoDB实现包含了完善的错误处理机制:
def upsert_session(
    self, session: Session, deserialize: Optional[bool] = True
) -> Optional[Union[Session, Dict[str, Any]]]:
    """Session insert/update with a retry mechanism.

    Retries transient OperationFailure errors with exponential backoff
    (factor 1.5, up to 3 attempts); any other exception is re-raised
    immediately.
    """
    max_retries = 3
    backoff_factor = 1.5
    for attempt in range(max_retries):
        try:
            collection = self._get_collection(table_type="sessions", create_collection_if_not_found=True)
            if collection is None:
                return None
            # ... upsert logic elided in this excerpt ...
        except OperationFailure as e:
            if attempt == max_retries - 1:
                log_error(f"插入会话失败,已达到最大重试次数: {e}")
                raise e
            else:
                wait_time = backoff_factor ** attempt
                log_warning(f"插入会话失败,{wait_time}秒后重试: {e}")
                time.sleep(wait_time)
        except Exception as e:
            log_error(f"插入会话时发生未预期错误: {e}")
            raise e
# NOTE(review): the block below repeats the fallback from upsert_memories and
# references `memories`/`deserialize`, which do not exist in this scope — it
# appears to be a copy-paste leftover; confirm against the original source.
except Exception as e:
    log_error(f"批量内存插入期间出错,回退到单独插入: {e}")
    # Fallback: upsert each memory individually.
    return [
        result
        for memory in memories
        if memory is not None
        for result in [self.upsert_user_memory(memory, deserialize=deserialize)]
        if result is not None
    ]
# Connection string enabling SSL and authentication against the admin database.
db_url = "mongodb://username:password@localhost:27017/?ssl=true&authSource=admin"
db = MongoDb(db_url=db_url)
# Example shape of a collected performance-metrics record.
# NOTE(review): `duration`, `success` and `error_message` are assumed to be
# set by the surrounding measurement code, which is not shown in this excerpt.
metrics = {
    "operation_type": "insert",
    "collection": "sessions",
    "duration_ms": duration,
    "timestamp": time.time(),
    "success": success,
    "error": error_message if not success else None
}
MongoDB在Agno中的应用体现了以下核心优势:
还没有人回复