## 项目概述
将 Elixir 版本的 Symphony 项目 1:1 移植到 Python 3.12,使用 AgentScope 作为 Agent 基础库。
### 原始项目
- **源项目**: `./symphony/` (Elixir)
- **目标项目**: `./symphony.py/` (Python 3.12)
- **Agent 框架**: [AgentScope](https://github.com/agentscope-ai/agentscope)
---
## 1. 架构分析与映射
### 1.1 Elixir 原架构模块
| 模块 | 文件路径 | 职责 | Python 对应实现 |
|------|----------|------|-----------------|
| `Orchestrator` | `lib/symphony_elixir/orchestrator.ex` | 核心调度器,轮询和分派任务 | `symphony.orchestrator.Orchestrator` (asyncio) |
| `AgentRunner` | `lib/symphony_elixir/agent_runner.ex` | 执行单个 Issue 的 Agent | `symphony.agent_runner.AgentRunner` |
| `Workflow` | `lib/symphony_elixir/workflow.ex` | 加载 WORKFLOW.md | `symphony.workflow.WorkflowLoader` |
| `Config` | `lib/symphony_elixir/config.ex` | 配置管理 | `symphony.config.Config` + Pydantic |
| `Config.Schema` | `lib/symphony_elixir/config/schema.ex` | 配置 Schema 验证 | `symphony.config.schema` (Pydantic models) |
| `Linear.Client` | `lib/symphony_elixir/linear/client.ex` | Linear GraphQL API 客户端 | `symphony.trackers.linear_client.LinearClient` |
| `Linear.Issue` | `lib/symphony_elixir/linear/issue.ex` | Issue 数据模型 | `symphony.models.issue.Issue` (Pydantic) |
| `Workspace` | `lib/symphony_elixir/workspace.ex` | 工作空间管理 | `symphony.workspace.WorkspaceManager` |
| `PromptBuilder` | `lib/symphony_elixir/prompt_builder.ex` | 提示词构建 | `symphony.prompts.PromptBuilder` |
| `Codex.AppServer` | `lib/symphony_elixir/codex/app_server.ex` | Codex AppServer 协议 | `symphony.agents.agent_scope_client.AgentScopeClient` |
| `Codex.DynamicTool` | `lib/symphony_elixir/codex/dynamic_tool.ex` | 动态工具执行 | `symphony.tools.linear_graphql.LinearGraphQLTool` |
| `Tracker` | `lib/symphony_elixir/tracker.ex` | Tracker 适配器接口 | `symphony.trackers.base.BaseTracker` |
| `StatusDashboard` | `lib/symphony_elixir/status_dashboard.ex` | 状态仪表板 | `symphony.dashboard.Dashboard` |
| `HttpServer` | `lib/symphony_elixir_web/` | Web 服务器和 API | `symphony.web.server.WebServer` (FastAPI) |
| `PathSafety` | `lib/symphony_elixir/path_safety.ex` | 路径安全验证 | `symphony.utils.path_utils.PathSafety` |
| `SSH` | `lib/symphony_elixir/ssh.ex` | SSH 远程执行 | `symphony.utils.ssh_client.SSHClient` |
### 1.2 核心数据流
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ Symphony Python 架构 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────────────────────────────────────────┐ │
│ │ Workflow │───▶│ Orchestrator │ │
│ │ Loader │ │ (asyncio 事件循环, 状态机, 调度算法) │ │
│ └──────────────┘ └──────────────────┬───────────────────────────────┘ │
│ │ │
│ ┌────────────────────┼────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Linear │ │ Agent │ │ Workspace │ │
│ │ Client │ │ Runner │ │ Manager │ │
│ └─────────────┘ └──────┬──────┘ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────┐ │
│ │ AgentScope ReActAgent │ │
│ │ + Custom Tools │ │
│ └─────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Observability Layer │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────────┐ │ │
│ │ │ Logging │ │ Dashboard │ │ HTTP API (FastAPI) │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
```
---
## 2. 项目结构设计
```
symphony.py/
├── pyproject.toml # 项目配置和依赖
├── README.md # 项目说明
├── WORKFLOW.md # 示例工作流配置
├── src/
│ └── symphony/
│ ├── __init__.py
│ ├── __main__.py # 入口: python -m symphony
│ ├── cli.py # 命令行接口
│ │
│ ├── config/ # 配置模块
│ │ ├── __init__.py
│ │ ├── config.py # 主配置类
│ │ ├── schema.py # Pydantic 配置模型
│ │ └── defaults.py # 默认配置值
│ │
│ ├── models/ # 数据模型
│ │ ├── __init__.py
│ │ ├── issue.py # Issue 数据模型
│ │ ├── session.py # Session 状态模型
│ │ └── events.py # 事件模型
│ │
│ ├── workflow/ # 工作流管理
│ │ ├── __init__.py
│ │ ├── loader.py # WORKFLOW.md 加载解析
│ │ ├── templates.py # 模板渲染
│ │ └── watcher.py # 文件变更监控
│ │
│ ├── trackers/ # Issue Tracker 适配器
│ │ ├── __init__.py
│ │ ├── base.py # Tracker 抽象基类
│ │ ├── linear.py # Linear Tracker 实现
│ │ └── memory.py # 内存 Tracker (测试用)
│ │
│ ├── workspace/ # 工作空间管理
│ │ ├── __init__.py
│ │ ├── manager.py # 工作空间生命周期
│ │ ├── hooks.py # Hook 执行
│ │ └── safety.py # 路径安全验证
│ │
│ ├── agents/ # Agent 相关
│ │ ├── __init__.py
│ │ ├── base.py # Agent 基类
│ │ ├── react_agent.py # ReAct Agent 实现
│ │ ├── client.py # AgentScope 客户端
│ │ └── tools/ # 工具函数
│ │ ├── __init__.py
│ │ ├── linear_graphql.py
│ │ └── git_tools.py
│ │
│ ├── orchestrator/ # 调度器核心
│ │ ├── __init__.py
│ │ ├── orchestrator.py # 主调度器
│ │ ├── state.py # 状态管理
│ │ ├── dispatch.py # 分派逻辑
│ │ ├── retry.py # 重试机制
│ │ └── reconciliation.py # 状态调和
│ │
│ ├── dashboard/ # 状态仪表板
│ │ ├── __init__.py
│ │ ├── dashboard.py # 终端仪表板
│ │ ├── formatter.py # 格式化输出
│ │ └── snapshot.py # 状态快照
│ │
│ ├── web/ # HTTP API (可选)
│ │ ├── __init__.py
│ │ ├── server.py # FastAPI 服务器
│ │ ├── api.py # API 路由
│ │ └── static/ # 静态文件
│ │
│ └── utils/ # 工具函数
│ ├── __init__.py
│ ├── logging.py # 日志配置
│ ├── paths.py # 路径处理
│ ├── ssh.py # SSH 客户端
│ └── async_utils.py # 异步工具
│
├── tests/ # 测试目录
│ ├── __init__.py
│ ├── conftest.py
│ ├── unit/
│ │ ├── test_config.py
│ │ ├── test_workflow.py
│ │ ├── test_orchestrator.py
│ │ └── test_workspace.py
│ ├── integration/
│ │ └── test_linear_client.py
│ └── fixtures/
│ └── WORKFLOW.md
│
└── docs/ # 中文文档
├── 架构说明.md
├── 配置指南.md
├── API文档.md
└── 部署指南.md
```
---
## 3. 技术栈选择
### 3.1 核心依赖
| 功能 | 库 | 版本 |
|------|-----|------|
| Python | python | ^3.12 |
| Agent 框架 | agentscope | ^1.0 |
| HTTP 客户端 | httpx | ^0.27 |
| Web 框架 | fastapi | ^0.115 |
| 数据验证 | pydantic | ^2.9 |
| 配置管理 | pydantic-settings | ^2.5 |
| 模板引擎 | jinja2 | ^3.1 |
| 异步任务 | asyncio | 内置 |
| 调度 | apscheduler | ^3.10 |
| 终端 UI | rich | ^13.9 |
| 日志 | structlog | ^24.4 |
| YAML 解析 | pyyaml | ^6.0 |
| SSH 客户端 | asyncssh | ^2.17 |
| 文件监控 | watchfiles | ^0.24 |
| 测试 | pytest-asyncio | ^0.24 |
### 3.2 AgentScope 集成策略
由于 Symphony 需要精细控制 Agent 的生命周期和与 Codex app-server 的交互,我们将采用以下策略:
1. **使用 AgentScope 的底层组件**:Msg, ModelWrapper, Tool 等
2. **自定义 Agent 类**:继承 AgentBase 实现 SymphonyAgent
3. **工具集成**:将 linear_graphql 作为 AgentScope Tool 注册
4. **不直接使用 ReActAgent**:因为 Symphony 需要自己控制多轮对话和状态
---
## 4. 核心模块详细设计
### 4.1 配置系统 (config/)
```python
# schema.py - Pydantic 模型
class TrackerConfig(BaseModel):
kind: Literal["linear", "memory"] = "linear"
endpoint: str = "https://api.linear.app/graphql"
api_key: str | None = None
project_slug: str | None = None
assignee: str | None = None
active_states: list[str] = ["Todo", "In Progress"]
terminal_states: list[str] = ["Closed", "Cancelled", "Canceled", "Duplicate", "Done"]
class PollingConfig(BaseModel):
interval_ms: int = Field(default=30000, ge=1000)
class WorkspaceConfig(BaseModel):
root: str = Field(default_factory=lambda: str(Path(tempfile.gettempdir()) / "symphony_workspaces"))
class HooksConfig(BaseModel):
after_create: str | None = None
before_run: str | None = None
after_run: str | None = None
before_remove: str | None = None
timeout_ms: int = Field(default=60000, ge=1000)
class AgentConfig(BaseModel):
max_concurrent_agents: int = Field(default=10, ge=1)
max_turns: int = Field(default=20, ge=1)
max_retry_backoff_ms: int = Field(default=300000, ge=1000)
max_concurrent_agents_by_state: dict[str, int] = Field(default_factory=dict)
class CodexConfig(BaseModel):
command: str = "codex app-server"
approval_policy: str | dict = "never"
thread_sandbox: str = "workspace-write"
turn_sandbox_policy: dict | None = None
turn_timeout_ms: int = Field(default=3600000, ge=1000)
read_timeout_ms: int = Field(default=5000, ge=1000)
stall_timeout_ms: int = Field(default=300000, ge=0)
class ServerConfig(BaseModel):
port: int | None = Field(default=None, ge=0)
host: str = "127.0.0.1"
class SymphonyConfig(BaseModel):
tracker: TrackerConfig = Field(default_factory=TrackerConfig)
polling: PollingConfig = Field(default_factory=PollingConfig)
workspace: WorkspaceConfig = Field(default_factory=WorkspaceConfig)
hooks: HooksConfig = Field(default_factory=HooksConfig)
agent: AgentConfig = Field(default_factory=AgentConfig)
codex: CodexConfig = Field(default_factory=CodexConfig)
server: ServerConfig = Field(default_factory=ServerConfig)
```
### 4.2 Issue 模型 (models/issue.py)
```python
class Issue(BaseModel):
"""Linear Issue 标准化模型"""
id: str
identifier: str
title: str
description: str | None = None
priority: int | None = None
state: str
branch_name: str | None = None
url: str | None = None
assignee_id: str | None = None
labels: list[str] = Field(default_factory=list)
blocked_by: list[BlockerRef] = Field(default_factory=list)
assigned_to_worker: bool = True
created_at: datetime | None = None
updated_at: datetime | None = None
class BlockerRef(BaseModel):
id: str | None = None
identifier: str | None = None
state: str | None = None
```
### 4.3 Orchestrator 状态机 (orchestrator/state.py)
```python
@dataclass
class RunningEntry:
"""运行中的 Issue 条目"""
task: asyncio.Task
issue: Issue
worker_host: str | None = None
workspace_path: str | None = None
session_id: str | None = None
codex_app_server_pid: str | None = None
last_codex_message: str | None = None
last_codex_timestamp: datetime | None = None
last_codex_event: str | None = None
codex_input_tokens: int = 0
codex_output_tokens: int = 0
codex_total_tokens: int = 0
turn_count: int = 0
retry_attempt: int = 0
started_at: datetime = field(default_factory=datetime.utcnow)
@dataclass
class RetryEntry:
"""重试队列条目"""
issue_id: str
identifier: str
attempt: int
due_at: datetime
error: str | None = None
worker_host: str | None = None
workspace_path: str | None = None
timer_handle: asyncio.TimerHandle | None = None
@dataclass
class OrchestratorState:
"""Orchestrator 运行时状态"""
poll_interval_ms: int = 30000
max_concurrent_agents: int = 10
running: dict[str, RunningEntry] = field(default_factory=dict)
claimed: set[str] = field(default_factory=set)
retry_attempts: dict[str, RetryEntry] = field(default_factory=dict)
completed: set[str] = field(default_factory=set)
codex_totals: CodexTotals = field(default_factory=CodexTotals)
codex_rate_limits: dict | None = None
```
---
## 5. 实现阶段计划
### 阶段 1: 项目骨架和基础配置
**目标**: 建立项目结构,实现配置系统
**任务清单**:
- 创建项目目录结构
- 编写 pyproject.toml 和依赖
- 实现 config/schema.py - Pydantic 配置模型
- 实现 config/config.py - 配置加载和验证
- 实现 workflow/loader.py - WORKFLOW.md 解析
- 编写配置系统单元测试
### 阶段 2: Issue Tracker 和模型
**目标**: 实现 Linear API 客户端和 Issue 模型
**任务清单**:
- 实现 models/issue.py - Issue 数据模型
- 实现 trackers/base.py - Tracker 抽象基类
- 实现 trackers/linear.py - Linear GraphQL 客户端
- 实现 trackers/memory.py - 内存 Tracker (测试用)
- 实现 Issue 查询的分页和归一化
- 编写 Linear Client 测试
### 阶段 3: Workspace 管理
**目标**: 实现工作空间生命周期管理
**任务清单**:
- 实现 workspace/safety.py - 路径安全验证
- 实现 workspace/manager.py - 工作空间管理
- 实现 workspace/hooks.py - Hook 执行
- 支持本地工作空间
- 支持远程 SSH 工作空间 (可选)
- 编写 Workspace 测试
### 阶段 4: Agent 和 Prompt
**目标**: 集成 AgentScope,实现 Agent 执行
**任务清单**:
- 实现 prompts/builder.py - Prompt 构建
- 实现 agents/tools/linear_graphql.py - Linear GraphQL 工具
- 实现 agents/react_agent.py - ReAct Agent
- 实现 agents/client.py - AgentScope 客户端封装
- 集成 AgentScope 的模型和消息系统
- 编写 Agent 测试
### 阶段 5: Orchestrator 核心
**目标**: 实现核心调度器逻辑
**任务清单**:
- 实现 orchestrator/state.py - 状态管理
- 实现 orchestrator/dispatch.py - 分派逻辑
- 实现 orchestrator/retry.py - 重试机制
- 实现 orchestrator/reconciliation.py - 状态调和
- 实现 orchestrator/orchestrator.py - 主调度器
- 实现轮询循环和任务管理
- 编写 Orchestrator 测试
### 阶段 6: Dashboard 和日志
**目标**: 实现状态仪表板和结构化日志
**任务清单**:
- 实现 utils/logging.py - 结构化日志
- 实现 dashboard/snapshot.py - 状态快照
- 实现 dashboard/formatter.py - 格式化
- 实现 dashboard/dashboard.py - 终端仪表板
- 集成 rich 库渲染
- 实现实时刷新
### 阶段 7: Web API (可选)
**目标**: 实现 HTTP API 服务器
**任务清单**:
- 实现 web/server.py - FastAPI 服务器
- 实现 web/api.py - API 路由
- GET /api/v1/state - 状态查询
- GET /api/v1/<issue_id> - Issue 详情
- POST /api/v1/refresh - 触发轮询
- 静态文件服务 (可选)
### 阶段 8: CLI 和集成
**目标**: 实现命令行接口和集成测试
**任务清单**:
- 实现 cli.py - 命令行参数解析
- 实现 __main__.py - 入口点
- 实现启动流程
- 配置文件热重载
- 优雅退出处理
- 编写集成测试
### 阶段 9: 测试和文档
**目标**: 完善测试和文档
**任务清单**:
- 编写单元测试 (覆盖率 > 80%)
- 编写集成测试
- 编写 README.md
- 编写架构说明.md
- 编写配置指南.md
- 编写 API文档.md
- 代码审查和优化
---
## 6. 关键技术决策
### 6.1 异步架构
使用 Python 的 asyncio 替代 Elixir 的 OTP:
```python
# Elixir GenServer 对应 Python asyncio
# handle_info -> asyncio.Queue / asyncio.Event
# Process.monitor -> asyncio.Task 异常处理
# Process.send_after -> asyncio.call_later
class Orchestrator:
def __init__(self):
self._message_queue = asyncio.Queue()
self._retry_timers: dict[str, asyncio.TimerHandle] = {}
self._running_tasks: dict[str, asyncio.Task] = {}
```
### 6.2 状态管理
使用 dataclass + 异步锁管理状态:
```python
@dataclass
class OrchestratorState:
running: dict[str, RunningEntry]
retry_attempts: dict[str, RetryEntry]
class Orchestrator:
def __init__(self):
self._state = OrchestratorState()
self._state_lock = asyncio.Lock()
```
### 6.3 AgentScope 集成策略
不直接使用 ReActAgent,而是使用底层组件:
```python
from agentscope.models import OpenAIChatWrapper
from agentscope.message import Msg
from agentscope.tool import Toolkit
class SymphonyAgent:
def __init__(self, model_config, tools):
self.model = OpenAIChatWrapper(**model_config)
self.toolkit = Toolkit()
for tool in tools:
self.toolkit.register_tool_function(tool)
```
---
## 7. 验收标准
### 功能完整性
- 可以加载和解析 WORKFLOW.md
- 可以连接 Linear API 查询 Issues
- 可以创建工作空间并执行 Hooks
- 可以分派 Agent 执行任务
- 支持多轮对话
- 支持重试和退避
- 状态调和正确
- 仪表板实时显示
### 性能要求
- 轮询间隔可配置 (默认 30s)
- 并发 Agent 数量可配置 (默认 10)
- 内存使用稳定,无泄漏
### 代码质量
- 类型注解覆盖率 > 90%
- 单元测试覆盖率 > 80%
- 符合 PEP8 规范
- 文档字符串完整
---
## 8. 风险评估
| 风险 | 影响 | 缓解措施 |
|------|------|----------|
| AgentScope API 变化 | 高 | 锁定版本,关注更新 |
| Linear API 变化 | 中 | 隔离 API 调用层 |
| Python asyncio 复杂度 | 中 | 详细测试,代码审查 |
| SSH 远程执行安全 | 高 | 严格输入验证 |
---
## 9. 附录
### 9.1 参考文档
- [AgentScope 文档](https://doc.agentscope.io/)
- [Symphony SPEC.md](../symphony/SPEC.md)
- [Elixir 实现源码](../symphony/elixir/)
### 9.2 术语对照
| Elixir | Python |
|--------|--------|
| GenServer | asyncio + dataclass |
| GenServer.call/cast | asyncio.Queue |
| Process.monitor | asyncio.Task.add_done_callback |
| Agent | AgentRunner + SymphonyAgent |
| GenServer.handle_info | 消息处理协程 |
---
*文档版本: 1.0*
*最后更新: 2026-03-15*
登录后可参与表态
讨论回复
1 条回复
✨步子哥 (steper)
#1
03-15 02:49
登录后可参与表态