AgentOS 是 Agno 框架的核心操作系统,为构建、部署和管理智能代理应用提供了完整的基础设施。它不仅仅是一个简单的代理框架,而是一个企业级的多代理操作系统,支持复杂的团队协作、工作流编排、记忆管理和多种接口集成。
🤖 多代理协作:支持创建复杂的代理团队,实现智能任务分配和协作
🧠 智能记忆系统:提供会话记忆、用户记忆和上下文历史管理
⚡ 工作流引擎:支持条件路由、并行处理和步骤编排
🔧 MCP工具集成:通过 Model Context Protocol 集成外部工具和服务
🌐 多接口支持:支持 AGUI、Slack、WhatsApp、A2A 等多种接口
📊 可观测性:内置监控、日志和性能分析功能
🔒 企业级安全:支持生产环境部署和安全配置
让我们深入分析 cookbook/agent_os/basic.py 的核心实现:
from agno.agent import Agent
from agno.db.postgres import PostgresDb
from agno.models.openai import OpenAIChat
from agno.os import AgentOS
from agno.team import Team  # FIX: Team is instantiated below but was never imported
from agno.workflow.step import Step
from agno.workflow.workflow import Workflow

# Database configuration - PostgreSQL as the persistence layer
db = PostgresDb(
    id="basic-agent-db",
    db_url="postgresql+psycopg://ai:ai@localhost:5532/ai"
)

# Basic agent - full memory and session features enabled
basic_agent = Agent(
    id="basic-agent",
    name="Basic Agent",
    model=OpenAIChat(id="gpt-4o"),
    db=db,
    enable_session_summaries=True,  # generate session summaries
    enable_user_memories=True,      # persist per-user memories
    add_history_to_context=True,    # inject history into the prompt context
    num_history_runs=3,             # keep the last 3 runs of history
    read_chat_history=True,         # load chat history from the database
    markdown=True,                  # format responses as Markdown
    debug_mode=True                 # verbose debug output
)

# Team configuration - a team containing the basic agent
basic_team = Team(
    id="basic-team",
    name="Basic Team",
    members=[basic_agent],
    db=db,
    enable_session_summaries=True,
    enable_user_memories=True,
    add_history_to_context=True,
    num_history_runs=3,
    read_chat_history=True,
    markdown=True
)

# Workflow configuration - a single-step workflow
basic_workflow = Workflow(
    id="basic-workflow",
    name="Basic Workflow",
    steps=[
        Step(
            name="basic_step",
            description="Basic workflow step",
            agent=basic_agent
        )
    ],
    db=db
)

# Assemble the AgentOS application
agent_os = AgentOS(
    description="Basic AgentOS Example",
    agents=[basic_agent],
    teams=[basic_team],
    workflows=[basic_workflow],
    db=db
)

# Serve the application
if __name__ == "__main__":
    agent_os.serve(host="0.0.0.0", port=7777, reload=True)
核心模块解析:
- agno.agent.Agent: 代理基类,提供完整的代理功能
- agno.db.postgres.PostgresDb: PostgreSQL 数据库适配器
- agno.models.openai.OpenAIChat: OpenAI 模型集成
- agno.os.AgentOS: AgentOS 核心操作系统
- agno.workflow.step.Step: 工作流步骤定义
- agno.workflow.workflow.Workflow: 工作流引擎

PostgreSQL 连接配置:
db = PostgresDb(
    id="basic-agent-db",  # unique identifier for this database instance
    db_url="postgresql+psycopg://ai:ai@localhost:5532/ai"
)
连接字符串解析:
- postgresql+psycopg: 使用 psycopg 驱动连接 PostgreSQL
- ai:ai: 用户名和密码
- localhost:5532: 数据库服务器地址和端口
- ai: 数据库名称

数据库表结构:
- sessions: 会话管理表
- user_memories: 用户记忆存储表
- session_memories: 会话记忆存储表
- workflows: 工作流状态表
- agents: 代理配置表
- teams: 团队配置表

AgentOS 的记忆系统是其最强大的特性之一,包含多个层次的记忆管理:
# Session-memory configuration
basic_agent = Agent(
    enable_session_summaries=True,  # generate session summaries
    add_history_to_context=True,    # inject history into the context
    num_history_runs=3,             # keep the last 3 runs of history
    read_chat_history=True          # load chat history from the database
)
会话摘要生成机制:
# User-memory configuration
basic_agent = Agent(
    enable_user_memories=True,  # enable per-user memories
    db=db                       # database used for memory storage
)
用户记忆类型:
# Context-history configuration
basic_agent = Agent(
    add_history_to_context=True,  # inject history into the context
    num_history_runs=3,           # keep the last 3 runs of history
    read_chat_history=True        # load chat history from the database
)
上下文构建策略:
basic_team = Team(
    id="basic-team",
    name="Basic Team",
    members=[basic_agent],          # team member list
    db=db,                          # shared database
    enable_session_summaries=True,  # team session summaries
    enable_user_memories=True,      # team user memories
    add_history_to_context=True,    # team context history
    num_history_runs=3,
    read_chat_history=True,
    markdown=True
)
任务分配策略:
basic_workflow = Workflow(
    id="basic-workflow",
    name="Basic Workflow",
    steps=[
        Step(
            name="basic_step",
            description="Basic workflow step",
            agent=basic_agent
        )
    ],
    db=db  # workflow state persistence
)
条件路由 (Conditional Routing):
from agno.workflow.step import Router

# Route to a research step chosen at runtime by the selector function.
workflow = Workflow(
    name="Intelligent Research Workflow",
    steps=[
        Router(
            name="research_strategy_router",
            selector=research_router_function,  # callable that picks one of `choices`
            choices=[research_hackernews, research_web],
            description="Intelligently selects research method based on topic"
        ),
        publish_content  # runs after the routed research step
    ]
)
并行处理 (Parallel Execution):
from agno.workflow.step import Parallel

# Fan out three analysis steps, then synthesize their results.
workflow = Workflow(
    name="Parallel Analysis Workflow",
    steps=[
        Parallel(
            name="parallel_analysis",
            steps=[
                technical_analysis_step,
                market_analysis_step,
                competitive_analysis_step
            ]
        ),
        synthesis_step  # runs after all parallel steps complete
    ]
)
错误处理与重试 (Error Handling & Retries):
workflow = Workflow(
    name="Reliable Processing Workflow",
    steps=[
        Step(
            name="reliable_step",
            agent=reliable_agent,
            retry_count=3,  # number of retries
            retry_delay=5,  # delay between retries (seconds)
            timeout=30      # step timeout (seconds)
        )
    ]
)
MCP (Model Context Protocol) 是 AgentOS 的核心功能,允许代理无缝集成外部工具和服务:
from agno.tools.mcp import MCPTools

# Build an MCP toolkit
mcp_tools = MCPTools(
    name="airbnb_search",
    command="npx",
    args=["-y", "@mcp/tools-airbnb"],
    transport="stdio",  # transport protocol
    timeout=30          # timeout configuration (seconds)
)

# Attach it to an agent
agent = Agent(
    name="Travel Agent",
    tools=[mcp_tools],
    mcp_enabled=True  # enable MCP support
)
搜索工具:
# Create a custom MCP toolkit
from agno.tools.mcp import MCPTools

custom_tools = MCPTools(
    name="my_custom_tools",
    command="python",
    args=["custom_tools_server.py"],
    transport="stdio",
    env={"CUSTOM_API_KEY": "your_api_key"}  # environment passed to the tool server
)
AGUI (Agent Graphical User Interface) 是 AgentOS 的 Web 界面:
# AGUI configuration
agent_os = AgentOS(
    description="AGUI Enabled AgentOS",
    agents=[agent],
    teams=[team],
    workflows=[workflow],
    agui_enabled=True,  # enable the AGUI web interface
    agui_config={
        "theme": "dark",        # UI theme
        "show_metrics": True,   # display metrics
        "enable_chat": True,    # enable the chat panel
        "show_workflows": True  # display workflow status
    }
)
# Slack integration configuration
from agno.integrations.slack import SlackIntegration

slack_integration = SlackIntegration(
    bot_token="xoxb-your-bot-token",
    app_token="xapp-your-app-token",
    signing_secret="your-signing-secret"
)

agent_os = AgentOS(
    description="Slack Integrated AgentOS",
    agents=[agent],
    integrations=[slack_integration]
)
# WhatsApp integration
from agno.integrations.whatsapp import WhatsAppIntegration

whatsapp_integration = WhatsAppIntegration(
    phone_number_id="your-phone-number-id",
    access_token="your-access-token",
    webhook_secret="your-webhook-secret"
)

agent_os = AgentOS(
    description="WhatsApp Bot AgentOS",
    agents=[agent],
    integrations=[whatsapp_integration]
)
# A2A (agent-to-agent) protocol configuration
from agno.integrations.a2a import A2AConfig

a2a_config = A2AConfig(
    agent_id="my-agent",
    endpoint="https://api.example.com/a2a",
    capabilities=["text_generation", "data_analysis", "web_search"]
)

agent_os = AgentOS(
    description="A2A Protocol Enabled AgentOS",
    agents=[agent],
    a2a_config=a2a_config
)
# Create a virtual environment
python -m venv agentos_env
source agentos_env/bin/activate  # Linux/Mac
# or
agentos_env\Scripts\activate  # Windows
# Install core dependencies
pip install agno psycopg-binary openai
# Install optional dependencies
pip install agno[all]  # install all optional extras
# Start PostgreSQL using the provided script
cd cookbook/scripts
./run_pgvector.sh  # Linux/Mac
# or
run_pgvector.bat  # Windows
Docker 容器配置:
# Custom PostgreSQL container
FROM agnohq/pgvector:16
# Environment configuration
ENV POSTGRES_DB=ai
ENV POSTGRES_USER=ai
ENV POSTGRES_PASSWORD=ai
ENV PGDATA=/var/lib/postgresql/data/pgdata
# Port exposed by the container
EXPOSE 5532
# Data persistence
VOLUME ["/var/lib/postgresql/data"]
# Advanced database configuration
db = PostgresDb(
    id="production-db",
    db_url="postgresql+psycopg://ai:ai@localhost:5532/ai",
    pool_size=20,       # connection pool size
    max_overflow=30,    # maximum overflow connections
    pool_timeout=30,    # connection acquisition timeout (seconds)
    pool_recycle=3600,  # recycle connections after this many seconds
    echo=False          # SQL statement logging
)
Dockerfile 配置:
# Multi-stage build for a smaller production image
FROM python:3.11-slim as builder
# Build-time dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*
# Application directory
WORKDIR /app
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Production stage
FROM python:3.11-slim
# Runtime dependencies only
RUN apt-get update && apt-get install -y \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*
# Copy installed packages and application code from the builder stage
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY . /app
WORKDIR /app
# Run as a non-root user
RUN useradd -m -u 1000 agentos
USER agentos
# Container health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:7777/health || exit 1
# Entry point
CMD ["python", "app.py"]
Docker Compose 配置:
version: '3.8'

services:
  # PostgreSQL with pgvector extension (AgentOS persistence layer)
  postgres:
    image: agnohq/pgvector:16
    environment:
      POSTGRES_DB: ai
      POSTGRES_USER: ai
      POSTGRES_PASSWORD: ai
      PGDATA: /var/lib/postgresql/data/pgdata
    volumes:
      - pg_data:/var/lib/postgresql/data
    ports:
      - "5532:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ai -d ai"]
      interval: 10s
      timeout: 5s
      retries: 5

  # AgentOS application container
  agentos:
    build: .
    depends_on:
      postgres:
        condition: service_healthy  # wait until the DB health check passes
    environment:
      DATABASE_URL: postgresql+psycopg://ai:ai@postgres:5432/ai
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY}
      AGENTOS_ENV: production
      AGENTOS_LOG_LEVEL: INFO
    ports:
      - "7777:7777"
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:7777/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Redis with append-only persistence
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 3

volumes:
  pg_data:
  redis_data:

networks:
  default:
    driver: bridge
Deployment 配置:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agentos-deployment
  labels:
    app: agentos
spec:
  replicas: 3  # three replicas for availability
  selector:
    matchLabels:
      app: agentos
  template:
    metadata:
      labels:
        app: agentos
    spec:
      containers:
        - name: agentos
          image: your-registry/agentos:latest
          ports:
            - containerPort: 7777
          env:
            # Credentials injected from the agentos-secrets Secret object
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: agentos-secrets
                  key: database-url
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: agentos-secrets
                  key: openai-api-key
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 7777
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 7777
            initialDelaySeconds: 5
            periodSeconds: 5
---
# Load-balanced service fronting the deployment
apiVersion: v1
kind: Service
metadata:
  name: agentos-service
spec:
  selector:
    app: agentos
  ports:
    - protocol: TCP
      port: 80
      targetPort: 7777
  type: LoadBalancer
AWS ECS 配置:
{
"family": "agentos-task",
"networkMode": "awsvpc",
"requiresCompatibilities": ["FARGATE"],
"cpu": "512",
"memory": "1024",
"executionRoleArn": "arn:aws:iam::account:role/ecsTaskExecutionRole",
"containerDefinitions": [
{
"name": "agentos",
"image": "your-registry/agentos:latest",
"portMappings": [
{
"containerPort": 7777,
"protocol": "tcp"
}
],
"environment": [
{
"name": "DATABASE_URL",
"value": "postgresql://user:pass@rds-endpoint:5432/agentos"
}
],
"secrets": [
{
"name": "OPENAI_API_KEY",
"valueFrom": "arn:aws:secretsmanager:region:account:secret:openai-api-key"
}
],
"logConfiguration": {
"logDriver": "awslogs",
"options": {
"awslogs-group": "/ecs/agentos",
"awslogs-region": "us-west-2",
"awslogs-stream-prefix": "ecs"
}
}
}
]
}
# Database performance tuning
db = PostgresDb(
    id="optimized-db",
    db_url="postgresql+psycopg://ai:ai@localhost:5532/ai",
    pool_pre_ping=True,               # health-check connections before use
    pool_size=50,                     # large connection pool
    max_overflow=100,                 # maximum overflow connections
    isolation_level="READ_COMMITTED"  # transaction isolation level
)
# Agent performance tuning
optimized_agent = Agent(
    name="Optimized Agent",
    model=OpenAIChat(id="gpt-4o-mini"),  # lightweight model
    temperature=0.3,             # lower temperature for more consistent output
    max_tokens=1000,             # cap output length
    timeout=15,                  # request timeout (seconds)
    cache_model_responses=True,  # cache model responses
    stream=False,                # disable streaming for throughput
    debug_mode=False             # disable debug mode
)
# Workflow performance tuning
optimized_workflow = Workflow(
    name="High Performance Workflow",
    store_events=False,   # skip event storage
    stream=False,         # disable streaming output
    cache_session=False,  # disable session caching
    telemetry=False,      # disable telemetry
    parallel=True,        # enable parallel execution
    max_workers=10        # maximum worker threads
)
AgentOS 支持多种可观测性平台集成:
import agentops
from agno.agent import Agent

# Initialize AgentOps observability
agentops.init(
    api_key="your-agentops-api-key",
    tags=["production", "agentos"]
)

# Create a monitored agent
monitored_agent = Agent(
    name="Monitored Agent",
    model=OpenAIChat(id="gpt-4o"),
    monitoring=True  # enable monitoring
)
from langfuse import Langfuse
from agno.integrations.langfuse import LangfuseIntegration

# Langfuse client configuration
langfuse = Langfuse(
    public_key="pk-lf-...",
    secret_key="sk-lf-...",
    host="https://cloud.langfuse.com"
)

# Wire Langfuse tracing into AgentOS
langfuse_integration = LangfuseIntegration(
    langfuse=langfuse,
    trace_name="agentos-trace",
    tags=["production"]
)

agent_os = AgentOS(
    description="Langfuse Monitored AgentOS",
    agents=[agent],
    integrations=[langfuse_integration]
)
from agno.metrics import MetricsCollector

# Custom metrics collector
metrics = MetricsCollector(
    name="custom_metrics",
    endpoint="http://metrics-server:9090/metrics"
)

# Counter metric: per-agent response time samples
@metrics.counter
async def agent_response_time(agent_name: str, duration: float):
    return {"agent": agent_name, "duration": duration}

# Gauge metric: current number of active sessions
@metrics.gauge
def active_sessions():
    return get_active_session_count()

# Attach the collector to an agent
monitored_agent = Agent(
    name="Metrics Enabled Agent",
    metrics=metrics
)
import logging
from pythonjsonlogger import jsonlogger

# JSON log formatting for the stream handler
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)

# Root logger configuration
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(logHandler)

# AgentOS-specific logger (more verbose)
agentos_logger = logging.getLogger("agno")
agentos_logger.setLevel(logging.DEBUG)

# File logging with the same JSON format
file_handler = logging.FileHandler("agentos.log")
file_handler.setFormatter(formatter)
agentos_logger.addHandler(file_handler)
# Production log sampling: pass only every Nth record to the handlers.
class SamplingFilter(logging.Filter):
    """Logging filter that keeps roughly `sample_rate` of all records.

    Fixes over the original: calls super().__init__() so logging.Filter
    state is initialized, validates sample_rate (0 previously caused a
    ZeroDivisionError on the first filtered record), and hoists the
    per-record division out of filter().
    """

    def __init__(self, sample_rate=0.1):
        super().__init__()
        if not 0 < sample_rate <= 1:
            raise ValueError("sample_rate must be in (0, 1]")
        self.sample_rate = sample_rate
        self.counter = 0
        # Emit 1 of every `_interval` records.
        self._interval = max(1, int(1 / sample_rate))

    def filter(self, record):
        """Return True for every `_interval`-th record, False otherwise."""
        self.counter += 1
        return self.counter % self._interval == 0
# Apply the sampling filter to the agno logger
sampling_filter = SamplingFilter(sample_rate=0.01)  # 1% sample rate
agentos_logger.addFilter(sampling_filter)
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

# Response schema for the /health endpoint
class HealthStatus(BaseModel):
    status: str     # "healthy" or "unhealthy"
    timestamp: str  # ISO-8601 timestamp of the check
    checks: dict    # per-subsystem check results
@app.get("/health", response_model=HealthStatus)
async def health_check():
    """Aggregate health check across all critical subsystems.

    Returns "healthy" only when every individual check passes.
    FIX: the original called datetime.utcnow() without importing datetime
    anywhere in this snippet (NameError at request time), and utcnow() is
    deprecated/naive - use an explicit timezone-aware timestamp instead.
    """
    from datetime import datetime, timezone  # local import: snippet has no top-level datetime import

    checks = {
        "database": await check_database_health(),
        "redis": await check_redis_health(),
        "model_api": await check_model_api_health(),
        "memory_usage": check_memory_usage(),
        "disk_space": check_disk_space()
    }
    overall_status = "healthy" if all(checks.values()) else "unhealthy"
    return HealthStatus(
        status=overall_status,
        timestamp=datetime.now(timezone.utc).isoformat(),
        checks=checks
    )
@app.get("/ready")
async def readiness_check():
    """Readiness probe for Kubernetes: 503 until the system can take traffic."""
    if not await is_system_ready():
        raise HTTPException(status_code=503, detail="System not ready")
    return {"status": "ready"}

@app.get("/live")
async def liveness_check():
    """Liveness probe for Kubernetes: 503 triggers a pod restart."""
    if not await is_system_alive():
        raise HTTPException(status_code=503, detail="System not alive")
    return {"status": "alive"}
from agno.alerts import AlertManager, AlertSeverity

# Alert manager: delivers alerts via a Slack webhook and SMTP email
alert_manager = AlertManager(
    webhook_url="https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
    email_config={
        "smtp_server": "smtp.gmail.com",
        "smtp_port": 587,
        "username": "alerts@yourcompany.com",
        "password": "your-app-password"
    }
)
# Define alert rules.
# NOTE(review): the original applied three bare @alert_manager.alert(...)
# decorators with no function underneath, which is invalid Python (a decorator
# must precede a def/class). The same rules are registered here as plain calls.
alert_manager.alert(
    name="high_error_rate",
    condition=lambda: get_error_rate() > 0.05,  # 5% error-rate threshold
    severity=AlertSeverity.CRITICAL,
    message="错误率超过 5% 阈值"
)
alert_manager.alert(
    name="high_response_time",
    condition=lambda: get_avg_response_time() > 5000,  # 5 seconds (ms)
    severity=AlertSeverity.WARNING,
    message="平均响应时间超过 5 秒"
)
alert_manager.alert(
    name="database_connection_loss",
    condition=lambda: not is_database_connected(),
    severity=AlertSeverity.CRITICAL,
    message="数据库连接丢失"
)

# Start the monitoring loop
alert_manager.start_monitoring(interval=60)  # check once per minute
问题: psycopg2.OperationalError: could not connect to server
原因分析:
# Check PostgreSQL container status
docker ps | grep postgres
# Inspect container logs
docker logs pgvector
# Test a query from inside the container
docker exec -it pgvector psql -U ai -d ai -c "SELECT version();"
# Network reachability test
telnet localhost 5532
预防措施:
# Connection resilience settings
db = PostgresDb(
    id="resilient-db",
    db_url="postgresql+psycopg://ai:ai@localhost:5532/ai",
    pool_pre_ping=True,  # validate connections before use
    pool_recycle=3600,   # recycle connections periodically
    connect_args={
        "connect_timeout": 10,         # connection timeout (seconds)
        "application_name": "agentos"  # application identifier shown in pg_stat_activity
    }
)
问题: openai.RateLimitError: Rate limit reached
原因分析:
# Exponential-backoff retry around the OpenAI API
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def call_openai_api(messages):
    """Call the chat-completions API; re-raise rate-limit errors so tenacity retries."""
    try:
        response = await openai_client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            max_tokens=1000
        )
        return response
    except openai.RateLimitError as e:
        # Log and re-raise so the @retry decorator performs the backoff.
        logger.warning(f"Rate limit hit, retrying: {e}")
        raise
# Request throttling
from asyncio import Semaphore

rate_limit_semaphore = Semaphore(10)  # at most 10 concurrent requests

async def limited_api_call(messages):
    """Call the API while holding a concurrency slot."""
    async with rate_limit_semaphore:
        return await call_openai_api(messages)
问题: 内存使用量持续增长
诊断方法:
import tracemalloc
import gc

# Enable allocation tracking
tracemalloc.start()
# Periodic snapshot helper
def memory_snapshot():
    """Print the top-10 memory consumers by source line and return the snapshot."""
    snapshot = tracemalloc.take_snapshot()
    print("[ Top 10 memory consumers ]")
    for stat in snapshot.statistics('lineno')[:10]:
        print(stat)
    return snapshot
# Memory-leak detection helper
class MemoryMonitor:
    """Compares tracemalloc snapshots against a baseline to spot allocation growth."""

    def __init__(self):
        # Baseline snapshot; set by take_baseline().
        self.baseline = None
        # NOTE(review): never populated in this snippet - presumably for snapshot history.
        self.snapshots = []

    def take_baseline(self):
        """Record the current allocations (after a GC pass) as the baseline."""
        gc.collect()
        self.baseline = tracemalloc.take_snapshot()

    def check_leaks(self):
        """Print allocation sites that grew by more than 1 MB since the baseline."""
        if not self.baseline:
            return  # nothing to compare against yet
        gc.collect()
        current = tracemalloc.take_snapshot()
        # Diff current usage against the baseline, grouped by source line.
        stats = current.compare_to(self.baseline, 'lineno')
        print("\n[ Memory Leak Detection ]")
        for stat in stats[:10]:
            if stat.size_diff > 1024 * 1024:  # flag > 1 MB growth
                print(f"POTENTIAL LEAK: {stat}")
问题: 会话历史无法正确加载
诊断步骤:
# Session debugging utility
class SessionDebugger:
    """Inspects and repairs persisted session data via the project database API."""

    def __init__(self, db):
        # Project database handle (expects get_session / get_*_memories / rebuild_* APIs).
        self.db = db

    def check_session_integrity(self, session_id):
        """Verify the session row and its related records exist; returns True if intact."""
        # Session row itself
        session = self.db.get_session(session_id)
        if not session:
            print(f"❌ Session {session_id} not found")
            return False
        # Session-scoped memories
        memories = self.db.get_session_memories(session_id)
        print(f"📋 Found {len(memories)} memories for session {session_id}")
        # User-scoped memories
        user_memories = self.db.get_user_memories(session.user_id)
        print(f"👤 Found {len(user_memories)} user memories")
        # Session summaries
        summaries = self.db.get_session_summaries(session_id)
        print(f"📝 Found {len(summaries)} session summaries")
        return True

    def repair_session(self, session_id):
        """Attempt to repair a corrupted session; returns True on success."""
        try:
            # Rebuild the session index...
            self.db.rebuild_session_index(session_id)
            # ...and re-link its memories.
            self.db.rebuild_memory_links(session_id)
            print(f"✅ Session {session_id} repaired successfully")
            return True
        except Exception as e:
            print(f"❌ Failed to repair session {session_id}: {e}")
            return False
import cProfile
import pstats
from io import StringIO
class PerformanceProfiler:
    """Wraps cProfile to profile agent operations and emit a cumulative-time report."""

    def __init__(self):
        self.profiler = cProfile.Profile()

    def start_profiling(self):
        """Begin collecting profiling data."""
        self.profiler.enable()
        print("🔍 Performance profiling started")

    def stop_profiling(self):
        """Stop profiling, print the top-20 functions, and save the raw stats."""
        self.profiler.disable()
        # Render the report into an in-memory buffer.
        buffer = StringIO()
        report = pstats.Stats(self.profiler, stream=buffer)
        report.sort_stats('cumulative')
        report.print_stats(20)
        print("\n[ Performance Profile Report ]")
        print(buffer.getvalue())
        # Persist the raw stats for later inspection (e.g. snakeviz).
        report.dump_stats('agentos_profile.prof')
        print("📊 Profile saved to agentos_profile.prof")

    def profile_agent_operation(self, agent, operation, *args, **kwargs):
        """Run `operation(*args, **kwargs)` under the profiler and return its result."""
        self.start_profiling()
        try:
            return operation(*args, **kwargs)
        finally:
            self.stop_profiling()
# Query performance analysis
class QueryOptimizer:
    """Finds slow statements via pg_stat_statements and adds memory-lookup indexes."""

    def __init__(self, db):
        # Project database handle; must expose an execute() method.
        self.db = db

    def analyze_slow_queries(self, threshold_ms=1000):
        """Print and return the top-10 statements whose mean time exceeds the threshold."""
        # NOTE(review): the selected columns (execution_time, execution_count, avg_time)
        # do not match stock pg_stat_statements column names - verify against the
        # extension version actually installed.
        slow_queries = self.db.execute("""
            SELECT
                query,
                execution_time,
                execution_count,
                avg_time
            FROM pg_stat_statements
            WHERE mean_exec_time > %s
            ORDER BY mean_exec_time DESC
            LIMIT 10
        """, [threshold_ms])
        print(f"\n[ Slow Queries (> {threshold_ms}ms) ]")
        for query in slow_queries:
            print(f"⏱️ {query['avg_time']:.2f}ms avg, "
                  f"{query['execution_count']} calls")
            print(f"📋 {query['query'][:200]}...")
        return slow_queries

    def optimize_memory_queries(self):
        """Create covering indexes for session- and user-memory lookups (newest first)."""
        # Composite index for session-memory reads.
        self.db.execute("""
            CREATE INDEX CONCURRENTLY IF NOT EXISTS
            idx_session_memories_lookup
            ON session_memories (session_id, created_at DESC)
        """)
        # Composite index for user-memory reads.
        self.db.execute("""
            CREATE INDEX CONCURRENTLY IF NOT EXISTS
            idx_user_memories_lookup
            ON user_memories (user_id, created_at DESC)
        """)
        print("✅ Memory query indexes created")
# Secure API-key management
from cryptography.fernet import Fernet
import os

class SecureKeyManager:
    """Encrypts and decrypts per-service API keys with a Fernet master key."""

    def __init__(self):
        # Master key from the environment.
        # NOTE(review): os.environ.get returns str while Fernet.generate_key()
        # returns bytes - confirm Fernet accepts the str form of the stored key.
        self.master_key = os.environ.get('MASTER_KEY')
        if not self.master_key:
            # First run: generate a fresh key.
            # NOTE(review): printing a secret to stdout is unsafe if logs are captured.
            self.master_key = Fernet.generate_key()
            print(f"🔑 Generated master key: {self.master_key.decode()}")
            print("⚠️ SAVE THIS KEY SECURELY!")
        self.cipher = Fernet(self.master_key)

    def encrypt_api_key(self, api_key: str) -> str:
        """Encrypt an API key; returns the Fernet token as text."""
        encrypted = self.cipher.encrypt(api_key.encode())
        return encrypted.decode()

    def decrypt_api_key(self, encrypted_key: str) -> str:
        """Decrypt a previously encrypted API key."""
        decrypted = self.cipher.decrypt(encrypted_key.encode())
        return decrypted.decode()

    def store_api_key_securely(self, service: str, api_key: str):
        """Encrypt and persist an API key for `service`."""
        encrypted_key = self.encrypt_api_key(api_key)
        # Store in an environment variable...
        os.environ[f'{service.upper()}_API_KEY_ENCRYPTED'] = encrypted_key
        # ...and also in an encrypted file alongside the app.
        with open(f'{service}_key.enc', 'w') as f:
            f.write(encrypted_key)

    def get_api_key(self, service: str) -> str:
        """Return the decrypted API key for `service`; raises ValueError if absent."""
        # Prefer the environment variable...
        encrypted_key = os.environ.get(f'{service.upper()}_API_KEY_ENCRYPTED')
        if not encrypted_key:
            # ...fall back to the encrypted file.
            try:
                with open(f'{service}_key.enc', 'r') as f:
                    encrypted_key = f.read()
            except FileNotFoundError:
                raise ValueError(f"No encrypted key found for {service}")
        return self.decrypt_api_key(encrypted_key)
import re
from html import escape
class SecurityValidator:
    """Sanitizes user input and screens prompts for injection attempts."""

    def __init__(self):
        # Patterns for dangerous content; matched case-insensitively in sanitize_input.
        self.dangerous_patterns = [
            r'<script.*?>.*?</script>',      # inline XSS script blocks
            r'javascript:',                  # javascript: protocol URLs
            r'on\w+\s*=\s*["\'].*?["\']',    # inline event handlers
            r'\.\.\/|\.\\',                  # path traversal sequences
            r'union\s+select|drop\s+table',  # common SQL-injection fragments
            r'\$\{.*?\}|\{\{.*?\}\}',        # template-injection expressions
        ]

    def sanitize_input(self, user_input: str) -> str:
        """Return a cleaned, length-capped, HTML-escaped version of user input.

        FIX: the original HTML-escaped the text *before* scrubbing the
        dangerous patterns, so `<script>` had already become `&lt;script&gt;`
        and the XSS pattern could never match - the scrub was a no-op for
        HTML payloads. Patterns are now removed from the raw text first.
        Non-string input is stringified and then sanitized (the original
        returned it unsanitized).
        """
        if not isinstance(user_input, str):
            user_input = str(user_input)
        # Remove dangerous patterns from the raw text...
        sanitized = user_input
        for pattern in self.dangerous_patterns:
            sanitized = re.sub(pattern, '', sanitized, flags=re.IGNORECASE)
        # ...then escape whatever remains for safe HTML rendering.
        sanitized = escape(sanitized)
        # Enforce a hard length cap.
        max_length = 10000
        if len(sanitized) > max_length:
            sanitized = sanitized[:max_length]
        return sanitized.strip()

    def validate_prompt(self, prompt: str) -> bool:
        """Return False if the prompt contains a known prompt-injection phrase."""
        malicious_keywords = [
            'ignore previous instructions',
            'disregard safety guidelines',
            'act as a different AI',
            'bypass security',
            'system prompt'
        ]
        prompt_lower = prompt.lower()
        for keyword in malicious_keywords:
            if keyword in prompt_lower:
                return False
        return True
# Wire the validator into an agent
security_validator = SecurityValidator()

secure_agent = Agent(
    name="Secure Agent",
    input_validator=security_validator.sanitize_input,   # sanitize every inbound message
    prompt_validator=security_validator.validate_prompt  # reject prompt-injection attempts
)
AgentOS 作为一个企业级的多代理操作系统,提供了完整的基础设施来构建、部署和管理智能代理应用。通过其强大的记忆系统、灵活的工作流引擎、丰富的工具集成和多接口支持,开发者可以快速构建复杂的 AI 应用系统。
随着 AI 技术的快速发展,AgentOS 将继续演进,支持更多先进的 AI 模型、更智能的协作机制和更强大的部署选项。未来版本将重点关注:
还没有人回复