## 🎯 概述
AgentOS 是 Agno 框架的核心操作系统,为构建、部署和管理智能代理应用提供了完整的基础设施。它不仅仅是一个简单的代理框架,而是一个企业级的多代理操作系统,支持复杂的团队协作、工作流编排、记忆管理和多种接口集成。
### 核心特性
**🤖 多代理协作**:支持创建复杂的代理团队,实现智能任务分配和协作
**🧠 智能记忆系统**:提供会话记忆、用户记忆和上下文历史管理
**⚡ 工作流引擎**:支持条件路由、并行处理和步骤编排
**🔧 MCP工具集成**:通过 Model Context Protocol 集成外部工具和服务
**🌐 多接口支持**:支持 AGUI、Slack、WhatsApp、A2A 等多种接口
**📊 可观测性**:内置监控、日志和性能分析功能
**🔒 企业级安全**:支持生产环境部署和安全配置
## 🏗️ 架构设计
### 核心组件关系图
```mermaid
graph LR
subgraph "AgentOS Core"
AO[AgentOS] --> AG[Agents]
AO --> TM[Teams]
AO --> WF[Workflows]
AO --> DB[Database Layer]
AO --> MC[MCP Tools]
AO --> RT[Routers]
end
subgraph "Memory Management"
AG --> SM[Session Memory]
AG --> UM[User Memory]
AG --> CH[Context History]
AG --> SS[Session Summaries]
end
subgraph "Team Collaboration"
TM --> MM[Member Management]
TM --> CD[Coordination]
TM --> RS[Result Synthesis]
end
subgraph "Workflow Engine"
WF --> SE[Step Execution]
WF --> RT[Router Logic]
WF --> PE[Parallel Execution]
WF --> EH[Error Handling]
end
subgraph "External Interfaces"
AO --> GUI[AGUI Interface]
AO --> SLK[Slack Integration]
AO --> WSP[WhatsApp Bot]
AO --> A2A[Agent2Agent Protocol]
end
```
### 数据流流程图
```mermaid
sequenceDiagram
participant User
participant Interface
participant AgentOS
participant Agent
participant Team
participant Workflow
participant Database
User->>Interface: 发送请求
Interface->>AgentOS: 路由请求
AgentOS->>Database: 加载会话历史
AgentOS->>Agent: 创建/选择代理
Agent->>Database: 加载用户记忆
Agent->>Team: 加入团队(如需要)
Team->>Workflow: 启动工作流
Workflow->>Database: 存储执行状态
Workflow->>Agent: 分配任务
Agent->>Database: 更新记忆和状态
Agent->>Team: 返回结果
Team->>AgentOS: 合成最终结果
AgentOS->>Interface: 返回响应
Interface->>User: 显示结果
```
## 💻 代码解析
### 基础示例代码
让我们深入分析 `cookbook/agent_os/basic.py` 的核心实现:
```python
from agno.agent import Agent
from agno.db.postgres import PostgresDb
from agno.models.openai import OpenAIChat
from agno.os import AgentOS
from agno.workflow.step import Step
from agno.workflow.workflow import Workflow
# 数据库配置 - 使用 PostgreSQL 作为持久化存储
db = PostgresDb(
id="basic-agent-db",
db_url="postgresql+psycopg://ai:ai@localhost:5532/ai"
)
# 基础代理配置 - 启用完整的记忆和会话功能
basic_agent = Agent(
id="basic-agent",
name="Basic Agent",
model=OpenAIChat(id="gpt-4o"),
db=db,
enable_session_summaries=True, # 启用会话摘要
enable_user_memories=True, # 启用用户记忆
add_history_to_context=True, # 添加历史到上下文
num_history_runs=3, # 保留3次运行历史
read_chat_history=True, # 读取聊天历史
markdown=True, # 支持Markdown格式
debug_mode=True # 启用调试模式
)
# 团队配置 - 创建包含基础代理的团队
basic_team = Team(
id="basic-team",
name="Basic Team",
members=[basic_agent],
db=db,
enable_session_summaries=True,
enable_user_memories=True,
add_history_to_context=True,
num_history_runs=3,
read_chat_history=True,
markdown=True
)
# 工作流配置 - 定义简单的工作流步骤
basic_workflow = Workflow(
id="basic-workflow",
name="Basic Workflow",
steps=[
Step(
name="basic_step",
description="Basic workflow step",
agent=basic_agent
)
],
db=db
)
# AgentOS 应用组装
agent_os = AgentOS(
description="Basic AgentOS Example",
agents=[basic_agent],
teams=[basic_team],
workflows=[basic_workflow],
db=db
)
# 启动服务
if __name__ == "__main__":
agent_os.serve(host="0.0.0.0", port=7777, reload=True)
```
### 依赖导入详解
**核心模块解析**:
- `agno.agent.Agent`: 代理基类,提供完整的代理功能
- `agno.db.postgres.PostgresDb`: PostgreSQL 数据库适配器
- `agno.models.openai.OpenAIChat`: OpenAI 模型集成
- `agno.os.AgentOS`: AgentOS 核心操作系统
- `agno.workflow.step.Step`: 工作流步骤定义
- `agno.workflow.workflow.Workflow`: 工作流引擎
### 数据库配置深度分析
**PostgreSQL 连接配置**:
```python
db = PostgresDb(
id="basic-agent-db", # 数据库实例唯一标识
db_url="postgresql+psycopg://ai:ai@localhost:5532/ai"
)
```
**连接字符串解析**:
- `postgresql+psycopg`: 使用 psycopg 驱动连接 PostgreSQL
- `ai:ai`: 用户名和密码
- `localhost:5532`: 数据库服务器地址和端口
- `ai`: 数据库名称
**数据库表结构**:
AgentOS 自动创建以下核心表:
- `sessions`: 会话管理表
- `user_memories`: 用户记忆存储表
- `session_memories`: 会话记忆存储表
- `workflows`: 工作流状态表
- `agents`: 代理配置表
- `teams`: 团队配置表
## 🧠 核心功能详解
### 记忆管理系统
AgentOS 的记忆系统是其最强大的特性之一,包含多个层次的记忆管理:
#### 会话记忆 (Session Memory)
```python
# 会话记忆配置
basic_agent = Agent(
enable_session_summaries=True, # 启用会话摘要生成
add_history_to_context=True, # 将历史添加到上下文
num_history_runs=3, # 保留最近3次运行历史
read_chat_history=True # 从数据库读取聊天历史
)
```
**会话摘要生成机制**:
- 每次会话结束后自动生成摘要
- 使用 LLM 提取关键信息和重要决策点
- 存储在数据库中供后续会话使用
- 支持多语言摘要生成
#### 用户记忆 (User Memory)
```python
# 用户记忆配置
basic_agent = Agent(
enable_user_memories=True, # 启用用户记忆功能
db=db # 指定存储数据库
)
```
**用户记忆类型**:
1. **个人信息**: 用户偏好、背景信息
2. **交互历史**: 过往对话内容和决策
3. **学习记录**: 用户学习进度和掌握程度
4. **行为模式**: 用户交互习惯和偏好
#### 上下文历史管理
```python
# 上下文历史配置
basic_agent = Agent(
add_history_to_context=True, # 添加历史到上下文
num_history_runs=3, # 保留3次运行历史
read_chat_history=True # 读取聊天历史
)
```
**上下文构建策略**:
- 动态加载相关历史会话
- 基于语义相似度筛选相关内容
- 智能截断避免上下文过长
- 支持多轮对话连贯性
### 团队协作机制
#### 团队架构设计
```python
basic_team = Team(
id="basic-team",
name="Basic Team",
members=[basic_agent], # 团队成员列表
db=db, # 共享数据库
enable_session_summaries=True, # 团队会话摘要
enable_user_memories=True, # 团队用户记忆
add_history_to_context=True, # 团队上下文历史
num_history_runs=3,
read_chat_history=True,
markdown=True
)
```
#### 团队协调算法
**任务分配策略**:
1. **能力匹配**: 根据代理专长分配任务
2. **负载均衡**: 考虑代理当前工作负载
3. **历史表现**: 参考过往任务完成质量
4. **协作关系**: 考虑团队成员间协作历史
**结果合成机制**:
1. **多源信息整合**: 合并多个代理的输出
2. **冲突解决**: 处理不同代理间的观点冲突
3. **质量评估**: 评估合成结果的质量
4. **反馈优化**: 基于结果质量调整协作策略
### 工作流引擎
#### 工作流定义与执行
```python
basic_workflow = Workflow(
id="basic-workflow",
name="Basic Workflow",
steps=[
Step(
name="basic_step",
description="Basic workflow step",
agent=basic_agent
)
],
db=db # 工作流状态持久化
)
```
#### 高级工作流特性
**条件路由 (Conditional Routing)**:
```python
from agno.workflow.step import Router
workflow = Workflow(
name="Intelligent Research Workflow",
steps=[
Router(
name="research_strategy_router",
selector=research_router_function,
choices=[research_hackernews, research_web],
description="Intelligently selects research method based on topic"
),
publish_content
]
)
```
**并行处理 (Parallel Execution)**:
```python
from agno.workflow.step import Parallel
workflow = Workflow(
name="Parallel Analysis Workflow",
steps=[
Parallel(
name="parallel_analysis",
steps=[
technical_analysis_step,
market_analysis_step,
competitive_analysis_step
]
),
synthesis_step
]
)
```
**错误处理与重试 (Error Handling & Retries)**:
```python
workflow = Workflow(
name="Reliable Processing Workflow",
steps=[
Step(
name="reliable_step",
agent=reliable_agent,
retry_count=3, # 重试次数
retry_delay=5, # 重试延迟(秒)
timeout=30 # 超时时间(秒)
)
]
)
```
## 🔧 MCP 工具集成
### Model Context Protocol 详解
MCP (Model Context Protocol) 是 AgentOS 的核心功能,允许代理无缝集成外部工具和服务:
#### MCP 工具包配置
```python
from agno.tools.mcp import MCPTools
# 创建 MCP 工具包
mcp_tools = MCPTools(
name="airbnb_search",
command="npx",
args=["-y", "@mcp/tools-airbnb"],
transport="stdio", # 传输协议
timeout=30 # 超时配置
)
# 集成到代理
agent = Agent(
name="Travel Agent",
tools=[mcp_tools],
mcp_enabled=True # 启用 MCP 支持
)
```
#### 支持的 MCP 工具类型
**搜索工具**:
- DuckDuckGo 搜索
- SerpAPI Google 搜索
- Airbnb 房源搜索
- 地图和位置服务
**数据工具**:
- SQL 数据库查询
- CSV 文件处理
- JSON 数据操作
- Excel 文件读写
**API 工具**:
- GitHub API 集成
- Slack 消息发送
- Discord 通知
- 邮件发送服务
**自定义工具**:
```python
# 创建自定义 MCP 工具
from agno.tools.mcp import MCPTools
custom_tools = MCPTools(
name="my_custom_tools",
command="python",
args=["custom_tools_server.py"],
transport="stdio",
env={"CUSTOM_API_KEY": "your_api_key"}
)
```
## 🌐 多接口支持
### AGUI 接口
AGUI (Agent Graphical User Interface) 是 AgentOS 的 Web 界面:
```python
# AGUI 配置
agent_os = AgentOS(
description="AGUI Enabled AgentOS",
agents=[agent],
teams=[team],
workflows=[workflow],
agui_enabled=True, # 启用 AGUI
agui_config={
"theme": "dark", # 界面主题
"show_metrics": True, # 显示指标
"enable_chat": True, # 启用聊天功能
"show_workflows": True # 显示工作流状态
}
)
```
### Slack 集成
```python
# Slack 集成配置
from agno.integrations.slack import SlackIntegration
slack_integration = SlackIntegration(
bot_token="xoxb-your-bot-token",
app_token="xapp-your-app-token",
signing_secret="your-signing-secret"
)
agent_os = AgentOS(
description="Slack Integrated AgentOS",
agents=[agent],
integrations=[slack_integration]
)
```
### WhatsApp 机器人
```python
# WhatsApp 集成
from agno.integrations.whatsapp import WhatsAppIntegration
whatsapp_integration = WhatsAppIntegration(
phone_number_id="your-phone-number-id",
access_token="your-access-token",
webhook_secret="your-webhook-secret"
)
agent_os = AgentOS(
description="WhatsApp Bot AgentOS",
agents=[agent],
integrations=[whatsapp_integration]
)
```
### A2A (Agent-to-Agent) 协议
```python
# A2A 协议配置
from agno.integrations.a2a import A2AConfig
a2a_config = A2AConfig(
agent_id="my-agent",
endpoint="https://api.example.com/a2a",
capabilities=["text_generation", "data_analysis", "web_search"]
)
agent_os = AgentOS(
description="A2A Protocol Enabled AgentOS",
agents=[agent],
a2a_config=a2a_config
)
```
## 🚀 运行与部署
### 环境准备
#### 系统要求
- **Python**: 3.10 或更高版本
- **PostgreSQL**: 12.0 或更高版本
- **Docker**: 20.10 或更高版本(可选)
- **内存**: 最少 4GB RAM,推荐 8GB+
- **存储**: 最少 10GB 可用空间
#### 依赖安装
```bash
# 创建虚拟环境
python -m venv agentos_env
source agentos_env/bin/activate # Linux/Mac
# 或
agentos_env\Scripts\activate # Windows
# 安装核心依赖
pip install agno psycopg-binary openai
# 安装可选依赖
pip install agno[all] # 安装所有可选依赖
```
### 数据库设置
#### PostgreSQL Docker 部署
```bash
# 使用提供的脚本启动 PostgreSQL
cd cookbook/scripts
./run_pgvector.sh # Linux/Mac
# 或
run_pgvector.bat # Windows
```
**Docker 容器配置**:
```dockerfile
# 自定义 PostgreSQL 容器
FROM agnohq/pgvector:16
# 环境变量配置
ENV POSTGRES_DB=ai
ENV POSTGRES_USER=ai
ENV POSTGRES_PASSWORD=ai
ENV PGDATA=/var/lib/postgresql/data/pgdata
# 端口映射
EXPOSE 5532
# 数据持久化
VOLUME ["/var/lib/postgresql/data"]
```
#### 数据库连接池配置
```python
# 高级数据库配置
db = PostgresDb(
id="production-db",
db_url="postgresql+psycopg://ai:ai@localhost:5532/ai",
pool_size=20, # 连接池大小
max_overflow=30, # 最大溢出连接
pool_timeout=30, # 连接超时时间
pool_recycle=3600, # 连接回收时间
echo=False # SQL 日志输出
)
```
### 生产环境部署
#### Docker 容器化部署
**Dockerfile 配置**:
```dockerfile
# 多阶段构建优化
FROM python:3.11-slim as builder
# 安装构建依赖
RUN apt-get update && apt-get install -y \
gcc \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# 创建应用目录
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 生产阶段
FROM python:3.11-slim
# 安装运行时依赖
RUN apt-get update && apt-get install -y \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# 复制应用代码
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY . /app
WORKDIR /app
# 非 root 用户运行
RUN useradd -m -u 1000 agentos
USER agentos
# 健康检查
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:7777/health || exit 1
# 启动命令
CMD ["python", "app.py"]
```
**Docker Compose 配置**:
```yaml
version: '3.8'
services:
postgres:
image: agnohq/pgvector:16
environment:
POSTGRES_DB: ai
POSTGRES_USER: ai
POSTGRES_PASSWORD: ai
PGDATA: /var/lib/postgresql/data/pgdata
volumes:
- pg_data:/var/lib/postgresql/data
ports:
- "5532:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ai -d ai"]
interval: 10s
timeout: 5s
retries: 5
agentos:
build: .
depends_on:
postgres:
condition: service_healthy
environment:
DATABASE_URL: postgresql+psycopg://ai:ai@postgres:5432/ai
OPENAI_API_KEY: ${OPENAI_API_KEY}
ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY}
AGENTOS_ENV: production
AGENTOS_LOG_LEVEL: INFO
ports:
- "7777:7777"
volumes:
- ./logs:/app/logs
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:7777/health"]
interval: 30s
timeout: 10s
retries: 3
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
command: redis-server --appendonly yes
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 3s
retries: 3
volumes:
pg_data:
redis_data:
networks:
default:
driver: bridge
```
#### Kubernetes 部署
**Deployment 配置**:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: agentos-deployment
labels:
app: agentos
spec:
replicas: 3
selector:
matchLabels:
app: agentos
template:
metadata:
labels:
app: agentos
spec:
containers:
- name: agentos
image: your-registry/agentos:latest
ports:
- containerPort: 7777
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: agentos-secrets
key: database-url
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: agentos-secrets
key: openai-api-key
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "1Gi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 7777
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 7777
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: agentos-service
spec:
selector:
app: agentos
ports:
- protocol: TCP
port: 80
targetPort: 7777
type: LoadBalancer
```
#### 云服务部署
**AWS ECS 配置**:
```json
{
"family": "agentos-task",
"networkMode": "awsvpc",
"requiresCompatibilities": ["FARGATE"],
"cpu": "512",
"memory": "1024",
"executionRoleArn": "arn:aws:iam::account:role/ecsTaskExecutionRole",
"containerDefinitions": [
{
"name": "agentos",
"image": "your-registry/agentos:latest",
"portMappings": [
{
"containerPort": 7777,
"protocol": "tcp"
}
],
"environment": [
{
"name": "DATABASE_URL",
"value": "postgresql://user:pass@rds-endpoint:5432/agentos"
}
],
"secrets": [
{
"name": "OPENAI_API_KEY",
"valueFrom": "arn:aws:secretsmanager:region:account:secret:openai-api-key"
}
],
"logConfiguration": {
"logDriver": "awslogs",
"options": {
"awslogs-group": "/ecs/agentos",
"awslogs-region": "us-west-2",
"awslogs-stream-prefix": "ecs"
}
}
}
]
}
```
### 性能优化
#### 数据库优化
```python
# 数据库性能调优
db = PostgresDb(
id="optimized-db",
db_url="postgresql+psycopg://ai:ai@localhost:5532/ai",
pool_pre_ping=True, # 连接健康检查
pool_size=50, # 大连接池
max_overflow=100, # 最大溢出
isolation_level="READ_COMMITTED" # 事务隔离级别
)
```
#### 代理性能调优
```python
# 代理性能优化配置
optimized_agent = Agent(
name="Optimized Agent",
model=OpenAIChat(id="gpt-4o-mini"), # 使用轻量级模型
temperature=0.3, # 降低温度提高一致性
max_tokens=1000, # 限制输出长度
timeout=15, # 设置超时时间
cache_model_responses=True, # 启用模型响应缓存
stream=False, # 禁用流式输出以提高性能
debug_mode=False # 禁用调试模式
)
```
#### 工作流性能优化
```python
# 工作流性能配置
optimized_workflow = Workflow(
name="High Performance Workflow",
store_events=False, # 禁用事件存储
stream=False, # 禁用流式输出
cache_session=False, # 禁用会话缓存
telemetry=False, # 禁用遥测
parallel=True, # 启用并行执行
max_workers=10 # 最大工作线程数
)
```
## 🔍 监控与调试
### 可观测性集成
AgentOS 支持多种可观测性平台集成:
#### AgentOps 集成
```python
import agentops
from agno.agent import Agent
# 初始化 AgentOps
agentops.init(
api_key="your-agentops-api-key",
tags=["production", "agentos"]
)
# 创建被监控的代理
monitored_agent = Agent(
name="Monitored Agent",
model=OpenAIChat(id="gpt-4o"),
monitoring=True # 启用监控
)
```
#### Langfuse 集成
```python
from langfuse import Langfuse
from agno.integrations.langfuse import LangfuseIntegration
# Langfuse 配置
langfuse = Langfuse(
public_key="pk-lf-...",
secret_key="sk-lf-...",
host="https://cloud.langfuse.com"
)
# 集成到 AgentOS
langfuse_integration = LangfuseIntegration(
langfuse=langfuse,
trace_name="agentos-trace",
tags=["production"]
)
agent_os = AgentOS(
description="Langfuse Monitored AgentOS",
agents=[agent],
integrations=[langfuse_integration]
)
```
#### 自定义监控指标
```python
from agno.metrics import MetricsCollector
# 自定义指标收集器
metrics = MetricsCollector(
name="custom_metrics",
endpoint="http://metrics-server:9090/metrics"
)
# 代理性能指标
@metrics.counter
async def agent_response_time(agent_name: str, duration: float):
return {"agent": agent_name, "duration": duration}
@metrics.gauge
def active_sessions():
return get_active_session_count()
# 集成到代理
monitored_agent = Agent(
name="Metrics Enabled Agent",
metrics=metrics
)
```
### 日志管理
#### 结构化日志配置
```python
import logging
from pythonjsonlogger import jsonlogger
# JSON 日志格式
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
# 根日志器配置
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(logHandler)
# AgentOS 特定日志
agentos_logger = logging.getLogger("agno")
agentos_logger.setLevel(logging.DEBUG)
# 文件日志
file_handler = logging.FileHandler("agentos.log")
file_handler.setFormatter(formatter)
agentos_logger.addHandler(file_handler)
```
#### 日志采样策略
```python
# 生产环境日志采样
class SamplingFilter(logging.Filter):
def __init__(self, sample_rate=0.1):
self.sample_rate = sample_rate
self.counter = 0
def filter(self, record):
self.counter += 1
return self.counter % int(1/self.sample_rate) == 0
# 应用采样过滤器
sampling_filter = SamplingFilter(sample_rate=0.01) # 1% 采样率
agentos_logger.addFilter(sampling_filter)
```
### 健康检查与告警
#### 健康检查端点
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()
class HealthStatus(BaseModel):
status: str
timestamp: str
checks: dict
@app.get("/health", response_model=HealthStatus)
async def health_check():
"""综合健康检查"""
checks = {
"database": await check_database_health(),
"redis": await check_redis_health(),
"model_api": await check_model_api_health(),
"memory_usage": check_memory_usage(),
"disk_space": check_disk_space()
}
overall_status = "healthy" if all(checks.values()) else "unhealthy"
return HealthStatus(
status=overall_status,
timestamp=datetime.utcnow().isoformat(),
checks=checks
)
@app.get("/ready")
async def readiness_check():
"""就绪检查 - 用于 Kubernetes"""
if not await is_system_ready():
raise HTTPException(status_code=503, detail="System not ready")
return {"status": "ready"}
@app.get("/live")
async def liveness_check():
"""存活检查 - 用于 Kubernetes"""
if not await is_system_alive():
raise HTTPException(status_code=503, detail="System not alive")
return {"status": "alive"}
```
#### 告警系统集成
```python
from agno.alerts import AlertManager, AlertSeverity
# 告警管理器
alert_manager = AlertManager(
webhook_url="https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
email_config={
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"username": "alerts@yourcompany.com",
"password": "your-app-password"
}
)
# 定义告警规则
@alert_manager.alert(
name="high_error_rate",
condition=lambda: get_error_rate() > 0.05, # 5% 错误率
severity=AlertSeverity.CRITICAL,
message="错误率超过 5% 阈值"
)
@alert_manager.alert(
name="high_response_time",
condition=lambda: get_avg_response_time() > 5000, # 5秒
severity=AlertSeverity.WARNING,
message="平均响应时间超过 5 秒"
)
@alert_manager.alert(
name="database_connection_loss",
condition=lambda: not is_database_connected(),
severity=AlertSeverity.CRITICAL,
message="数据库连接丢失"
)
# 启动告警监控
alert_manager.start_monitoring(interval=60) # 每分钟检查一次
```
## ⚠️ 故障排查指南
### 常见问题与解决方案
#### 数据库连接问题
**问题**: `psycopg2.OperationalError: could not connect to server`
**原因分析**:
- PostgreSQL 服务未启动
- 连接字符串配置错误
- 网络连接问题
- 认证失败
**解决方案**:
```bash
# 检查 PostgreSQL 状态
docker ps | grep postgres
# 查看容器日志
docker logs pgvector
# 测试连接
docker exec -it pgvector psql -U ai -d ai -c "SELECT version();"
# 网络连通性测试
telnet localhost 5532
```
**预防措施**:
```python
# 连接重试机制
db = PostgresDb(
id="resilient-db",
db_url="postgresql+psycopg://ai:ai@localhost:5532/ai",
pool_pre_ping=True, # 连接前检查
pool_recycle=3600, # 定期回收连接
connect_args={
"connect_timeout": 10, # 连接超时
"application_name": "agentos" # 应用标识
}
)
```
#### 模型 API 问题
**问题**: `openai.RateLimitError: Rate limit reached`
**原因分析**:
- API 调用频率过高
- 并发请求过多
- 配额不足
**解决方案**:
```python
# 实现指数退避重试
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def call_openai_api(messages):
try:
response = await openai_client.chat.completions.create(
model="gpt-4o",
messages=messages,
max_tokens=1000
)
return response
except openai.RateLimitError as e:
logger.warning(f"Rate limit hit, retrying: {e}")
raise
# 请求限流
from asyncio import Semaphore
rate_limit_semaphore = Semaphore(10) # 最多10个并发请求
async def limited_api_call(messages):
async with rate_limit_semaphore:
return await call_openai_api(messages)
```
#### 内存泄漏问题
**问题**: 内存使用量持续增长
**诊断方法**:
```python
import tracemalloc
import gc
# 启用内存跟踪
tracemalloc.start()
# 定期快照
def memory_snapshot():
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
print("[ Top 10 memory consumers ]")
for stat in top_stats[:10]:
print(stat)
return snapshot
# 内存泄漏检测
class MemoryMonitor:
def __init__(self):
self.baseline = None
self.snapshots = []
def take_baseline(self):
gc.collect()
self.baseline = tracemalloc.take_snapshot()
def check_leaks(self):
if not self.baseline:
return
gc.collect()
current = tracemalloc.take_snapshot()
# 比较内存使用
stats = current.compare_to(self.baseline, 'lineno')
print("\n[ Memory Leak Detection ]")
for stat in stats[:10]:
if stat.size_diff > 1024 * 1024: # 1MB 增长
print(f"POTENTIAL LEAK: {stat}")
```
#### 会话状态丢失
**问题**: 会话历史无法正确加载
**诊断步骤**:
```python
# 会话调试工具
class SessionDebugger:
def __init__(self, db):
self.db = db
def check_session_integrity(self, session_id):
"""检查会话完整性"""
# 检查会话表
session = self.db.get_session(session_id)
if not session:
print(f"❌ Session {session_id} not found")
return False
# 检查相关记忆
memories = self.db.get_session_memories(session_id)
print(f"📋 Found {len(memories)} memories for session {session_id}")
# 检查用户记忆
user_memories = self.db.get_user_memories(session.user_id)
print(f"👤 Found {len(user_memories)} user memories")
# 检查会话摘要
summaries = self.db.get_session_summaries(session_id)
print(f"📝 Found {len(summaries)} session summaries")
return True
def repair_session(self, session_id):
"""修复损坏的会话"""
try:
# 重新创建会话索引
self.db.rebuild_session_index(session_id)
# 重建记忆关联
self.db.rebuild_memory_links(session_id)
print(f"✅ Session {session_id} repaired successfully")
return True
except Exception as e:
print(f"❌ Failed to repair session {session_id}: {e}")
return False
```
### 性能调优工具
#### 性能分析器
```python
import cProfile
import pstats
from io import StringIO
class PerformanceProfiler:
def __init__(self):
self.profiler = cProfile.Profile()
def start_profiling(self):
"""开始性能分析"""
self.profiler.enable()
print("🔍 Performance profiling started")
def stop_profiling(self):
"""停止性能分析并生成报告"""
self.profiler.disable()
# 创建统计报告
stream = StringIO()
stats = pstats.Stats(self.profiler, stream=stream)
stats.sort_stats('cumulative')
stats.print_stats(20) # 显示前20个函数
print("\n[ Performance Profile Report ]")
print(stream.getvalue())
# 保存到文件
stats.dump_stats('agentos_profile.prof')
print("📊 Profile saved to agentos_profile.prof")
def profile_agent_operation(self, agent, operation, *args, **kwargs):
"""分析特定代理操作"""
self.start_profiling()
try:
result = operation(*args, **kwargs)
return result
finally:
self.stop_profiling()
```
#### 数据库查询优化
```python
# 查询性能分析
class QueryOptimizer:
def __init__(self, db):
self.db = db
def analyze_slow_queries(self, threshold_ms=1000):
"""分析慢查询"""
slow_queries = self.db.execute("""
SELECT
query,
execution_time,
execution_count,
avg_time
FROM pg_stat_statements
WHERE mean_exec_time > %s
ORDER BY mean_exec_time DESC
LIMIT 10
""", [threshold_ms])
print(f"\n[ Slow Queries (> {threshold_ms}ms) ]")
for query in slow_queries:
print(f"⏱️ {query['avg_time']:.2f}ms avg, "
f"{query['execution_count']} calls")
print(f"📋 {query['query'][:200]}...")
return slow_queries
def optimize_memory_queries(self):
"""优化记忆查询"""
# 创建复合索引
self.db.execute("""
CREATE INDEX CONCURRENTLY IF NOT EXISTS
idx_session_memories_lookup
ON session_memories (session_id, created_at DESC)
""")
# 创建用户记忆索引
self.db.execute("""
CREATE INDEX CONCURRENTLY IF NOT EXISTS
idx_user_memories_lookup
ON user_memories (user_id, created_at DESC)
""")
print("✅ Memory query indexes created")
```
### 安全最佳实践
#### API 密钥管理
```python
# 安全的密钥管理
from cryptography.fernet import Fernet
import os
class SecureKeyManager:
def __init__(self):
# 从环境变量获取主密钥
self.master_key = os.environ.get('MASTER_KEY')
if not self.master_key:
# 生成新密钥(仅首次运行)
self.master_key = Fernet.generate_key()
print(f"🔑 Generated master key: {self.master_key.decode()}")
print("⚠️ SAVE THIS KEY SECURELY!")
self.cipher = Fernet(self.master_key)
def encrypt_api_key(self, api_key: str) -> str:
"""加密 API 密钥"""
encrypted = self.cipher.encrypt(api_key.encode())
return encrypted.decode()
def decrypt_api_key(self, encrypted_key: str) -> str:
"""解密 API 密钥"""
decrypted = self.cipher.decrypt(encrypted_key.encode())
return decrypted.decode()
def store_api_key_securely(self, service: str, api_key: str):
"""安全存储 API 密钥"""
encrypted_key = self.encrypt_api_key(api_key)
# 存储到环境变量或安全存储
os.environ[f'{service.upper()}_API_KEY_ENCRYPTED'] = encrypted_key
# 或者存储到加密的配置文件
with open(f'{service}_key.enc', 'w') as f:
f.write(encrypted_key)
def get_api_key(self, service: str) -> str:
"""获取解密的 API 密钥"""
# 从环境变量获取
encrypted_key = os.environ.get(f'{service.upper()}_API_KEY_ENCRYPTED')
if not encrypted_key:
# 从文件读取
try:
with open(f'{service}_key.enc', 'r') as f:
encrypted_key = f.read()
except FileNotFoundError:
raise ValueError(f"No encrypted key found for {service}")
return self.decrypt_api_key(encrypted_key)
```
#### 输入验证与清理
```python
import re
from html import escape
class SecurityValidator:
def __init__(self):
# 定义危险字符模式
self.dangerous_patterns = [
r'<script.*?>.*?</script>', # XSS 脚本
r'javascript:', # JavaScript 协议
r'on\w+\s*=\s*["\'].*?["\']', # 事件处理器
r'\.\.\/|\.\\', # 路径遍历
r'union\s+select|drop\s+table', # SQL 注入
r'\$\{.*?\}|\{\{.*?\}\}', # 模板注入
]
def sanitize_input(self, user_input: str) -> str:
"""清理用户输入"""
if not isinstance(user_input, str):
return str(user_input)
# HTML 转义
sanitized = escape(user_input)
# 移除危险模式
for pattern in self.dangerous_patterns:
sanitized = re.sub(pattern, '', sanitized, flags=re.IGNORECASE)
# 长度限制
max_length = 10000
if len(sanitized) > max_length:
sanitized = sanitized[:max_length]
return sanitized.strip()
def validate_prompt(self, prompt: str) -> bool:
"""验证提示词安全性"""
# 检查是否包含恶意内容
malicious_keywords = [
'ignore previous instructions',
'disregard safety guidelines',
'act as a different AI',
'bypass security',
'system prompt'
]
prompt_lower = prompt.lower()
for keyword in malicious_keywords:
if keyword in prompt_lower:
return False
return True
# 集成到代理
security_validator = SecurityValidator()
secure_agent = Agent(
name="Secure Agent",
input_validator=security_validator.sanitize_input,
prompt_validator=security_validator.validate_prompt
)
```
## 📚 最佳实践总结
### 架构设计原则
1. **模块化设计**: 将功能分解为独立的模块,便于维护和扩展
2. **松耦合**: 减少组件间的依赖,提高系统灵活性
3. **可扩展性**: 设计时考虑未来的功能扩展需求
4. **容错性**: 实现错误处理和恢复机制
5. **可观测性**: 集成监控和日志系统
### 性能优化建议
1. **数据库优化**:
- 创建合适的索引
- 使用连接池
- 优化查询语句
- 定期维护数据库
2. **缓存策略**:
- 实现多级缓存
- 使用 Redis 缓存热点数据
- 缓存模型响应结果
- 实现缓存失效策略
3. **并发处理**:
- 使用异步编程
- 合理设置线程池大小
- 实现请求限流
- 避免资源竞争
4. **资源管理**:
- 及时释放资源
- 监控内存使用
- 实现连接复用
- 优化数据传输
### 安全最佳实践
1. **认证授权**:
- 实现强身份认证
- 使用 JWT Token
- 定期轮换密钥
- 实施最小权限原则
2. **数据保护**:
- 加密敏感数据
- 安全传输协议
- 定期备份数据
- 实施数据脱敏
3. **输入验证**:
- 严格验证用户输入
- 防止注入攻击
- 限制输入长度
- 实施内容过滤
4. **监控审计**:
- 记录操作日志
- 监控异常行为
- 实施访问审计
- 定期安全评估
### 运维建议
1. **部署策略**:
- 使用容器化部署
- 实现蓝绿部署
- 配置健康检查
- 实施回滚机制
2. **监控告警**:
- 设置关键指标监控
- 配置多级告警
- 实现自动恢复
- 定期演练故障
3. **备份恢复**:
- 定期备份数据
- 测试恢复流程
- 实施异地备份
- 制定应急预案
4. **文档维护**:
- 保持文档更新
- 记录架构变更
- 维护操作手册
- 培训团队成员
## 🎯 总结与展望
AgentOS 作为一个企业级的多代理操作系统,提供了完整的基础设施来构建、部署和管理智能代理应用。通过其强大的记忆系统、灵活的工作流引擎、丰富的工具集成和多接口支持,开发者可以快速构建复杂的 AI 应用系统。
### 核心优势
1. **完整生态**: 从开发到部署的完整工具链
2. **企业级**: 支持生产环境的高可用部署
3. **可扩展**: 模块化架构支持灵活扩展
4. **可观测**: 全面的监控和日志系统
5. **安全**: 多层次的安全防护机制
### 发展趋势
随着 AI 技术的快速发展,AgentOS 将继续演进,支持更多先进的 AI 模型、更智能的协作机制和更强大的部署选项。未来版本将重点关注:
- **多模态支持**: 集成图像、音频、视频处理能力
- **边缘计算**: 支持边缘部署和离线运行
- **联邦学习**: 实现分布式机器学习
- **自治管理**: 实现系统的自我管理和优化
通过深入理解和应用 AgentOS,开发者可以构建出更加智能、高效和可靠的企业级 AI 应用系统。
登录后可参与表态
讨论回复
0 条回复还没有人回复,快来发表你的看法吧!