```python
import os
import asyncio
import json
import logging
from datetime import datetime
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from pathlib import Path
from agno.agent import Agent
from agno.models.deepseek import DeepSeek
from agno.models.openai import OpenAIChat
from agno.tools.yfinance import YFinanceTools
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.tools.python import PythonTools
from agno.tools.file import FileTools
from agno.memory import MemoryManager
from agno.db.sqlite.sqlite import SqliteDb
# from agno.knowledge import PDFKnowledgeBase, WebsiteKnowledgeBase
# from agno.vectordb import PgVectorDb, LanceDb
# from agno.embedder import OpenAIEmbedder
from agno.workflow import Workflow
# Configure logging: request INFO-level records on both a log file and the
# console, using one shared line format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8 so non-ASCII text (agent names, error messages) is
        # written the same way regardless of the platform's locale encoding.
        logging.FileHandler('agno2.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
# Module-level logger used by the classes defined in this file.
logger = logging.getLogger(__name__)
@dataclass
class AgentConfig:
    """Settings that describe a single agent.

    Only ``name`` is required. Every other field has a default so a config
    can be built from a partial mapping (for example an entry of the JSON
    configuration file).
    """

    # Agent identifier; the manager also inspects it to decide which tools
    # to attach.
    name: str
    # Model backend selector. The manager's model factory accepts "deepseek"
    # and "openai"; the default must be one of those values, otherwise a
    # config created with defaults can never produce a model.
    model_type: str = "deepseek"
    # Sampling temperature forwarded to the model constructor.
    temperature: float = 0.1
    # Token limit forwarded to the model constructor.
    max_tokens: int = 4000
    # When False the agent is created without any tools.
    enable_tools: bool = True
    # When False no memory manager is created for the agent.
    enable_memory: bool = True
    # When False no SQLite storage is created for the agent's memory.
    enable_storage: bool = True
    # Forwarded to the agent's ``markdown`` option.
    markdown: bool = True
    # Forwarded to the agent's ``telemetry`` option.
    telemetry: bool = False
    # Forwarded as ``debug_mode`` to the agent and its memory manager.
    debug: bool = False
class AgnoAgentManager:
"""Agno 代理管理器"""
def __init__(self, config_path: str = "agent_config.json"):
self.config_path = config_path
self.agents: Dict[str, Agent] = {}
self.memories: Dict[str, MemoryManager] = {}
self.storages: Dict[str, SqliteDb] = {}
self.configs: Dict[str, AgentConfig] = {}
self._load_config()
def _load_config(self):
"""加载配置文件"""
if os.path.exists(self.config_path):
with open(self.config_path, 'r', encoding='utf-8') as f:
config_data = json.load(f)
for name, config in config_data.items():
self.configs[name] = AgentConfig(**config)
else:
# 默认配置
self.configs = {
"financial_analyst": AgentConfig(
name="financial_analyst",
model_type="deepseek",
temperature=0.1,
enable_tools=True,
enable_memory=True
),
"research_assistant": AgentConfig(
name="research_assistant",
model_type="deepseek",
temperature=0.3,
enable_tools=True,
enable_memory=True
),
"data_analyst": AgentConfig(
name="data_analyst",
model_type="deepseek",
temperature=0.2,
enable_tools=True,
enable_memory=True
)
}
self._save_config()
def _save_config(self):
"""保存配置文件"""
config_data = {}
for name, config in self.configs.items():
config_data[name] = {
"name": config.name,
"model_type": config.model_type,
"temperature": config.temperature,
"max_tokens": config.max_tokens,
"enable_tools": config.enable_tools,
"enable_memory": config.enable_memory,
"enable_storage": config.enable_storage,
"markdown": config.markdown,
"telemetry": config.telemetry,
"debug": config.debug
}
with open(self.config_path, 'w', encoding='utf-8') as f:
json.dump(config_data, f, indent=2, ensure_ascii=False)
def _create_model(self, config: AgentConfig):
"""创建模型实例"""
if config.model_type == "deepseek":
api_key = os.getenv("DEEPSEEK_API_KEY")
if not api_key:
raise ValueError("DEEPSEEK_API_KEY environment variable is not set")
return DeepSeek(
id="deepseek-chat",
api_key=api_key,
base_url="https://api.deepseek.com",
temperature=config.temperature,
max_tokens=config.max_tokens
)
elif config.model_type == "openai":
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise ValueError("OPENAI_API_KEY environment variable is not set")
return OpenAIChat(
id="gpt-4",
api_key=api_key,
temperature=config.temperature,
max_tokens=config.max_tokens
)
else:
raise ValueError(f"Unsupported model type: {config.model_type}")
def _create_tools(self, config: AgentConfig) -> List:
"""创建工具列表"""
if not config.enable_tools:
return []
tools = []
# 根据代理类型添加不同的工具
if "financial" in config.name.lower():
# YFinanceTools 现在是一个包含多个方法的 Toolkit
# include_tools 参数可以指定哪些方法可用
tools.extend([
YFinanceTools(
cache_results=True,
include_tools=[
'get_current_stock_price',
'get_historical_stock_prices',
'get_company_info',
'get_company_news',
'get_analyst_recommendations'
]
)
])
if "research" in config.name.lower():
# DuckDuckGoTools 也应该检查其 API
tools.extend([
DuckDuckGoTools()
])
if "data" in config.name.lower():
tools.extend([
PythonTools(),
FileTools()
])
return tools
def _create_memory(self, config: AgentConfig) -> Optional[MemoryManager]:
"""创建记忆实例"""
if not config.enable_memory:
return None
memory_key = f"{config.name}_memory"
if memory_key not in self.memories:
model = self._create_model(config)
self.memories[memory_key] = MemoryManager(
model=model,
db=self._create_storage(config) if config.enable_storage else None,
debug_mode=config.debug
)
return self.memories[memory_key]
def _create_storage(self, config: AgentConfig) -> Optional[SqliteDb]:
"""创建存储实例"""
if not config.enable_storage:
return None
storage_key = f"{config.name}_storage"
if storage_key not in self.storages:
db_path = f"data/{config.name}_memory.db"
os.makedirs(os.path.dirname(db_path), exist_ok=True)
self.storages[storage_key] = SqliteDb(db_file=db_path)
return self.storages[storage_key]
def create_agent(self, name: str, custom_config: Optional[Dict] = None) -> Agent:
"""创建代理实例"""
if name not in self.configs and not custom_config:
raise ValueError(f"Agent '{name}' not found in configuration")
# 使用自定义配置或现有配置
if custom_config:
config = AgentConfig(name=name, **custom_config)
else:
config = self.configs[name]
# 创建模型
model = self._create_model(config)
# 创建工具
tools = self._create_tools(config)
# 创建记忆
memory_manager = self._create_memory(config)
# 创建代理
agent = Agent(
name=name,
model=model,
tools=tools,
memory_manager=memory_manager,
markdown=config.markdown,
telemetry=config.telemetry,
debug_mode=config.debug,
description=f"{name.replace('_', ' ').title()} agent"
)
self.agents[name] = agent
logger.info(f"Created agent: {name}")
return agent
def get_agent(self, name: str) -> Optional[Agent]:
"""获取代理实例"""
return self.agents.get(name)
def list_agents(self) -> List[str]:
"""列出所有代理名称"""
return list(self.agents.keys())
def remove_agent(self, name: str):
"""移除代理实例"""
if name in self.agents:
del self.agents[name]
logger.info(f"Removed agent: {name}")
async def run_agent_async(self, name: str, query: str, **kwargs) -> str:
"""异步运行代理"""
agent = self.get_agent(name)
if not agent:
raise ValueError(f"Agent '{name}' not found")
try:
response = await agent.arun(query, **kwargs)
logger.info(f"Agent {name} completed task")
return response.content
except Exception as e:
logger.error(f"Error running agent {name}: {str(e)}")
raise
def run_agent(self, name: str, query: str, **kwargs) -> str:
"""同步运行代理"""
agent = self.get_agent(name)
if not agent:
raise ValueError(f"Agent '{name}' not found")
try:
response = agent.run(query, **kwargs)
logger.info(f"Agent {name} completed task")
return response.content
except Exception as e:
logger.error(f"Error running agent {name}: {str(e)}")
raise
def create_workflow(self, name: str, steps: List[Dict]) -> Workflow:
"""创建工作流"""
workflow = Workflow(name=name)
for i, step in enumerate(steps):
step_name = step.get("name", f"step_{i+1}")
agent_name = step["agent"]
query = step["query"]
workflow.add_agent_task(
name=step_name,
agent=self.get_agent(agent_name),
query=query
)
logger.info(f"Created workflow: {name}")
return workflow
# Example usage and demo routines
class AgnoDemo:
    """Console demonstrations of the agents managed by ``AgnoAgentManager``."""

    def __init__(self):
        """Create the agent manager used by every demo."""
        self.manager = AgnoAgentManager()

    def setup_agents(self):
        """Create the three demo agents.

        Returns:
            Tuple ``(financial_agent, research_agent, data_agent)``.
        """
        financial_agent = self.manager.create_agent("financial_analyst")
        research_agent = self.manager.create_agent("research_assistant")
        data_agent = self.manager.create_agent("data_analyst")
        return financial_agent, research_agent, data_agent

    def _run_queries(self, agent_name, queries):
        """Send each query to *agent_name* and print a preview of the reply.

        A failing query is reported on the console and does not stop the
        remaining queries.
        """
        for query in queries:
            print(f"\n查询: {query}")
            try:
                result = self.manager.run_agent(agent_name, query)
                # Only the first 500 characters are shown.
                print(f"结果: {result[:500]}...")
            except Exception as e:
                # Demo boundary: report the failure and carry on.
                print(f"错误: {str(e)}")

    def demo_financial_analysis(self):
        """Run the sample stock-analysis queries on the financial analyst."""
        print("=== 金融分析演示 ===")
        self._run_queries(
            "financial_analyst",
            [
                "分析苹果公司(AAPL)最近一年的股价表现,包括关键技术指标",
                "比较特斯拉(TSLA)和蔚来(NIO)的股票表现,哪个更值得投资?",
                "获取微软(MSFT)的最新财报数据和分析师评级",
            ],
        )

    def demo_research_assistant(self):
        """Run the sample web-research queries on the research assistant."""
        print("\n=== 研究助理演示 ===")
        self._run_queries(
            "research_assistant",
            [
                "搜索2024年最新的人工智能发展趋势",
                "查找关于量子计算的最新研究进展",
                "搜索可持续能源技术的最新突破",
            ],
        )

    def demo_data_analysis(self):
        """Run the sample scripting queries on the data analyst."""
        print("\n=== 数据分析演示 ===")
        self._run_queries(
            "data_analyst",
            [
                "创建一个Python脚本来分析CSV文件中的销售数据",
                "生成一个数据可视化脚本来展示月度销售趋势",
                "编写一个函数来计算移动平均值",
            ],
        )

    async def demo_workflow(self):
        """Build and run a research -> analysis -> report workflow.

        Errors while building or running the workflow are printed rather
        than raised.
        """
        print("\n=== 工作流演示 ===")
        # Three chained steps: research, financial analysis, report.
        workflow_steps = [
            {
                "name": "research",
                "agent": "research_assistant",
                "query": "搜索特斯拉公司2024年的最新发展和市场表现",
            },
            {
                "name": "financial_analysis",
                "agent": "financial_analyst",
                "query": "基于研究数据,分析特斯拉股票的投资价值",
            },
            {
                "name": "report_generation",
                "agent": "data_analyst",
                "query": "创建一个综合报告,总结特斯拉的研究和财务分析结果",
            },
        ]
        try:
            # Building the workflow is inside the try block so a build
            # failure is reported the same way as a run failure.
            workflow = self.manager.create_workflow("tesla_analysis", workflow_steps)
            result = await workflow.arun()
            print(f"工作流结果: {result}")
        except Exception as e:
            print(f"工作流错误: {str(e)}")

    def run_all_demos(self):
        """Create the agents and run every demo in sequence.

        This method calls ``asyncio.run()`` for the workflow demo, so it
        must be invoked from a thread that has no running event loop.
        """
        print("开始 Agno2 全面演示...")
        self.setup_agents()

        self.demo_financial_analysis()
        self.demo_research_assistant()
        self.demo_data_analysis()

        # The workflow demo is a coroutine; drive it on its own event loop.
        print("\n运行异步工作流演示...")
        asyncio.run(self.demo_workflow())

        print("\n=== 演示完成 ===")
        print(f"可用代理: {self.manager.list_agents()}")
# Main entry coroutine
async def main():
    """Run a demo selected on the command line, or an interactive prompt.

    With an argument (``demo``, ``financial``, ``research``, ``data`` or
    ``workflow``) the matching demo runs once. Without arguments the agents
    are created and queries are read from stdin in the form
    ``<agent name> | <query>`` until ``quit``, Ctrl+C or end of input.
    Returns early with a message when ``DEEPSEEK_API_KEY`` is not set.
    """
    import sys

    # Check required environment variables before creating anything.
    required_keys = ["DEEPSEEK_API_KEY"]
    missing_keys = [key for key in required_keys if not os.getenv(key)]
    if missing_keys:
        print(f"错误: 缺少环境变量 {missing_keys}")
        print("请设置以下环境变量:")
        print(" DEEPSEEK_API_KEY: DeepSeek API 密钥")
        print(" OPENAI_API_KEY: OpenAI API 密钥 (可选)")
        return

    # Create the working directories.
    os.makedirs("data", exist_ok=True)
    os.makedirs("logs", exist_ok=True)

    demo = AgnoDemo()

    # Command-line mode.
    if len(sys.argv) > 1:
        command = sys.argv[1]
        single_agent_demos = {
            "financial": demo.demo_financial_analysis,
            "research": demo.demo_research_assistant,
            "data": demo.demo_data_analysis,
        }
        if command == "demo":
            # run_all_demos() calls asyncio.run() internally, which raises
            # RuntimeError inside an already running event loop. Running it
            # on a worker thread gives it a thread with no running loop.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, demo.run_all_demos)
        elif command in single_agent_demos:
            demo.setup_agents()
            single_agent_demos[command]()
        elif command == "workflow":
            demo.setup_agents()
            await demo.demo_workflow()
        else:
            print(f"未知命令: {command}")
            print("可用命令: demo, financial, research, data, workflow")
        return

    # Interactive mode.
    demo.setup_agents()
    print("\n=== Agno2 交互模式 ===")
    print("可用代理:", demo.manager.list_agents())
    print("输入格式: <代理名称> | <查询内容>")
    print("输入 'quit' 退出")
    while True:
        try:
            user_input = input("\n> ").strip()
            if user_input.lower() == 'quit':
                break
            if '|' not in user_input:
                print("格式错误。请使用: <代理名称> | <查询内容>")
                continue
            agent_name, query = (part.strip() for part in user_input.split('|', 1))
            if agent_name not in demo.manager.list_agents():
                print(f"代理 '{agent_name}' 不存在")
                continue
            print(f"正在运行 {agent_name}...")
            result = demo.manager.run_agent(agent_name, query)
            print(f"结果:\n{result}")
        except (KeyboardInterrupt, EOFError):
            # EOFError means stdin was closed; without this clause the
            # generic handler below would catch it and loop forever.
            print("\n退出程序")
            break
        except Exception as e:
            # Keep the prompt alive after a failed query.
            print(f"错误: {str(e)}")
if __name__ == "__main__":
    # Script entry point: drive the main() coroutine to completion.
    asyncio.run(main())
```