模块2:智能体架构迁移方案 - 详细实现
📋 目录
1. 现状分析
2. Agno智能体架构设计
3. 具体智能体迁移实现
4. 智能体工厂和注册机制
5. 迁移挑战与解决方案
6. 迁移验证与测试
7. 迁移时间表和里程碑
---
1. 现状分析
1.1 当前LangGraph智能体架构
基于对现有代码的深度分析,当前TradingAgents-CN项目中的智能体架构如下:
#### 智能体类型和结构
分析师团队(4个):
- 基本面分析师 (
fundamentals_analyst.py): 分析公司财务数据和基本面指标
- 市场分析师 (
market_analyst.py): 技术分析和价格趋势分析
- 新闻分析师 (
news_analyst.py): 新闻事件和宏观影响分析
- 社交媒体分析师 (
social_media_analyst.py): 社交媒体情绪分析
研究员团队(2个):
- 看涨研究员 (
bull_researcher.py): 构建看涨投资论证
- 看跌研究员 (
bear_researcher.py): 构建看跌投资论证
管理层团队(3个):
- 研究经理 (
research_manager.py): 协调分析师和研究员工作
- 风险经理 (
risk_manager.py): 综合风险评估和最终决策
- 交易员 (
trader.py): 执行最终交易决策
风险分析团队(3个):
- 激进风险分析师 (
risky_risk_analyst.py): 激进风险评估
- 保守风险分析师 (
safe_risk_analyst.py): 保守风险评估
- 中性风险分析师 (
neutral_risk_analyst.py): 中性风险评估
#### 当前智能体实现模式
# 当前LangGraph智能体创建模式
def create_fundamentals_analyst(llm, toolkit):
@log_analyst_module("fundamentals")
def fundamentals_analyst_node(state):
# 1. 状态管理 - 从state获取输入参数
current_date = state["trade_date"]
ticker = state["company_of_interest"]
# 2. 工具调用 - 使用toolkit获取数据
tools = [toolkit.get_stock_fundamentals_unified]
# 3. 提示词构建 - 构建专业分析提示词
system_message = f"你是一位专业的股票基本面分析师..."
# 4. LLM调用 - 生成分析报告
response = llm.invoke(prompt)
# 5. 状态更新 - 返回更新后的状态
return {"fundamentals_report": response.content}
return fundamentals_analyst_node
#### 关键特性分析
1. 状态驱动: 所有智能体通过统一的AgentState进行状态管理
2. 工具集成: 每个智能体可以访问统一的工具包toolkit
3. 专业提示词: 针对不同类型的分析构建专业化提示词
4. 日志系统: 统一的日志记录和性能监控
5. 错误处理: 完善的异常处理和降级机制
1.2 当前架构的优势
1. 模块化设计: 每个智能体职责单一,易于维护和测试
2. 统一接口: 所有智能体遵循相同的创建和调用模式
3. 专业深度: 每个智能体都有深度的专业领域知识
4. 协作机制: 智能体之间通过状态共享实现有效协作
1.3 当前架构的挑战
1. 性能瓶颈: LangGraph的状态管理和图遍历存在性能开销
2. 复杂性高: 状态流转和条件逻辑复杂,调试困难
3. 扩展性差: 新增智能体需要修改图结构和条件边
4. 内存占用: 状态在图中传递时存在内存复制开销
---
2. Agno智能体架构设计
2.1 Agno智能体核心原理
基于搜索结果,Agno框架的核心特性:
1. 五级智能体层级: 从基础工具代理到高级团队协作代理
2. 去中心化执行: 无中心协调器,智能体间直接通信
3. 零拷贝数据管道: 状态数据在智能体间零拷贝传递
4. 共享内存: 智能体间共享内存和知识库
5. 事件驱动: 基于事件的异步通信机制
2.2 Agno智能体基类设计
from agno.agent import Agent
from agno.models.base import Model
from agno.tools.base import Tool
from typing import List, Dict, Any, Optional
from pydantic import BaseModel
import asyncio
import json
class TradingAgent(BaseModel):
"""
TradingAgents-CN的Agno智能体基类
继承自Agno的Agent类,添加金融分析特定功能
"""
# 基础属性
name: str
agent_type: str
description: str
# LLM模型
model: Model
# 工具列表
tools: List[Tool] = []
# 记忆系统
memory: Optional[Any] = None
# 配置参数
config: Dict[str, Any] = {}
# Agno核心智能体
agno_agent: Optional[Agent] = None
class Config:
arbitrary_types_allowed = True
def __init__(self, **data):
super().__init__(**data)
self._initialize_agno_agent()
def _initialize_agno_agent(self):
"""初始化Agno智能体"""
self.agno_agent = Agent(
name=self.name,
model=self.model,
tools=self.tools,
description=self.description,
instructions=self._build_instructions(),
memory=self.memory,
show_tool_calls=True,
read_chat_history=True,
add_history_to_messages=True,
num_history_responses=5,
markdown=True,
debug_mode=True
)
def _build_instructions(self) -> str:
"""构建智能体指令 - 子类重写"""
return f"You are {self.name}. {self.description}"
async def analyze(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""执行分析 - 子类重写"""
raise NotImplementedError("子类必须实现analyze方法")
def get_system_prompt(self, context: Dict[str, Any]) -> str:
"""获取系统提示词 - 子类重写"""
raise NotImplementedError("子类必须实现get_system_prompt方法")
def format_response(self, raw_response: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""格式化响应 - 子类重写"""
return {
"agent_name": self.name,
"agent_type": self.agent_type,
"analysis": raw_response,
"timestamp": asyncio.get_event_loop().time(),
"context": context
}
2.3 Agno模型适配器
from agno.models.openai import OpenAIChat
from agno.models.anthropic import Claude
from agno.models.google import Gemini
from agno.models.deepseek import DeepSeek
from typing import Dict, Any
class AgnoModelAdapter:
"""Agno模型适配器 - 支持多供应商"""
MODEL_PROVIDERS = {
'openai': {
'class': OpenAIChat,
'models': ['gpt-4o', 'gpt-4o-mini', 'gpt-3.5-turbo']
},
'anthropic': {
'class': Claude,
'models': ['claude-3-5-sonnet-20241022', 'claude-3-haiku-20240307']
},
'google': {
'class': Gemini,
'models': ['gemini-2.5-pro', 'gemini-2.0-flash', 'gemini-1.5-pro']
},
'deepseek': {
'class': DeepSeek,
'models': ['deepseek-chat', 'deepseek-coder']
}
}
@classmethod
def create_model(cls, provider: str, model_name: str, api_key: str, **kwargs) -> Any:
"""创建Agno模型实例"""
if provider not in cls.MODEL_PROVIDERS:
raise ValueError(f"不支持的供应商: {provider}")
provider_config = cls.MODEL_PROVIDERS[provider]
if model_name not in provider_config['models']:
logger.warning(f"模型 {model_name} 可能不被官方支持")
model_class = provider_config['class']
# 创建模型实例
model_config = {
'id': model_name,
'api_key': api_key,
'temperature': kwargs.get('temperature', 0.1),
'max_tokens': kwargs.get('max_tokens', 4096),
'timeout': kwargs.get('timeout', 30),
}
# 添加供应商特定配置
if provider == 'openai':
model_config.update({
'frequency_penalty': kwargs.get('frequency_penalty', 0.0),
'presence_penalty': kwargs.get('presence_penalty', 0.0),
})
elif provider == 'anthropic':
model_config.update({
'max_tokens': kwargs.get('max_tokens', 8192),
})
elif provider == 'google':
model_config.update({
'safety_settings': kwargs.get('safety_settings', {}),
})
return model_class(**model_config)
---
3. 具体智能体迁移实现
3.1 基本面分析师迁移
#### 迁移分析
当前LangGraph实现特点:
1. 使用@log_analyst_module("fundamentals")装饰器进行日志记录
2. 通过状态获取股票代码和日期:state["company_of_interest"]、state["trade_date"]
3. 调用统一工具get_stock_fundamentals_unified获取数据
4. 构建专业化提示词,强制要求使用真实数据
5. 处理多市场(A股、港股、美股)的公司名称获取
迁移挑战:
1. 工具调用模式从LangChain工具转换为Agno工具
2. 状态管理从集中式状态转换为智能体间消息传递
3. 日志系统从装饰器转换为Agno内置日志
4. 多市场适配需要重新设计
#### Agno实现
from agno.tools.base import Tool
from typing import Dict, Any, List
import asyncio
from datetime import datetime, timedelta
class GetStockFundamentalsTool(Tool):
"""获取股票基本面数据工具"""
name: str = "get_stock_fundamentals_unified"
description: str = "获取股票基本面数据,包括财务指标和估值数据"
def run(self, ticker: str, start_date: str, end_date: str, curr_date: str) -> str:
"""执行工具调用"""
try:
# 调用现有的基本面数据获取逻辑
from tradingagents.dataflows.interface import get_stock_fundamentals_unified
result = get_stock_fundamentals_unified(
ticker=ticker,
start_date=start_date,
end_date=end_date,
curr_date=curr_date
)
return result if result else "无法获取基本面数据"
except Exception as e:
logger.error(f"获取基本面数据失败: {e}")
return f"获取基本面数据失败: {str(e)}"
class AgnoFundamentalsAnalyst(TradingAgent):
"""Agno基本面分析师"""
def __init__(self, model: Any, memory: Any = None, config: Dict[str, Any] = None):
super().__init__(
name="Fundamentals Analyst",
agent_type="fundamentals",
description="专业的股票基本面分析师,分析公司财务数据和基本面指标",
model=model,
tools=[GetStockFundamentalsTool()],
memory=memory,
config=config or {}
)
def _build_instructions(self) -> str:
"""构建基本面分析师指令"""
return """你是一位专业的股票基本面分析师。
⚠️ 绝对强制要求:你必须调用工具获取真实数据!不允许任何假设或编造!
📊 分析要求:
- 基于真实数据进行深度基本面分析
- 计算并提供合理价位区间
- 分析当前股价是否被低估或高估
- 提供基于基本面的目标价位建议
- 包含PE、PB、PEG等估值指标分析
- 结合市场特点进行分析
🌍 语言和货币要求:
- 所有分析内容必须使用中文
- 投资建议必须使用中文:买入、持有、卖出
- 绝对不允许使用英文:buy、hold、sell
🚫 严格禁止:
- 不允许说"我将调用工具"
- 不允许假设任何数据
- 不允许编造公司信息
请使用中文,基于真实数据进行分析。"""
def get_system_prompt(self, context: Dict[str, Any]) -> str:
"""获取系统提示词"""
ticker = context.get("ticker", "")
current_date = context.get("current_date", datetime.now().strftime("%Y-%m-%d"))
market_info = context.get("market_info", {})
company_name = context.get("company_name", ticker)
# 计算数据范围
end_date_dt = datetime.strptime(current_date, "%Y-%m-%d")
start_date_dt = end_date_dt - timedelta(days=10)
start_date = start_date_dt.strftime("%Y-%m-%d")
return f"""你是一位专业的股票基本面分析师。
⚠️ 绝对强制要求:你必须调用工具获取真实数据!不允许任何假设或编造!
任务:分析{company_name}(股票代码:{ticker},{market_info.get('market_name', '未知市场')})
🔴 立即调用 get_stock_fundamentals_unified 工具
参数:ticker='{ticker}', start_date='{start_date}', end_date='{current_date}', curr_date='{current_date}'
📊 分析要求:
- 基于真实数据进行深度基本面分析
- 计算并提供合理价位区间(使用{market_info.get('currency_name', '人民币')})
- 分析当前股价是否被低估或高估
- 提供基于基本面的目标价位建议
- 包含PE、PB、PEG等估值指标分析
- 结合市场特点进行分析
🌍 语言和货币要求:
- 所有分析内容必须使用中文
- 投资建议必须使用中文:买入、持有、卖出
- 绝对不允许使用英文:buy、hold、sell
- 货币单位使用:{market_info.get('currency_name', '人民币')}({market_info.get('currency_symbol', '¥')})
🚫 严格禁止:
- 不允许说"我将调用工具"
- 不允许假设任何数据
- 不允许编造公司信息
请使用中文,基于真实数据进行分析。"""
async def analyze(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""执行基本面分析"""
try:
ticker = context.get("ticker", "")
current_date = context.get("current_date", datetime.now().strftime("%Y-%m-%d"))
# 获取系统提示词
system_prompt = self.get_system_prompt(context)
# 构建用户消息
user_message = f"""
请对股票{ticker}进行基本面分析,分析日期为{current_date}。
请严格按照以下格式输出:
## 📊 股票基本信息
- 公司名称:[公司名称]
- 股票代码:{ticker}
- 所属市场:[市场名称]
## 💰 财务指标分析
[在这里分析PE、PB、ROE、ROA等关键财务指标]
## 📈 估值分析
[在这里分析当前估值水平,判断是否被低估或高估]
## 💭 投资建议
[在这里给出明确的投资建议:买入/持有/卖出]
## 🎯 目标价位
[在这里提供基于基本面的合理目标价位]
"""
# 调用Agno智能体
response = await self.agno_agent.arun(
message=user_message,
system_message=system_prompt
)
# 格式化响应
return self.format_response(response.content if hasattr(response, 'content') else str(response), context)
except Exception as e:
logger.error(f"基本面分析失败: {e}")
return self.format_response(f"基本面分析失败: {str(e)}", context)
3.2 市场分析师迁移
#### 迁移分析
当前LangGraph实现特点:
1. 使用技术指标分析(MACD、RSI、布林带等)
2. 支持多市场(A股、港股、美股)的技术分析
3. 工具调用计数器防止无限循环
4. 详细的输出格式要求
迁移挑战:
1. 技术指标计算工具的迁移
2. 图表分析功能的实现
3. 多市场数据适配
#### Agno实现
class GetStockMarketDataTool(Tool):
"""获取股票市场数据工具"""
name: str = "get_stock_market_data_unified"
description: str = "获取股票市场数据,包括价格、成交量和技术指标"
def run(self, ticker: str, start_date: str, end_date: str, lookback_days: int = 365) -> str:
"""执行工具调用"""
try:
from tradingagents.dataflows.interface import get_stock_market_data_unified
result = get_stock_market_data_unified(
ticker=ticker,
start_date=start_date,
end_date=end_date,
lookback_days=lookback_days
)
return result if result else "无法获取市场数据"
except Exception as e:
logger.error(f"获取市场数据失败: {e}")
return f"获取市场数据失败: {str(e)}"
class AgnoMarketAnalyst(TradingAgent):
"""Agno市场分析师"""
def __init__(self, model: Any, memory: Any = None, config: Dict[str, Any] = None):
super().__init__(
name="Market Analyst",
agent_type="market",
description="专业的股票市场技术分析师,分析价格趋势和技术指标",
model=model,
tools=[GetStockMarketDataTool()],
memory=memory,
config=config or {}
)
def get_system_prompt(self, context: Dict[str, Any]) -> str:
"""获取系统提示词"""
ticker = context.get("ticker", "")
current_date = context.get("current_date", datetime.now().strftime("%Y-%m-%d"))
market_info = context.get("market_info", {})
company_name = context.get("company_name", ticker)
return f"""你是一位专业的股票技术分析师。
📋 **分析对象:**
- 公司名称:{company_name}
- 股票代码:{ticker}
- 所属市场:{market_info.get('market_name', '未知市场')}
- 计价货币:{market_info.get('currency_name', '人民币')}({market_info.get('currency_symbol', '¥')})
- 分析日期:{current_date}
🔧 **工具使用:**
你可以使用 get_stock_market_data_unified 工具
参数:ticker='{ticker}', start_date='{current_date}', end_date='{current_date}'
📝 **输出格式要求(必须严格遵守):**
## 📊 股票基本信息
- 公司名称:{company_name}
- 股票代码:{ticker}
- 所属市场:{market_info.get('market_name', '未知市场')}
## 📈 技术指标分析
[在这里分析移动平均线、MACD、RSI、布林带等技术指标,提供具体数值]
## 📉 价格趋势分析
[在这里分析价格趋势,考虑{market_info.get('market_name', '市场')}特点]
## 💭 投资建议
[在这里给出明确的投资建议:买入/持有/卖出]
⚠️ **重要提醒:**
- 必须使用上述格式输出,不要自创标题格式
- 所有价格数据使用{market_info.get('currency_name', '人民币')}({market_info.get('currency_symbol', '¥')})表示
- 确保在分析中正确使用公司名称"{company_name}"和股票代码"{ticker}"
- 如果你有明确的技术面投资建议(买入/持有/卖出),请在投资建议部分明确标注
- 不要使用'最终交易建议'前缀,因为最终决策需要综合所有分析师的意见
请使用中文,基于真实数据进行分析。"""
async def analyze(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""执行市场技术分析"""
try:
ticker = context.get("ticker", "")
current_date = context.get("current_date", datetime.now().strftime("%Y-%m-%d"))
system_prompt = self.get_system_prompt(context)
user_message = f"""
请对股票{ticker}进行技术分析,分析日期为{current_date}。
重点关注:
1. 价格走势和成交量变化
2. 移动平均线和技术指标(MACD、RSI、布林带)
3. 支撑阻力位分析
4. 短期和中期趋势判断
请提供具体的技术指标数值和明确的买卖建议。
"""
response = await self.agno_agent.arun(
message=user_message,
system_message=system_prompt
)
return self.format_response(response.content if hasattr(response, 'content') else str(response), context)
except Exception as e:
logger.error(f"技术分析失败: {e}")
return self.format_response(f"技术分析失败: {str(e)}", context)
3.3 研究员智能体迁移
#### 迁移分析
当前LangGraph实现特点:
1. 基于辩论机制的多轮对话
2. 使用记忆系统获取历史经验
3. 构建看涨/看跌投资论证
4. 响应其他研究员的观点
迁移挑战:
1. 辩论机制的异步实现
2. 记忆系统的集成
3. 多轮对话的状态管理
#### Agno实现
class AgnoBullResearcher(TradingAgent):
"""Agno看涨研究员"""
def __init__(self, model: Any, memory: Any = None, config: Dict[str, Any] = None):
super().__init__(
name="Bull Researcher",
agent_type="bull_researcher",
description="看涨研究员,负责构建积极的投资论证",
model=model,
tools=[], # 研究员主要进行论证分析,不直接调用数据工具
memory=memory,
config=config or {}
)
def get_system_prompt(self, context: Dict[str, Any]) -> str:
"""获取系统提示词"""
ticker = context.get("ticker", "")
market_info = context.get("market_info", {})
company_name = context.get("company_name", ticker)
# 获取其他分析师的报告
market_report = context.get("market_report", "")
sentiment_report = context.get("sentiment_report", "")
news_report = context.get("news_report", "")
fundamentals_report = context.get("fundamentals_report", "")
# 获取辩论历史
debate_history = context.get("debate_history", "")
bear_argument = context.get("current_bear_argument", "")
return f"""你是一位看涨分析师,负责为股票 {company_name}(股票代码:{ticker})的投资建立强有力的论证。
⚠️ 重要提醒:当前分析的是 {'中国A股' if market_info.get('is_china') else '海外股票'},所有价格和估值请使用 {market_info.get('currency_name', '人民币')}({market_info.get('currency_symbol', '¥')})作为单位。
⚠️ 在你的分析中,请始终使用公司名称"{company_name}"而不是股票代码"{ticker}"来称呼这家公司。
你的任务是构建基于证据的强有力案例,强调增长潜力、竞争优势和积极的市场指标。利用提供的研究和数据来解决担忧并有效反驳看跌论点。
请用中文回答,重点关注以下几个方面:
- 增长潜力:突出公司的市场机会、收入预测和可扩展性
- 竞争优势:强调独特产品、强势品牌或主导市场地位等因素
- 积极指标:使用财务健康状况、行业趋势和最新积极消息作为证据
- 反驳看跌观点:用具体数据和合理推理批判性分析看跌论点
- 参与讨论:以对话风格呈现你的论点,直接回应看跌分析师的观点
可用资源:
市场研究报告:{market_report}
社交媒体情绪报告:{sentiment_report}
最新世界事务新闻:{news_report}
公司基本面报告:{fundamentals_report}
辩论对话历史:{debate_history}
最后的看跌论点:{bear_argument}
请使用这些信息提供令人信服的看涨论点,反驳看跌担忧,并参与动态辩论,展示看涨立场的优势。
请确保所有回答都使用中文。"""
async def analyze(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""执行看涨分析"""
try:
system_prompt = self.get_system_prompt(context)
user_message = f"""
请基于提供的分析师报告和辩论历史,为{context.get('company_name', '公司')}构建强有力的看涨投资论证。
要求:
1. 重点强调增长潜力和竞争优势
2. 用具体数据反驳看跌观点
3. 提供令人信服的投资理由
4. 保持对话风格,直接回应对方观点
请提供详细和专业的看涨分析。
"""
response = await self.agno_agent.arun(
message=user_message,
system_message=system_prompt
)
result = {
"agent_name": self.name,
"agent_type": self.agent_type,
"argument": response.content if hasattr(response, 'content') else str(response),
"position": "bullish",
"timestamp": asyncio.get_event_loop().time(),
"context": context
}
return result
except Exception as e:
logger.error(f"看涨分析失败: {e}")
return {
"agent_name": self.name,
"agent_type": self.agent_type,
"argument": f"看涨分析失败: {str(e)}",
"position": "bullish",
"timestamp": asyncio.get_event_loop().time(),
"context": context
}
class AgnoBearResearcher(TradingAgent):
"""Agno看跌研究员"""
def __init__(self, model: Any, memory: Any = None, config: Dict[str, Any] = None):
super().__init__(
name="Bear Researcher",
agent_type="bear_researcher",
description="看跌研究员,负责识别投资风险和负面因素",
model=model,
tools=[],
memory=memory,
config=config or {}
)
def get_system_prompt(self, context: Dict[str, Any]) -> str:
"""获取系统提示词"""
ticker = context.get("ticker", "")
market_info = context.get("market_info", {})
company_name = context.get("company_name", ticker)
# 获取其他分析师的报告
market_report = context.get("market_report", "")
sentiment_report = context.get("sentiment_report", "")
news_report = context.get("news_report", "")
fundamentals_report = context.get("fundamentals_report", "")
# 获取辩论历史
debate_history = context.get("debate_history", "")
bull_argument = context.get("current_bull_argument", "")
return f"""你是一位看跌分析师,负责为股票 {company_name}(股票代码:{ticker})识别投资风险和负面因素。
⚠️ 重要提醒:当前分析的是 {'中国A股' if market_info.get('is_china') else '海外股票'},所有价格和估值请使用 {market_info.get('currency_name', '人民币')}({market_info.get('currency_symbol', '¥')})作为单位。
⚠️ 在你的分析中,请始终使用公司名称"{company_name}"而不是股票代码"{ticker}"来称呼这家公司。
你的任务是识别潜在风险、市场担忧和负面指标,为投资决策提供平衡的观点。
请用中文回答,重点关注以下几个方面:
- 风险识别:突出市场下行风险、行业挑战和公司特定风险
- 估值担忧:分析当前估值是否过高,是否存在泡沫
- 负面指标:使用财务弱点、负面新闻和市场担忧作为证据
- 反驳看涨观点:批判性分析看涨论点,指出其缺陷和过度乐观之处
- 风险量化:尽可能量化风险程度和潜在损失
可用资源:
市场研究报告:{market_report}
社交媒体情绪报告:{sentiment_report}
最新世界事务新闻:{news_report}
公司基本面报告:{fundamentals_report}
辩论对话历史:{debate_history}
最后的看涨论点:{bull_argument}
请使用这些信息提供谨慎的看跌分析,平衡投资组合风险。
请确保所有回答都使用中文。"""
async def analyze(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""执行看跌分析"""
try:
system_prompt = self.get_system_prompt(context)
user_message = f"""
请基于提供的分析师报告和辩论历史,为{context.get('company_name', '公司')}识别投资风险和负面因素。
要求:
1. 重点识别下行风险和估值担忧
2. 用具体数据反驳过度乐观的观点
3. 量化风险程度和潜在损失
4. 保持对话风格,直接回应对方观点
请提供详细和专业的风险分析。
"""
response = await self.agno_agent.arun(
message=user_message,
system_message=system_prompt
)
result = {
"agent_name": self.name,
"agent_type": self.agent_type,
"argument": response.content if hasattr(response, 'content') else str(response),
"position": "bearish",
"timestamp": asyncio.get_event_loop().time(),
"context": context
}
return result
except Exception as e:
logger.error(f"看跌分析失败: {e}")
return {
"agent_name": self.name,
"agent_type": self.agent_type,
"argument": f"看跌分析失败: {str(e)}",
"position": "bearish",
"timestamp": asyncio.get_event_loop().time(),
"context": context
}
3.4 交易员智能体迁移
#### 迁移分析
当前LangGraph实现特点:
1. 综合分析所有分析师的报告
2. 使用记忆系统避免重复错误
3. 提供具体的目标价位和买卖建议
4. 支持多市场的货币适配
迁移挑战:
1. 多源信息整合
2. 决策逻辑的标准化
3. 风险控制机制
#### Agno实现
class AgnoTrader(TradingAgent):
"""Agno交易员"""
def __init__(self, model: Any, memory: Any = None, config: Dict[str, Any] = None):
super().__init__(
name="Trader",
agent_type="trader",
description="专业交易员,基于综合分析做出最终投资决策",
model=model,
tools=[], # 交易员主要做决策,不直接调用数据工具
memory=memory,
config=config or {}
)
def get_system_prompt(self, context: Dict[str, Any]) -> str:
"""获取系统提示词"""
ticker = context.get("ticker", "")
market_info = context.get("market_info", {})
company_name = context.get("company_name", ticker)
# 获取所有分析师报告
market_report = context.get("market_report", "")
sentiment_report = context.get("sentiment_report", "")
news_report = context.get("news_report", "")
fundamentals_report = context.get("fundamentals_report", "")
# 获取研究员论证
bull_argument = context.get("bull_argument", "")
bear_argument = context.get("bear_argument", "")
# 获取风险分析
risk_analysis = context.get("risk_analysis", "")
# 获取历史记忆
past_memories = context.get("past_memories", "")
return f"""您是一位专业的交易员,负责分析市场数据并做出投资决策。基于您的分析,请提供具体的买入、卖出或持有建议。
⚠️ 重要提醒:当前分析的股票代码是 {ticker},请使用正确的货币单位:{market_info.get('currency_name', '人民币')}({market_info.get('currency_symbol', '¥')})
🔴 严格要求:
- 股票代码 {ticker} 的公司名称必须严格按照基本面报告中的真实数据
- 绝对禁止使用错误的公司名称或混淆不同的股票
- 所有分析必须基于提供的真实数据,不允许假设或编造
- **必须提供具体的目标价位,不允许设置为null或空值**
请在您的分析中包含以下关键信息:
1. **投资建议**: 明确的买入/持有/卖出决策
2. **目标价位**: 基于分析的合理目标价格({market_info.get('currency_name', '人民币')}) - 🚨 强制要求提供具体数值
- 买入建议:提供目标价位和预期涨幅
- 持有建议:提供合理价格区间(如:{market_info.get('currency_symbol', '¥')}XX-XX)
- 卖出建议:提供止损价位和目标卖出价
3. **置信度**: 对决策的信心程度(0-1之间)
4. **风险评分**: 投资风险等级(0-1之间,0为低风险,1为高风险)
5. **详细推理**: 支持决策的具体理由
🎯 目标价位计算指导:
- 基于基本面分析中的估值数据(P/E、P/B、DCF等)
- 参考技术分析的支撑位和阻力位
- 考虑行业平均估值水平
- 结合市场情绪和新闻影响
- 即使市场情绪过热,也要基于合理估值给出目标价
特别注意:
- 如果是中国A股(6位数字代码),请使用人民币(¥)作为价格单位
- 如果是美股或港股,请使用美元($)作为价格单位
- 目标价位必须与当前股价的货币单位保持一致
- 必须使用基本面报告中提供的正确公司名称
- **绝对不允许说"无法确定目标价"或"需要更多信息"**
可用分析资源:
市场研究报告:{market_report}
社交媒体情绪报告:{sentiment_report}
最新世界事务新闻:{news_report}
公司基本面报告:{fundamentals_report}
看涨论证:{bull_argument}
看跌论证:{bear_argument}
风险分析:{risk_analysis}
请用中文撰写分析内容,并始终以'最终交易建议: **买入/持有/卖出**'结束您的回应以确认您的建议。
请不要忘记利用过去决策的经验教训来避免重复错误。以下是类似情况下的交易反思和经验教训: {past_memories}"""
async def analyze(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""执行交易决策"""
try:
system_prompt = self.get_system_prompt(context)
user_message = f"""
请基于所有分析师的报告和论证,为{context.get('company_name', '公司')}做出最终的投资决策。
综合信息:
- 技术分析显示:{context.get('market_report', '暂无')[:200]}...
- 基本面分析显示:{context.get('fundamentals_report', '暂无')[:200]}...
- 看涨论证:{context.get('bull_argument', '暂无')[:200]}...
- 看跌论证:{context.get('bear_argument', '暂无')[:200]}...
- 风险分析:{context.get('risk_analysis', '暂无')[:200]}...
请提供:
1. 明确的投资建议(买入/持有/卖出)
2. 具体的目标价位和理由
3. 置信度和风险评分
4. 详细的决策推理过程
请确保决策平衡了收益潜力和风险控制。
"""
response = await self.agno_agent.arun(
message=user_message,
system_message=system_prompt
)
# 解析响应内容,提取关键信息
content = response.content if hasattr(response, 'content') else str(response)
# 提取投资建议
recommendation = self._extract_recommendation(content)
# 提取目标价位
target_price = self._extract_target_price(content)
# 提取置信度
confidence = self._extract_confidence(content)
# 提取风险评分
risk_score = self._extract_risk_score(content)
result = {
"agent_name": self.name,
"agent_type": self.agent_type,
"recommendation": recommendation,
"target_price": target_price,
"confidence": confidence,
"risk_score": risk_score,
"analysis": content,
"timestamp": asyncio.get_event_loop().time(),
"context": context
}
return result
except Exception as e:
logger.error(f"交易决策失败: {e}")
return {
"agent_name": self.name,
"agent_type": self.agent_type,
"recommendation": "持有",
"target_price": "无法确定",
"confidence": 0.5,
"risk_score": 0.5,
"analysis": f"交易决策失败: {str(e)}",
"timestamp": asyncio.get_event_loop().time(),
"context": context
}
def _extract_recommendation(self, content: str) -> str:
"""从响应内容中提取投资建议"""
import re
# 查找明确的买卖建议
buy_patterns = [r'买入', r'建议买入', r'强烈推荐', r'积极买入']
sell_patterns = [r'卖出', r'建议卖出', r'减持', r'清仓']
hold_patterns = [r'持有', r'观望', r'中性', r'建议持有']
content_lower = content.lower()
for pattern in buy_patterns:
if re.search(pattern, content_lower):
return "买入"
for pattern in sell_patterns:
if re.search(pattern, content_lower):
return "卖出"
for pattern in hold_patterns:
if re.search(pattern, content_lower):
return "持有"
return "持有" # 默认建议
def _extract_target_price(self, content: str) -> str:
"""从响应内容中提取目标价位"""
import re
# 查找价格信息
price_patterns = [
r'目标价位[::]\s*([¥$]\d+(?:\.\d+)?)',
r'目标价格[::]\s*([¥$]\d+(?:\.\d+)?)',
r'合理价位[::]\s*([¥$]\d+(?:\.\d+)?)',
r'预期价位[::]\s*([¥$]\d+(?:\.\d+)?)'
]
for pattern in price_patterns:
match = re.search(pattern, content)
if match:
return match.group(1)
return "未明确给出"
def _extract_confidence(self, content: str) -> float:
"""从响应内容中提取置信度"""
import re
# 查找置信度信息
confidence_patterns = [
r'置信度[::]\s*(\d+(?:\.\d+)?)',
r'信心程度[::]\s*(\d+(?:\.\d+)?)',
r'把握程度[::]\s*(\d+(?:\.\d+)?)'
]
for pattern in confidence_patterns:
match = re.search(pattern, content)
if match:
try:
confidence = float(match.group(1))
return max(0.0, min(1.0, confidence)) # 确保在0-1范围内
except ValueError:
continue
return 0.7 # 默认置信度
def _extract_risk_score(self, content: str) -> float:
"""从响应内容中提取风险评分"""
import re
# 查找风险评分
risk_patterns = [
r'风险评分[::]\s*(\d+(?:\.\d+)?)',
r'风险等级[::]\s*(\d+(?:\.\d+)?)',
r'投资风险[::]\s*(\d+(?:\.\d+)?)'
]
for pattern in risk_patterns:
match = re.search(pattern, content)
if match:
try:
risk = float(match.group(1))
return max(0.0, min(1.0, risk)) # 确保在0-1范围内
except ValueError:
continue
return 0.5 # 默认风险评分
---
4. 智能体工厂和注册机制
4.1 智能体工厂实现
class AgnoAgentFactory:
"""Agno智能体工厂"""
# 智能体注册表
AGENT_REGISTRY = {
# 分析师团队
'fundamentals_analyst': AgnoFundamentalsAnalyst,
'market_analyst': AgnoMarketAnalyst,
'news_analyst': AgnoNewsAnalyst,
'social_media_analyst': AgnoSocialMediaAnalyst,
# 研究员团队
'bull_researcher': AgnoBullResearcher,
'bear_researcher': AgnoBearResearcher,
# 管理层团队
'trader': AgnoTrader,
'research_manager': AgnoResearchManager,
'risk_manager': AgnoRiskManager,
# 风险分析团队
'risky_risk_analyst': AgnoRiskyRiskAnalyst,
'safe_risk_analyst': AgnoSafeRiskAnalyst,
'neutral_risk_analyst': AgnoNeutralRiskAnalyst,
}
@classmethod
def create_agent(cls, agent_type: str, model: Any, memory: Any = None, config: Dict[str, Any] = None) -> TradingAgent:
"""创建智能体实例"""
if agent_type not in cls.AGENT_REGISTRY:
raise ValueError(f"不支持的智能体类型: {agent_type}")
agent_class = cls.AGENT_REGISTRY[agent_type]
# 处理特殊智能体(如风险评估需要额外参数)
if agent_type == 'risky_risk_analyst':
risk_type = config.pop('risk_type', 'aggressive') if config else 'aggressive'
return agent_class(model=model, memory=memory, config=config or {}, risk_type=risk_type)
elif agent_type == 'safe_risk_analyst':
risk_type = config.pop('risk_type', 'conservative') if config else 'conservative'
return agent_class(model=model, memory=memory, config=config or {}, risk_type=risk_type)
elif agent_type == 'neutral_risk_analyst':
risk_type = config.pop('risk_type', 'neutral') if config else 'neutral'
return agent_class(model=model, memory=memory, config=config or {}, risk_type=risk_type)
return agent_class(model=model, memory=memory, config=config or {})
@classmethod
def register_agent(cls, agent_type: str, agent_class: Type[TradingAgent]):
"""注册新的智能体类型"""
cls.AGENT_REGISTRY[agent_type] = agent_class
@classmethod
def get_supported_agents(cls) -> List[str]:
"""获取支持的智能体类型列表"""
return list(cls.AGENT_REGISTRY.keys())
@classmethod
def create_agent_from_config(cls, config: Dict[str, Any]) -> TradingAgent:
"""从配置创建智能体"""
agent_type = config.get('agent_type')
if not agent_type:
raise ValueError("配置中必须包含agent_type字段")
# 提取模型配置
model_config = config.get('model_config', {})
provider = model_config.get('provider', 'openai')
model_name = model_config.get('model_name', 'gpt-4o-mini')
api_key = model_config.get('api_key', '')
# 创建模型
model = AgnoModelAdapter.create_model(
provider=provider,
model_name=model_name,
api_key=api_key,
**model_config
)
# 提取内存配置
memory_config = config.get('memory_config', {})
memory = None
if memory_config.get('enabled', False):
# 创建内存实例
memory = cls._create_memory(memory_config)
# 提取智能体特定配置
agent_config = config.get('agent_config', {})
return cls.create_agent(
agent_type=agent_type,
model=model,
memory=memory,
config=agent_config
)
@staticmethod
def _create_memory(memory_config: Dict[str, Any]) -> Any:
"""创建内存实例"""
# 这里可以集成不同类型的内存系统
# 例如:向量数据库、知识图谱、简单缓存等
memory_type = memory_config.get('type', 'simple')
if memory_type == 'simple':
from tradingagents.memory.simple_memory import SimpleMemory
return SimpleMemory(**memory_config)
elif memory_type == 'vector':
from tradingagents.memory.vector_memory import VectorMemory
return VectorMemory(**memory_config)
else:
logger.warning(f"不支持的内存类型: {memory_type},使用简单内存")
from tradingagents.memory.simple_memory import SimpleMemory
return SimpleMemory(**memory_config)
4.2 智能体配置管理
class AgnoAgentConfig:
"""Agno智能体配置管理"""
# 默认配置模板
DEFAULT_CONFIGS = {
'fundamentals_analyst': {
'agent_type': 'fundamentals_analyst',
'model_config': {
'provider': 'openai',
'model_name': 'gpt-4o-mini',
'temperature': 0.1,
'max_tokens': 4096
},
'memory_config': {
'enabled': True,
'type': 'simple',
'max_memories': 100
},
'agent_config': {
'analysis_depth': 'detailed',
'valuation_models': ['pe', 'pb', 'peg', 'dcf']
}
},
'market_analyst': {
'agent_type': 'market_analyst',
'model_config': {
'provider': 'openai',
'model_name': 'gpt-4o-mini',
'temperature': 0.1,
'max_tokens': 4096
},
'memory_config': {
'enabled': True,
'type': 'simple',
'max_memories': 100
},
'agent_config': {
'technical_indicators': ['ma', 'macd', 'rsi', 'bollinger'],
'chart_analysis': True
}
},
'trader': {
'agent_type': 'trader',
'model_config': {
'provider': 'openai',
'model_name': 'gpt-4o',
'temperature': 0.1,
'max_tokens': 4096
},
'memory_config': {
'enabled': True,
'type': 'vector',
'max_memories': 200
},
'agent_config': {
'decision_criteria': ['technical', 'fundamental', 'sentiment', 'risk'],
'confidence_threshold': 0.7
}
}
}
@classmethod
def get_default_config(cls, agent_type: str) -> Dict[str, Any]:
"""获取默认配置"""
return cls.DEFAULT_CONFIGS.get(agent_type, cls.DEFAULT_CONFIGS['fundamentals_analyst'])
@classmethod
def validate_config(cls, config: Dict[str, Any]) -> bool:
"""验证配置有效性"""
required_fields = ['agent_type', 'model_config']
for field in required_fields:
if field not in config:
raise ValueError(f"配置缺少必需字段: {field}")
# 验证模型配置
model_config = config['model_config']
required_model_fields = ['provider', 'model_name']
for field in required_model_fields:
if field not in model_config:
raise ValueError(f"模型配置缺少必需字段: {field}")
return True
@classmethod
def merge_configs(cls, base_config: Dict[str, Any], override_config: Dict[str, Any]) -> Dict[str, Any]:
"""合并配置"""
import copy
result = copy.deepcopy(base_config)
for key, value in override_config.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = cls.merge_configs(result[key], value)
else:
result[key] = value
return result
---
5. 迁移挑战与解决方案
5.1 异步执行转换
#### 挑战
LangGraph主要使用同步执行模式,而Agno框架更倾向于异步执行。
#### 解决方案
import asyncio
from typing import Dict, Any, List
from concurrent.futures import ThreadPoolExecutor
class AsyncExecutionManager:
"""异步执行管理器"""
def __init__(self, max_workers: int = 10):
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.semaphore = asyncio.Semaphore(max_workers)
async def run_agent_analysis(self, agent: TradingAgent, context: Dict[str, Any]) -> Dict[str, Any]:
"""运行智能体分析"""
async with self.semaphore:
try:
# 设置超时
result = await asyncio.wait_for(
agent.analyze(context),
timeout=60 # 60秒超时
)
return result
except asyncio.TimeoutError:
logger.error(f"智能体 {agent.name} 分析超时")
return {
"agent_name": agent.name,
"agent_type": agent.agent_type,
"error": "分析超时",
"timestamp": asyncio.get_event_loop().time()
}
except Exception as e:
logger.error(f"智能体 {agent.name} 分析失败: {e}")
return {
"agent_name": agent.name,
"agent_type": agent.agent_type,
"error": str(e),
"timestamp": asyncio.get_event_loop().time()
}
async def run_parallel_analyses(self, agents: List[TradingAgent], context: Dict[str, Any]) -> List[Dict[str, Any]]:
"""并行运行多个智能体分析"""
tasks = []
for agent in agents:
task = asyncio.create_task(
self.run_agent_analysis(agent, context)
)
tasks.append(task)
# 等待所有任务完成
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理异常结果
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"智能体 {agents[i].name} 分析异常: {result}")
processed_results.append({
"agent_name": agents[i].name,
"agent_type": agents[i].agent_type,
"error": str(result),
"timestamp": asyncio.get_event_loop().time()
})
else:
processed_results.append(result)
return processed_results
async def run_sequential_analyses(self, agents: List[TradingAgent], context: Dict[str, Any]) -> List[Dict[str, Any]]:
"""顺序运行多个智能体分析"""
results = []
for agent in agents:
try:
result = await self.run_agent_analysis(agent, context)
results.append(result)
# 更新上下文,将当前结果传递给下一个智能体
context[f"{agent.agent_type}_result"] = result
except Exception as e:
logger.error(f"顺序分析中智能体 {agent.name} 失败: {e}")
results.append({
"agent_name": agent.name,
"agent_type": agent.agent_type,
"error": str(e),
"timestamp": asyncio.get_event_loop().time()
})
return results
def close(self):
"""关闭执行器"""
self.executor.shutdown(wait=True)
5.2 工具调用差异
#### 挑战
LangChain工具和Agno工具在接口和调用方式上存在差异。
#### 解决方案
from agno.tools.base import Tool
from langchain.tools import BaseTool
from typing import Any, Optional, Type
from pydantic import BaseModel, Field
class LangChainToAgnoToolAdapter(Tool):
"""LangChain工具到Agno工具的适配器"""
def __init__(self, langchain_tool: BaseTool):
self.langchain_tool = langchain_tool
self.name = langchain_tool.name
self.description = langchain_tool.description
# 提取参数模式
if hasattr(langchain_tool, 'args_schema') and langchain_tool.args_schema:
self.args_schema = langchain_tool.args_schema
else:
# 创建默认的参数模式
self.args_schema = self._create_default_schema()
def _create_default_schema(self) -> Type[BaseModel]:
"""创建默认的参数模式"""
class DefaultToolInput(BaseModel):
input: str = Field(..., description="工具输入参数")
return DefaultToolInput
def run(self, **kwargs) -> str:
"""运行工具"""
try:
# 转换参数格式
if hasattr(self.langchain_tool, '_run'):
# LangChain工具通常有_run方法
result = self.langchain_tool._run(**kwargs)
else:
# 或者使用invoke方法
result = self.langchain_tool.invoke(kwargs)
# 转换结果格式
if isinstance(result, str):
return result
elif isinstance(result, dict):
return json.dumps(result, ensure_ascii=False)
else:
return str(result)
except Exception as e:
logger.error(f"工具调用失败: {e}")
return f"工具调用失败: {str(e)}"
async def arun(self, **kwargs) -> str:
"""异步运行工具"""
try:
# 尝试异步调用
if hasattr(self.langchain_tool, '_arun'):
result = await self.langchain_tool._arun(**kwargs)
elif hasattr(self.langchain_tool, 'ainvoke'):
result = await self.langchain_tool.ainvoke(kwargs)
else:
# 回退到同步调用
result = await asyncio.get_event_loop().run_in_executor(
None, self.run, **kwargs
)
# 转换结果格式
if isinstance(result, str):
return result
elif isinstance(result, dict):
return json.dumps(result, ensure_ascii=False)
else:
return str(result)
except Exception as e:
logger.error(f"异步工具调用失败: {e}")
return f"异步工具调用失败: {str(e)}"
class ToolMigrationManager:
"""工具迁移管理器"""
def __init__(self):
self.adapted_tools = {}
def adapt_langchain_tool(self, langchain_tool: BaseTool) -> LangChainToAgnoToolAdapter:
"""适配LangChain工具"""
adapter = LangChainToAgnoToolAdapter(langchain_tool)
self.adapted_tools[langchain_tool.name] = adapter
return adapter
def adapt_toolkit(self, toolkit) -> List[Tool]:
"""适配整个工具包"""
agno_tools = []
# 获取工具包中的所有工具
if hasattr(toolkit, 'get_tools'):
langchain_tools = toolkit.get_tools()
else:
# 尝试通过属性获取工具
langchain_tools = []
for attr_name in dir(toolkit):
attr = getattr(toolkit, attr_name)
if isinstance(attr, BaseTool):
langchain_tools.append(attr)
# 适配每个工具
for tool in langchain_tools:
try:
adapted_tool = self.adapt_langchain_tool(tool)
agno_tools.append(adapted_tool)
logger.info(f"成功适配工具: {tool.name}")
except Exception as e:
logger.error(f"适配工具 {tool.name} 失败: {e}")
return agno_tools
5.3 状态管理迁移
#### 挑战
从LangGraph的集中式状态管理转换到Agno的分布式状态管理。
#### 解决方案
from typing import Dict, Any, Optional, List
import asyncio
import json
from datetime import datetime
class DistributedStateManager:
"""分布式状态管理器"""
def __init__(self):
self.states = {}
self.state_history = []
self.locks = {}
async def create_state(self, state_id: str, initial_state: Dict[str, Any]) -> str:
"""创建新状态"""
self.states[state_id] = {
'data': initial_state.copy(),
'created_at': datetime.now(),
'updated_at': datetime.now(),
'version': 1
}
# 记录历史
self.state_history.append({
'state_id': state_id,
'action': 'create',
'data': initial_state,
'timestamp': datetime.now()
})
return state_id
async def get_state(self, state_id: str) -> Optional[Dict[str, Any]]:
"""获取状态"""
if state_id not in self.states:
return None
return self.states[state_id]['data'].copy()
async def update_state(self, state_id: str, updates: Dict[str, Any], agent_name: str = None) -> bool:
"""更新状态"""
if state_id not in self.states:
return False
# 获取锁
if state_id not in self.locks:
self.locks[state_id] = asyncio.Lock()
async with self.locks[state_id]:
# 更新状态数据
self.states[state_id]['data'].update(updates)
self.states[state_id]['updated_at'] = datetime.now()
self.states[state_id]['version'] += 1
# 记录历史
self.state_history.append({
'state_id': state_id,
'action': 'update',
'agent_name': agent_name,
'updates': updates,
'timestamp': datetime.now()
})
return True
async def delete_state(self, state_id: str) -> bool:
"""删除状态"""
if state_id not in self.states:
return False
# 获取锁
if state_id not in self.locks:
self.locks[state_id] = asyncio.Lock()
async with self.locks[state_id]:
# 记录历史
self.state_history.append({
'state_id': state_id,
'action': 'delete',
'timestamp': datetime.now()
})
# 删除状态
del self.states[state_id]
if state_id in self.locks:
del self.locks[state_id]
return True
async def get_state_diff(self, state_id: str, from_version: int, to_version: int = None) -> Dict[str, Any]:
"""获取状态差异"""
if state_id not in self.states:
return {}
# 过滤相关历史记录
relevant_history = [
h for h in self.state_history
if h['state_id'] == state_id and h['action'] == 'update'
]
if not to_version:
to_version = self.states[state_id]['version']
# 计算差异
diff = {}
for history_item in relevant_history:
if 'updates' in history_item:
diff.update(history_item['updates'])
return diff
def export_state_history(self, state_id: str = None) -> List[Dict[str, Any]]:
"""导出状态历史"""
if state_id:
return [
h for h in self.state_history
if h['state_id'] == state_id
]
else:
return self.state_history.copy()
class AgnoStateAdapter:
"""Agno状态适配器 - 适配LangGraph状态到Agno"""
def __init__(self, state_manager: DistributedStateManager):
self.state_manager = state_manager
async def adapt_langgraph_state(self, langgraph_state: Dict[str, Any]) -> str:
"""适配LangGraph状态到分布式状态"""
# 生成状态ID
import uuid
state_id = f"agno_state_{uuid.uuid4().hex}"
# 转换状态格式
agno_state = {
# 基本信息
'ticker': langgraph_state.get('company_of_interest', ''),
'current_date': langgraph_state.get('trade_date', ''),
'analysis_id': langgraph_state.get('analysis_id', ''),
# 分析师报告
'market_report': langgraph_state.get('market_report', ''),
'sentiment_report': langgraph_state.get('sentiment_report', ''),
'news_report': langgraph_state.get('news_report', ''),
'fundamentals_report': langgraph_state.get('fundamentals_report', ''),
# 研究员论证
'investment_debate_state': langgraph_state.get('investment_debate_state', {}),
'bull_argument': '',
'bear_argument': '',
# 风险分析
'risk_debate_state': langgraph_state.get('risk_debate_state', {}),
'risk_analysis': '',
# 交易决策
'trader_investment_plan': langgraph_state.get('trader_investment_plan', ''),
'final_trade_decision': langgraph_state.get('final_trade_decision', ''),
# 元数据
'created_at': datetime.now(),
'agent_sequence': [],
'performance_metrics': {}
}
# 创建分布式状态
await self.state_manager.create_state(state_id, agno_state)
return state_id
async def update_with_agent_result(self, state_id: str, agent_result: Dict[str, Any]) -> bool:
"""使用智能体结果更新状态"""
updates = {}
agent_type = agent_result.get('agent_type', '')
agent_name = agent_result.get('agent_name', '')
# 根据智能体类型更新相应字段
if agent_type == 'fundamentals':
updates['fundamentals_report'] = agent_result.get('analysis', '')
elif agent_type == 'market':
updates['market_report'] = agent_result.get('analysis', '')
elif agent_type == 'news':
updates['news_report'] = agent_result.get('analysis', '')
elif agent_type == 'social':
updates['sentiment_report'] = agent_result.get('analysis', '')
elif agent_type == 'bull_researcher':
updates['bull_argument'] = agent_result.get('argument', '')
# 更新辩论状态
debate_state = await self._get_debate_state(state_id)
debate_state['bull_history'] = debate_state.get('bull_history', '') + '\n' + agent_result.get('argument', '')
updates['investment_debate_state'] = debate_state
elif agent_type == 'bear_researcher':
updates['bear_argument'] = agent_result.get('argument', '')
# 更新辩论状态
debate_state = await self._get_debate_state(state_id)
debate_state['bear_history'] = debate_state.get('bear_history', '') + '\n' + agent_result.get('argument', '')
updates['investment_debate_state'] = debate_state
elif agent_type == 'trader':
updates['trader_investment_plan'] = agent_result.get('analysis', '')
updates['final_trade_decision'] = agent_result.get('recommendation', '')
elif agent_type in ['risky_risk_analyst', 'safe_risk_analyst', 'neutral_risk_analyst']:
# 合并风险分析
current_risk = await self._get_current_risk_analysis(state_id)
new_risk = agent_result.get('analysis', '')
updates['risk_analysis'] = current_risk + '\n' + new_risk
# 更新代理序列
current_state = await self.state_manager.get_state(state_id)
if current_state:
agent_sequence = current_state.get('agent_sequence', [])
agent_sequence.append({
'name': agent_name,
'type': agent_type,
'timestamp': agent_result.get('timestamp', datetime.now()),
'status': 'completed'
})
updates['agent_sequence'] = agent_sequence
# 更新性能指标
performance = current_state.get('performance_metrics', {})
if f'{agent_type}_time' not in performance:
performance[f'{agent_type}_time'] = []
performance[f'{agent_type}_time'].append({
'timestamp': datetime.now(),
'duration': agent_result.get('duration', 0)
})
updates['performance_metrics'] = performance
# 更新状态
return await self.state_manager.update_state(
state_id=state_id,
updates=updates,
agent_name=agent_name
)
async def _get_debate_state(self, state_id: str) -> Dict[str, Any]:
"""获取辩论状态"""
current_state = await self.state_manager.get_state(state_id)
return current_state.get('investment_debate_state', {}) if current_state else {}
async def _get_current_risk_analysis(self, state_id: str) -> str:
"""获取当前风险分析"""
current_state = await self.state_manager.get_state(state_id)
return current_state.get('risk_analysis', '') if current_state else ''
def convert_back_to_langgraph_format(self, agno_results: List[Dict[str, Any]], state_id: str) -> Dict[str, Any]:
"""转换回LangGraph格式"""
langgraph_state = {}
for result in agno_results:
agent_type = result.get('agent_type', '')
if agent_type == 'fundamentals':
langgraph_state['fundamentals_report'] = result.get('analysis', '')
elif agent_type == 'market':
langgraph_state['market_report'] = result.get('analysis', '')
elif agent_type == 'news':
langgraph_state['news_report'] = result.get('analysis', '')
elif agent_type == 'social':
langgraph_state['sentiment_report'] = result.get('analysis', '')
elif agent_type == 'bull_researcher':
langgraph_state['investment_debate_state'] = langgraph_state.get('investment_debate_state', {})
langgraph_state['investment_debate_state']['bull_argument'] = result.get('argument', '')
elif agent_type == 'bear_researcher':
langgraph_state['investment_debate_state'] = langgraph_state.get('investment_debate_state', {})
langgraph_state['investment_debate_state']['bear_argument'] = result.get('argument', '')
elif agent_type == 'trader':
langgraph_state['trader_investment_plan'] = result.get('analysis', '')
langgraph_state['final_trade_decision'] = result.get('recommendation', '')
elif agent_type in ['risky_risk_analyst', 'safe_risk_analyst', 'neutral_risk_analyst']:
langgraph_state['risk_debate_state'] = langgraph_state.get('risk_debate_state', {})
langgraph_state['risk_debate_state']['analysis'] = result.get('analysis', '')
return langgraph_state
### 5.4 性能优化
#### 挑战
Agno框架的性能优化和LangGraph存在差异。
#### 解决方案
python
import asyncio
import time
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from functools import wraps
@dataclass
class PerformanceMetrics:
"""性能指标"""
agent_name: str
execution_time: float
memory_usage: Optional[float] = None
token_usage: Optional[int] = None
tool_calls: int = 0
error_count: int = 0
class PerformanceOptimizer:
"""性能优化器"""
def __init__(self):
self.metrics_history = []
self.optimization_strategies = {}
self.cache = {}
def measure_performance(self, func_name: str = None):
"""性能测量装饰器"""
def decorator(func):
@wraps(func)
async def async_wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
execution_time = time.time() - start_time
# 记录性能指标
metrics = PerformanceMetrics(
agent_name=func_name or func.__name__,
execution_time=execution_time,
memory_usage=self._get_memory_usage(),
tool_calls=getattr(result, 'tool_calls', 0) if hasattr(result, 'tool_calls') else 0
)
self.metrics_history.append(metrics)
return result
except Exception as e:
execution_time = time.time() - start_time
# 记录错误指标
metrics = PerformanceMetrics(
agent_name=func_name or func.__name__,
execution_time=execution_time,
error_count=1
)
self.metrics_history.append(metrics)
raise e
@wraps(func)
def sync_wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
execution_time = time.time() - start_time
# 记录性能指标
metrics = PerformanceMetrics(
agent_name=func_name or func.__name__,
execution_time=execution_time,
memory_usage=self._get_memory_usage(),
tool_calls=getattr(result, 'tool_calls', 0) if hasattr(result, 'tool_calls') else 0
)
self.metrics_history.append(metrics)
return result
except Exception as e:
execution_time = time.time() - start_time
# 记录错误指标
metrics = PerformanceMetrics(
agent_name=func_name or func.__name__,
execution_time=execution_time,
error_count=1
)
self.metrics_history.append(metrics)
raise e
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
return decorator
def _get_memory_usage(self) -> Optional[float]:
"""获取内存使用量"""
try:
import psutil
process = psutil.Process()
return process.memory_info().rss / 1024 / 1024 # MB
except ImportError:
return None
def enable_caching(self, cache_key_func=None, ttl: int = 3600):
"""启用缓存"""
def decorator(func):
@wraps(func)
async def async_wrapper(*args, **kwargs):
# 生成缓存键
if cache_key_func:
cache_key = cache_key_func(*args, **kwargs)
else:
cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
# 检查缓存
if cache_key in self.cache:
cached_result, timestamp = self.cache[cache_key]
if time.time() - timestamp < ttl:
return cached_result
# 执行函数
result = await func(*args, **kwargs)
# 缓存结果
self.cache[cache_key] = (result, time.time())
return result
@wraps(func)
def sync_wrapper(*args, **kwargs):
# 生成缓存键
if cache_key_func:
cache_key = cache_key_func(*args, **kwargs)
else:
cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
# 检查缓存
if cache_key in self.cache:
cached_result, timestamp = self.cache[cache_key]
if time.time() - timestamp < ttl:
return cached_result
# 执行函数
result = func(*args, **kwargs)
# 缓存结果
self.cache[cache_key] = (result, time.time())
return result
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
return decorator
def get_performance_report(self) -> Dict[str, Any]:
"""获取性能报告"""
if not self.metrics_history:
return {"message": "暂无性能数据"}
# 按智能体分组
agent_metrics = {}
for metrics in self.metrics_history:
if metrics.agent_name not in agent_metrics:
agent_metrics[metrics.agent_name] = []
agent_metrics[metrics.agent_name].append(metrics)
# 计算统计信息
report = {
'total_executions': len(self.metrics_history),
'agent_statistics': {},
'overall_performance': {}
}
all_execution_times = []
all_error_counts = 0
for agent_name, metrics_list in agent_metrics.items():
execution_times = [m.execution_time for m in metrics_list]
error_counts = sum(m.error_count for m in metrics_list)
agent_stats = {
'total_executions': len(metrics_list),
'average_execution_time': sum(execution_times) / len(execution_times),
'min_execution_time': min(execution_times),
'max_execution_time': max(execution_times),
'error_rate': error_counts / len(metrics_list) if metrics_list else 0,
'total_errors': error_counts
}
report['agent_statistics'][agent_name] = agent_stats
all_execution_times.extend(execution_times)
all_error_counts += error_counts
# 总体统计
if all_execution_times:
report['overall_performance'] = {
'average_execution_time': sum(all_execution_times) / len(all_execution_times),
'total_errors': all_error_counts,
'overall_error_rate': all_error_counts / len(self.metrics_history) if self.metrics_history else 0
}
return report
def clear_cache(self):
"""清除缓存"""
self.cache.clear()
def clear_metrics(self):
"""清除性能指标"""
self.metrics_history.clear()
全局性能优化器实例
performance_optimizer = PerformanceOptimizer()
智能体性能优化装饰器
def optimize_performance(agent_name: str = None):
"""智能体性能优化装饰器"""
return performance_optimizer.measure_performance(agent_name)
def enable_agent_caching(cache_key_func=None, ttl: int = 3600):
"""智能体缓存装饰器"""
return performance_optimizer.enable_caching(cache_key_func, ttl)
5.5 智能体迁移示例
#### 5.5.1 基本面分析师智能体迁移
from typing import Dict, Any, List, Optional
from agno import Agent
from agno.models.openai import OpenAIChat
from tradingagents.tools.fundamentals import get_fundamentals_data
from tradingagents.tools.company_info import get_company_info
from tradingagents.agents.base import TradingAgent
import logging
logger = logging.getLogger(__name__)
class AgnoFundamentalsAnalyst(TradingAgent):
"""Agno框架下的基本面分析师智能体"""
def __init__(self,
model_id: str = "gpt-4o-mini",
temperature: float = 0.1,
max_tokens: int = 4000):
"""初始化基本面分析师"""
# 创建Agno Agent
self.agent = Agent(
model=OpenAIChat(
id=model_id,
temperature=temperature,
max_tokens=max_tokens
),
tools=[get_fundamentals_data, get_company_info],
description="基本面分析师,负责分析公司财务状况和基本面数据",
instructions=self._get_system_prompt(),
show_tool_calls=True,
debug_mode=False,
monitoring=True
)
super().__init__(name="fundamentals_analyst", model_id=model_id)
def _get_system_prompt(self) -> str:
"""获取系统提示"""
return """# 基本面分析师
您是一个专业的基本面分析师,专注于公司财务分析和基本面研究。
## 职责
1. 分析公司财务报表和关键财务指标
2. 评估公司盈利能力、偿债能力和运营效率
3. 研究行业地位和竞争优势
4. 识别潜在的投资机会和风险
## 分析框架
- 财务健康度:营收增长、利润率、现金流
- 估值水平:PE、PB、EV/EBITDA等估值倍数
- 成长性:营收和利润的历史增长及预期
- 风险因素:财务杠杆、行业风险、监管风险
## 输出格式
请以结构化方式提供分析结果,包括:
- 公司基本信息
- 财务指标分析
- 估值分析
- 投资建议
- 风险提示
## 工具使用
请合理使用提供的工具来获取最新的财务数据和市场信息。"""
@optimize_performance("fundamentals_analyst")
@enable_agent_caching(lambda self, stock_symbol, market, **kwargs: f"fundamentals:{stock_symbol}:{market}", ttl=1800)
async def analyze(self, stock_symbol: str, market: str, **kwargs) -> Dict[str, Any]:
"""执行基本面分析"""
try:
# 获取公司信息
company_name = await self._get_company_name(stock_symbol, market)
# 构建分析任务
analysis_task = f"""
请对以下公司进行全面的基本面分析:
股票代码:{stock_symbol}
市场:{market}
公司名称:{company_name}
请提供详细的财务分析和投资建议。
"""
# 执行分析
result = await self.agent.arun(analysis_task)
# 解析结果
analysis_result = {
'stock_symbol': stock_symbol,
'market': market,
'company_name': company_name,
'analysis': result.content if hasattr(result, 'content') else str(result),
'confidence_score': self._extract_confidence_score(result),
'key_metrics': self._extract_key_metrics(result),
'recommendation': self._extract_recommendation(result),
'risk_factors': self._extract_risk_factors(result),
'timestamp': self._get_timestamp()
}
# 记录分析结果
await self._log_analysis(analysis_result)
return analysis_result
except Exception as e:
logger.error(f"基本面分析失败: {stock_symbol} - {str(e)}")
raise self._create_error_result("fundamentals_analysis", str(e))
async def _get_company_name(self, stock_symbol: str, market: str) -> str:
"""获取公司名称"""
try:
# 使用工具获取公司信息
company_info = await get_company_info(stock_symbol, market)
return company_info.get('name', stock_symbol)
except Exception as e:
logger.warning(f"获取公司名称失败: {str(e)}")
return stock_symbol # 降级处理
def _extract_confidence_score(self, result: Any) -> float:
"""提取置信度分数"""
# 从结果中提取置信度,这里需要根据实际结果格式调整
content = result.content if hasattr(result, 'content') else str(result)
# 简单的关键词匹配
if "非常有信心" in content or "强烈推荐" in content:
return 0.9
elif "有信心" in content or "推荐" in content:
return 0.7
elif "中性" in content or "观望" in content:
return 0.5
elif "谨慎" in content or "风险" in content:
return 0.3
else:
return 0.6 # 默认置信度
def _extract_key_metrics(self, result: Any) -> Dict[str, Any]:
"""提取关键指标"""
# 这里需要根据实际结果格式解析关键指标
content = result.content if hasattr(result, 'content') else str(result)
metrics = {}
# 简单的文本解析示例
if "PE" in content:
metrics['pe_ratio'] = self._extract_numeric_value(content, "PE")
if "PB" in content:
metrics['pb_ratio'] = self._extract_numeric_value(content, "PB")
if "ROE" in content:
metrics['roe'] = self._extract_numeric_value(content, "ROE")
return metrics
def _extract_numeric_value(self, text: str, metric_name: str) -> Optional[float]:
"""从文本中提取数值"""
import re
# 简单的数值提取正则表达式
pattern = rf"{metric_name}.*?([0-9]+\.?[0-9]*)"
match = re.search(pattern, text, re.IGNORECASE)
if match:
try:
return float(match.group(1))
except ValueError:
pass
return None
def _extract_recommendation(self, result: Any) -> str:
"""提取投资建议"""
content = result.content if hasattr(result, 'content') else str(result)
# 简单的投资建议提取
if "买入" in content or "推荐买入" in content:
return "买入"
elif "持有" in content or "中性" in content:
return "持有"
elif "卖出" in content or "减持" in content:
return "卖出"
else:
return "观望"
def _extract_risk_factors(self, result: Any) -> List[str]:
"""提取风险因素"""
content = result.content if hasattr(result, 'content') else str(result)
risk_factors = []
# 简单的风险关键词匹配
risk_keywords = ["风险", "不确定性", "波动", "下跌", "亏损", "压力", "挑战"]
for keyword in risk_keywords:
if keyword in content:
# 提取包含关键词的句子
sentences = content.split('。')
for sentence in sentences:
if keyword in sentence:
risk_factors.append(sentence.strip())
return risk_factors[:5] # 限制数量
def _get_timestamp(self) -> str:
"""获取时间戳"""
from datetime import datetime
return datetime.now().isoformat()
async def _log_analysis(self, analysis_result: Dict[str, Any]):
"""记录分析结果"""
logger.info(f"基本面分析完成: {analysis_result['stock_symbol']}")
logger.debug(f"分析结果: {analysis_result}")
def _create_error_result(self, analysis_type: str, error_message: str) -> Dict[str, Any]:
"""创建错误结果"""
return {
'error': True,
'error_type': analysis_type,
'error_message': error_message,
'confidence_score': 0.0,
'recommendation': '分析失败',
'timestamp': self._get_timestamp()
}
# 迁移适配器
class FundamentalsAnalystAdapter:
"""基本面分析师迁移适配器"""
def __init__(self, agno_analyst: AgnoFundamentalsAnalyst):
self.agno_analyst = agno_analyst
async def __call__(self, state: Dict[str, Any]) -> Dict[str, Any]:
"""兼容LangGraph的调用接口"""
# 提取参数
stock_symbol = state.get('stock_symbol', '')
market = state.get('market', 'us')
if not stock_symbol:
return {
'fundamentals_analysis': '缺少股票代码',
'fundamentals_confidence': 0.0,
'fundamentals_recommendation': '无法分析'
}
# 执行Agno分析
result = await self.agno_analyst.analyze(stock_symbol, market)
# 转换回LangGraph格式
return convert_back_to_langgraph_format(result, "fundamentals")
# 工厂函数
def create_agno_fundamentals_analyst(**kwargs) -> FundamentalsAnalystAdapter:
"""创建Agno基本面分析师"""
agno_analyst = AgnoFundamentalsAnalyst(**kwargs)
return FundamentalsAnalystAdapter(agno_analyst)
#### 5.5.2 市场分析师智能体迁移
from typing import Dict, Any, List, Optional
from agno import Agent
from agno.models.openai import OpenAIChat
from tradingagents.tools.market_data import get_market_data, get_technical_indicators
from tradingagents.tools.news import get_market_news
from tradingagents.agents.base import TradingAgent
import logging
logger = logging.getLogger(__name__)
class AgnoMarketAnalyst(TradingAgent):
"""Agno框架下的市场分析师智能体"""
def __init__(self,
model_id: str = "gpt-4o-mini",
temperature: float = 0.1,
max_tokens: int = 4000):
"""初始化市场分析师"""
# 创建Agno Agent
self.agent = Agent(
model=OpenAIChat(
id=model_id,
temperature=temperature,
max_tokens=max_tokens
),
tools=[get_market_data, get_technical_indicators, get_market_news],
description="市场分析师,负责分析市场趋势和技术指标",
instructions=self._get_system_prompt(),
show_tool_calls=True,
debug_mode=False,
monitoring=True
)
super().__init__(name="market_analyst", model_id=model_id)
def _get_system_prompt(self) -> str:
"""获取系统提示"""
return """# 市场分析师
您是一个专业的市场分析师,专注于市场趋势分析和技术分析。
## 职责
1. 分析市场价格走势和交易量变化
2. 评估技术指标和图表形态
3. 研究市场情绪和资金流向
4. 识别市场机会和风险信号
## 分析框架
- 趋势分析:价格趋势、支撑阻力位
- 技术指标:移动平均线、RSI、MACD等
- 成交量分析:量价关系、资金流向
- 市场情绪:投资者情绪、新闻影响
## 输出格式
请以结构化方式提供分析结果,包括:
- 市场概况
- 技术分析
- 趋势判断
- 关键点位
- 交易建议
- 风险提示
## 工具使用
请合理使用提供的工具来获取最新的市场数据和技术指标。"""
@optimize_performance("market_analyst")
@enable_agent_caching(lambda self, stock_symbol, market, **kwargs: f"market:{stock_symbol}:{market}", ttl=900)
async def analyze(self, stock_symbol: str, market: str, **kwargs) -> Dict[str, Any]:
"""执行市场分析"""
try:
# 获取市场数据
market_data = await self._get_market_data(stock_symbol, market)
# 获取技术指标
technical_indicators = await self._get_technical_indicators(stock_symbol, market)
# 获取相关新闻
market_news = await self._get_market_news(stock_symbol, market)
# 构建分析任务
analysis_task = f"""
请对以下股票进行全面的市场分析:
股票代码:{stock_symbol}
市场:{market}
市场数据:
{market_data}
技术指标:
{technical_indicators}
相关新闻:
{market_news}
请提供详细的市场分析和交易建议。
"""
# 执行分析
result = await self.agent.arun(analysis_task)
# 解析结果
analysis_result = {
'stock_symbol': stock_symbol,
'market': market,
'analysis': result.content if hasattr(result, 'content') else str(result),
'market_data': market_data,
'technical_indicators': technical_indicators,
'market_news': market_news,
'confidence_score': self._extract_confidence_score(result),
'trend_direction': self._extract_trend_direction(result),
'key_levels': self._extract_key_levels(result),
'recommendation': self._extract_recommendation(result),
'timestamp': self._get_timestamp()
}
# 记录分析结果
await self._log_analysis(analysis_result)
return analysis_result
except Exception as e:
logger.error(f"市场分析失败: {stock_symbol} - {str(e)}")
raise self._create_error_result("market_analysis", str(e))
async def _get_market_data(self, stock_symbol: str, market: str) -> Dict[str, Any]:
"""获取市场数据"""
try:
return await get_market_data(stock_symbol, market)
except Exception as e:
logger.warning(f"获取市场数据失败: {str(e)}")
return {}
async def _get_technical_indicators(self, stock_symbol: str, market: str) -> Dict[str, Any]:
"""获取技术指标"""
try:
return await get_technical_indicators(stock_symbol, market)
except Exception as e:
logger.warning(f"获取技术指标失败: {str(e)}")
return {}
async def _get_market_news(self, stock_symbol: str, market: str) -> List[Dict[str, Any]]:
"""获取市场新闻"""
try:
return await get_market_news(stock_symbol, market)
except Exception as e:
logger.warning(f"获取市场新闻失败: {str(e)}")
return []
def _extract_trend_direction(self, result: Any) -> str:
"""提取趋势方向"""
content = result.content if hasattr(result, 'content') else str(result)
# 简单的趋势判断
if "上涨" in content or "上升趋势" in content or "看涨" in content:
return "上涨"
elif "下跌" in content or "下降趋势" in content or "看跌" in content:
return "下跌"
elif "震荡" in content or "横盘" in content:
return "震荡"
else:
return "中性"
def _extract_key_levels(self, result: Any) -> Dict[str, Any]:
"""提取关键价位"""
content = result.content if hasattr(result, 'content') else str(result)
key_levels = {}
# 简单的关键价位提取
import re
# 提取支撑位
support_pattern = r"支撑.*?([0-9]+\.?[0-9]*)"
support_match = re.search(support_pattern, content)
if support_match:
try:
key_levels['support'] = float(support_match.group(1))
except ValueError:
pass
# 提取阻力位
resistance_pattern = r"阻力.*?([0-9]+\.?[0-9]*)"
resistance_match = re.search(resistance_pattern, content)
if resistance_match:
try:
key_levels['resistance'] = float(resistance_match.group(1))
except ValueError:
pass
return key_levels
def _get_timestamp(self) -> str:
"""获取时间戳"""
from datetime import datetime
return datetime.now().isoformat()
async def _log_analysis(self, analysis_result: Dict[str, Any]):
"""记录分析结果"""
logger.info(f"市场分析完成: {analysis_result['stock_symbol']}")
logger.debug(f"趋势方向: {analysis_result.get('trend_direction', '未知')}")
# 迁移适配器
class MarketAnalystAdapter:
"""市场分析师迁移适配器"""
def __init__(self, agno_analyst: AgnoMarketAnalyst):
self.agno_analyst = agno_analyst
async def __call__(self, state: Dict[str, Any]) -> Dict[str, Any]:
"""兼容LangGraph的调用接口"""
# 提取参数
stock_symbol = state.get('stock_symbol', '')
market = state.get('market', 'us')
if not stock_symbol:
return {
'market_analysis': '缺少股票代码',
'market_confidence': 0.0,
'market_recommendation': '无法分析',
'trend_direction': '未知'
}
# 执行Agno分析
result = await self.agno_analyst.analyze(stock_symbol, market)
# 转换回LangGraph格式
return convert_back_to_langgraph_format(result, "market")
# 工厂函数
def create_agno_market_analyst(**kwargs) -> MarketAnalystAdapter:
"""创建Agno市场分析师"""
agno_analyst = AgnoMarketAnalyst(**kwargs)
return MarketAnalystAdapter(agno_analyst)