Loading...
正在加载...
请稍候

Symphony Python 移植开发计划

✨步子哥 (steper) 2026年03月15日 02:41
## 项目概述 将 Elixir 版本的 Symphony 项目 1:1 移植到 Python 3.12,使用 AgentScope 作为 Agent 基础库。 ### 原始项目 - **源项目**: `./symphony/` (Elixir) - **目标项目**: `./symphony.py/` (Python 3.12) - **Agent 框架**: [AgentScope](https://github.com/agentscope-ai/agentscope) --- ## 1. 架构分析与映射 ### 1.1 Elixir 原架构模块 | 模块 | 文件路径 | 职责 | Python 对应实现 | |------|----------|------|-----------------| | `Orchestrator` | `lib/symphony_elixir/orchestrator.ex` | 核心调度器,轮询和分派任务 | `symphony.orchestrator.Orchestrator` (asyncio) | | `AgentRunner` | `lib/symphony_elixir/agent_runner.ex` | 执行单个 Issue 的 Agent | `symphony.agent_runner.AgentRunner` | | `Workflow` | `lib/symphony_elixir/workflow.ex` | 加载 WORKFLOW.md | `symphony.workflow.WorkflowLoader` | | `Config` | `lib/symphony_elixir/config.ex` | 配置管理 | `symphony.config.Config` + Pydantic | | `Config.Schema` | `lib/symphony_elixir/config/schema.ex` | 配置 Schema 验证 | `symphony.config.schema` (Pydantic models) | | `Linear.Client` | `lib/symphony_elixir/linear/client.ex` | Linear GraphQL API 客户端 | `symphony.trackers.linear_client.LinearClient` | | `Linear.Issue` | `lib/symphony_elixir/linear/issue.ex` | Issue 数据模型 | `symphony.models.issue.Issue` (Pydantic) | | `Workspace` | `lib/symphony_elixir/workspace.ex` | 工作空间管理 | `symphony.workspace.WorkspaceManager` | | `PromptBuilder` | `lib/symphony_elixir/prompt_builder.ex` | 提示词构建 | `symphony.prompts.PromptBuilder` | | `Codex.AppServer` | `lib/symphony_elixir/codex/app_server.ex` | Codex AppServer 协议 | `symphony.agents.agent_scope_client.AgentScopeClient` | | `Codex.DynamicTool` | `lib/symphony_elixir/codex/dynamic_tool.ex` | 动态工具执行 | `symphony.tools.linear_graphql.LinearGraphQLTool` | | `Tracker` | `lib/symphony_elixir/tracker.ex` | Tracker 适配器接口 | `symphony.trackers.base.BaseTracker` | | `StatusDashboard` | `lib/symphony_elixir/status_dashboard.ex` | 状态仪表板 | `symphony.dashboard.Dashboard` | | `HttpServer` | `lib/symphony_elixir_web/` | Web 服务器和 API | `symphony.web.server.WebServer` (FastAPI) | | `PathSafety` | `lib/symphony_elixir/path_safety.ex` | 路径安全验证 | `symphony.utils.path_utils.PathSafety` | | `SSH` | `lib/symphony_elixir/ssh.ex` | SSH 远程执行 | `symphony.utils.ssh_client.SSHClient` | ### 1.2 核心数据流 ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ Symphony Python 架构 │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────┐ ┌──────────────────────────────────────────────────┐ │ │ │ Workflow │───▶│ Orchestrator │ │ │ │ Loader │ │ (asyncio 事件循环, 状态机, 调度算法) │ │ │ └──────────────┘ └──────────────────┬───────────────────────────────┘ │ │ │ │ │ ┌────────────────────┼────────────────────┐ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ Linear │ │ Agent │ │ Workspace │ │ │ │ Client │ │ Runner │ │ Manager │ │ │ └─────────────┘ └──────┬──────┘ └─────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────┐ │ │ │ AgentScope ReActAgent │ │ │ │ + Custom Tools │ │ │ └─────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │ │ Observability Layer │ │ │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────────┐ │ │ │ │ │ Logging │ │ Dashboard │ │ HTTP API (FastAPI) │ │ │ │ │ └──────────────┘ └──────────────┘ └──────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘ ``` --- ## 2. 项目结构设计 ``` symphony.py/ ├── pyproject.toml # 项目配置和依赖 ├── README.md # 项目说明 ├── WORKFLOW.md # 示例工作流配置 ├── src/ │ └── symphony/ │ ├── __init__.py │ ├── __main__.py # 入口: python -m symphony │ ├── cli.py # 命令行接口 │ │ │ ├── config/ # 配置模块 │ │ ├── __init__.py │ │ ├── config.py # 主配置类 │ │ ├── schema.py # Pydantic 配置模型 │ │ └── defaults.py # 默认配置值 │ │ │ ├── models/ # 数据模型 │ │ ├── __init__.py │ │ ├── issue.py # Issue 数据模型 │ │ ├── session.py # Session 状态模型 │ │ └── events.py # 事件模型 │ │ │ ├── workflow/ # 工作流管理 │ │ ├── __init__.py │ │ ├── loader.py # WORKFLOW.md 加载解析 │ │ ├── templates.py # 模板渲染 │ │ └── watcher.py # 文件变更监控 │ │ │ ├── trackers/ # Issue Tracker 适配器 │ │ ├── __init__.py │ │ ├── base.py # Tracker 抽象基类 │ │ ├── linear.py # Linear Tracker 实现 │ │ └── memory.py # 内存 Tracker (测试用) │ │ │ ├── workspace/ # 工作空间管理 │ │ ├── __init__.py │ │ ├── manager.py # 工作空间生命周期 │ │ ├── hooks.py # Hook 执行 │ │ └── safety.py # 路径安全验证 │ │ │ ├── agents/ # Agent 相关 │ │ ├── __init__.py │ │ ├── base.py # Agent 基类 │ │ ├── react_agent.py # ReAct Agent 实现 │ │ ├── client.py # AgentScope 客户端 │ │ └── tools/ # 工具函数 │ │ ├── __init__.py │ │ ├── linear_graphql.py │ │ └── git_tools.py │ │ │ ├── orchestrator/ # 调度器核心 │ │ ├── __init__.py │ │ ├── orchestrator.py # 主调度器 │ │ ├── state.py # 状态管理 │ │ ├── dispatch.py # 分派逻辑 │ │ ├── retry.py # 重试机制 │ │ └── reconciliation.py # 状态调和 │ │ │ ├── dashboard/ # 状态仪表板 │ │ ├── __init__.py │ │ ├── dashboard.py # 终端仪表板 │ │ ├── formatter.py # 格式化输出 │ │ └── snapshot.py # 状态快照 │ │ │ ├── web/ # HTTP API (可选) │ │ ├── __init__.py │ │ ├── server.py # FastAPI 服务器 │ │ ├── api.py # API 路由 │ │ └── static/ # 静态文件 │ │ │ └── utils/ # 工具函数 │ ├── __init__.py │ ├── logging.py # 日志配置 │ ├── paths.py # 路径处理 │ ├── ssh.py # SSH 客户端 │ └── async_utils.py # 异步工具 │ ├── tests/ # 测试目录 │ ├── __init__.py │ ├── conftest.py │ ├── unit/ │ │ ├── test_config.py │ │ ├── test_workflow.py │ │ ├── test_orchestrator.py │ │ └── test_workspace.py │ ├── integration/ │ │ └── test_linear_client.py │ └── fixtures/ │ └── WORKFLOW.md │ └── docs/ # 中文文档 ├── 架构说明.md ├── 配置指南.md ├── API文档.md └── 部署指南.md ``` --- ## 3. 技术栈选择 ### 3.1 核心依赖 | 功能 | 库 | 版本 | |------|-----|------| | Python | python | ^3.12 | | Agent 框架 | agentscope | ^1.0 | | HTTP 客户端 | httpx | ^0.27 | | Web 框架 | fastapi | ^0.115 | | 数据验证 | pydantic | ^2.9 | | 配置管理 | pydantic-settings | ^2.5 | | 模板引擎 | jinja2 | ^3.1 | | 异步任务 | asyncio | 内置 | | 调度 | apscheduler | ^3.10 | | 终端 UI | rich | ^13.9 | | 日志 | structlog | ^24.4 | | YAML 解析 | pyyaml | ^6.0 | | SSH 客户端 | asyncssh | ^2.17 | | 文件监控 | watchfiles | ^0.24 | | 测试 | pytest-asyncio | ^0.24 | ### 3.2 AgentScope 集成策略 由于 Symphony 需要精细控制 Agent 的生命周期和与 Codex app-server 的交互,我们将采用以下策略: 1. **使用 AgentScope 的底层组件**:Msg, ModelWrapper, Tool 等 2. **自定义 Agent 类**:继承 AgentBase 实现 SymphonyAgent 3. **工具集成**:将 linear_graphql 作为 AgentScope Tool 注册 4. **不直接使用 ReActAgent**:因为 Symphony 需要自己控制多轮对话和状态 --- ## 4. 核心模块详细设计 ### 4.1 配置系统 (config/) ```python # schema.py - Pydantic 模型 class TrackerConfig(BaseModel): kind: Literal["linear", "memory"] = "linear" endpoint: str = "https://api.linear.app/graphql" api_key: str | None = None project_slug: str | None = None assignee: str | None = None active_states: list[str] = ["Todo", "In Progress"] terminal_states: list[str] = ["Closed", "Cancelled", "Canceled", "Duplicate", "Done"] class PollingConfig(BaseModel): interval_ms: int = Field(default=30000, ge=1000) class WorkspaceConfig(BaseModel): root: str = Field(default_factory=lambda: str(Path(tempfile.gettempdir()) / "symphony_workspaces")) class HooksConfig(BaseModel): after_create: str | None = None before_run: str | None = None after_run: str | None = None before_remove: str | None = None timeout_ms: int = Field(default=60000, ge=1000) class AgentConfig(BaseModel): max_concurrent_agents: int = Field(default=10, ge=1) max_turns: int = Field(default=20, ge=1) max_retry_backoff_ms: int = Field(default=300000, ge=1000) max_concurrent_agents_by_state: dict[str, int] = Field(default_factory=dict) class CodexConfig(BaseModel): command: str = "codex app-server" approval_policy: str | dict = "never" thread_sandbox: str = "workspace-write" turn_sandbox_policy: dict | None = None turn_timeout_ms: int = Field(default=3600000, ge=1000) read_timeout_ms: int = Field(default=5000, ge=1000) stall_timeout_ms: int = Field(default=300000, ge=0) class ServerConfig(BaseModel): port: int | None = Field(default=None, ge=0) host: str = "127.0.0.1" class SymphonyConfig(BaseModel): tracker: TrackerConfig = Field(default_factory=TrackerConfig) polling: PollingConfig = Field(default_factory=PollingConfig) workspace: WorkspaceConfig = Field(default_factory=WorkspaceConfig) hooks: HooksConfig = Field(default_factory=HooksConfig) agent: AgentConfig = Field(default_factory=AgentConfig) codex: CodexConfig = Field(default_factory=CodexConfig) server: ServerConfig = Field(default_factory=ServerConfig) ``` ### 4.2 Issue 模型 (models/issue.py) ```python class Issue(BaseModel): """Linear Issue 标准化模型""" id: str identifier: str title: str description: str | None = None priority: int | None = None state: str branch_name: str | None = None url: str | None = None assignee_id: str | None = None labels: list[str] = Field(default_factory=list) blocked_by: list[BlockerRef] = Field(default_factory=list) assigned_to_worker: bool = True created_at: datetime | None = None updated_at: datetime | None = None class BlockerRef(BaseModel): id: str | None = None identifier: str | None = None state: str | None = None ``` ### 4.3 Orchestrator 状态机 (orchestrator/state.py) ```python @dataclass class RunningEntry: """运行中的 Issue 条目""" task: asyncio.Task issue: Issue worker_host: str | None = None workspace_path: str | None = None session_id: str | None = None codex_app_server_pid: str | None = None last_codex_message: str | None = None last_codex_timestamp: datetime | None = None last_codex_event: str | None = None codex_input_tokens: int = 0 codex_output_tokens: int = 0 codex_total_tokens: int = 0 turn_count: int = 0 retry_attempt: int = 0 started_at: datetime = field(default_factory=datetime.utcnow) @dataclass class RetryEntry: """重试队列条目""" issue_id: str identifier: str attempt: int due_at: datetime error: str | None = None worker_host: str | None = None workspace_path: str | None = None timer_handle: asyncio.TimerHandle | None = None @dataclass class OrchestratorState: """Orchestrator 运行时状态""" poll_interval_ms: int = 30000 max_concurrent_agents: int = 10 running: dict[str, RunningEntry] = field(default_factory=dict) claimed: set[str] = field(default_factory=set) retry_attempts: dict[str, RetryEntry] = field(default_factory=dict) completed: set[str] = field(default_factory=set) codex_totals: CodexTotals = field(default_factory=CodexTotals) codex_rate_limits: dict | None = None ``` --- ## 5. 实现阶段计划 ### 阶段 1: 项目骨架和基础配置 **目标**: 建立项目结构,实现配置系统 **任务清单**: - 创建项目目录结构 - 编写 pyproject.toml 和依赖 - 实现 config/schema.py - Pydantic 配置模型 - 实现 config/config.py - 配置加载和验证 - 实现 workflow/loader.py - WORKFLOW.md 解析 - 编写配置系统单元测试 ### 阶段 2: Issue Tracker 和模型 **目标**: 实现 Linear API 客户端和 Issue 模型 **任务清单**: - 实现 models/issue.py - Issue 数据模型 - 实现 trackers/base.py - Tracker 抽象基类 - 实现 trackers/linear.py - Linear GraphQL 客户端 - 实现 trackers/memory.py - 内存 Tracker (测试用) - 实现 Issue 查询的分页和归一化 - 编写 Linear Client 测试 ### 阶段 3: Workspace 管理 **目标**: 实现工作空间生命周期管理 **任务清单**: - 实现 workspace/safety.py - 路径安全验证 - 实现 workspace/manager.py - 工作空间管理 - 实现 workspace/hooks.py - Hook 执行 - 支持本地工作空间 - 支持远程 SSH 工作空间 (可选) - 编写 Workspace 测试 ### 阶段 4: Agent 和 Prompt **目标**: 集成 AgentScope,实现 Agent 执行 **任务清单**: - 实现 prompts/builder.py - Prompt 构建 - 实现 agents/tools/linear_graphql.py - Linear GraphQL 工具 - 实现 agents/react_agent.py - ReAct Agent - 实现 agents/client.py - AgentScope 客户端封装 - 集成 AgentScope 的模型和消息系统 - 编写 Agent 测试 ### 阶段 5: Orchestrator 核心 **目标**: 实现核心调度器逻辑 **任务清单**: - 实现 orchestrator/state.py - 状态管理 - 实现 orchestrator/dispatch.py - 分派逻辑 - 实现 orchestrator/retry.py - 重试机制 - 实现 orchestrator/reconciliation.py - 状态调和 - 实现 orchestrator/orchestrator.py - 主调度器 - 实现轮询循环和任务管理 - 编写 Orchestrator 测试 ### 阶段 6: Dashboard 和日志 **目标**: 实现状态仪表板和结构化日志 **任务清单**: - 实现 utils/logging.py - 结构化日志 - 实现 dashboard/snapshot.py - 状态快照 - 实现 dashboard/formatter.py - 格式化 - 实现 dashboard/dashboard.py - 终端仪表板 - 集成 rich 库渲染 - 实现实时刷新 ### 阶段 7: Web API (可选) **目标**: 实现 HTTP API 服务器 **任务清单**: - 实现 web/server.py - FastAPI 服务器 - 实现 web/api.py - API 路由 - GET /api/v1/state - 状态查询 - GET /api/v1/<issue_id> - Issue 详情 - POST /api/v1/refresh - 触发轮询 - 静态文件服务 (可选) ### 阶段 8: CLI 和集成 **目标**: 实现命令行接口和集成测试 **任务清单**: - 实现 cli.py - 命令行参数解析 - 实现 __main__.py - 入口点 - 实现启动流程 - 配置文件热重载 - 优雅退出处理 - 编写集成测试 ### 阶段 9: 测试和文档 **目标**: 完善测试和文档 **任务清单**: - 编写单元测试 (覆盖率 > 80%) - 编写集成测试 - 编写 README.md - 编写架构说明.md - 编写配置指南.md - 编写 API文档.md - 代码审查和优化 --- ## 6. 关键技术决策 ### 6.1 异步架构 使用 Python 的 asyncio 替代 Elixir 的 OTP: ```python # Elixir GenServer 对应 Python asyncio # handle_info -> asyncio.Queue / asyncio.Event # Process.monitor -> asyncio.Task 异常处理 # Process.send_after -> asyncio.call_later class Orchestrator: def __init__(self): self._message_queue = asyncio.Queue() self._retry_timers: dict[str, asyncio.TimerHandle] = {} self._running_tasks: dict[str, asyncio.Task] = {} ``` ### 6.2 状态管理 使用 dataclass + 异步锁管理状态: ```python @dataclass class OrchestratorState: running: dict[str, RunningEntry] retry_attempts: dict[str, RetryEntry] class Orchestrator: def __init__(self): self._state = OrchestratorState() self._state_lock = asyncio.Lock() ``` ### 6.3 AgentScope 集成策略 不直接使用 ReActAgent,而是使用底层组件: ```python from agentscope.models import OpenAIChatWrapper from agentscope.message import Msg from agentscope.tool import Toolkit class SymphonyAgent: def __init__(self, model_config, tools): self.model = OpenAIChatWrapper(**model_config) self.toolkit = Toolkit() for tool in tools: self.toolkit.register_tool_function(tool) ``` --- ## 7. 验收标准 ### 功能完整性 - 可以加载和解析 WORKFLOW.md - 可以连接 Linear API 查询 Issues - 可以创建工作空间并执行 Hooks - 可以分派 Agent 执行任务 - 支持多轮对话 - 支持重试和退避 - 状态调和正确 - 仪表板实时显示 ### 性能要求 - 轮询间隔可配置 (默认 30s) - 并发 Agent 数量可配置 (默认 10) - 内存使用稳定,无泄漏 ### 代码质量 - 类型注解覆盖率 > 90% - 单元测试覆盖率 > 80% - 符合 PEP8 规范 - 文档字符串完整 --- ## 8. 风险评估 | 风险 | 影响 | 缓解措施 | |------|------|----------| | AgentScope API 变化 | 高 | 锁定版本,关注更新 | | Linear API 变化 | 中 | 隔离 API 调用层 | | Python asyncio 复杂度 | 中 | 详细测试,代码审查 | | SSH 远程执行安全 | 高 | 严格输入验证 | --- ## 9. 附录 ### 9.1 参考文档 - [AgentScope 文档](https://doc.agentscope.io/) - [Symphony SPEC.md](../symphony/SPEC.md) - [Elixir 实现源码](../symphony/elixir/) ### 9.2 术语对照 | Elixir | Python | |--------|--------| | GenServer | asyncio + dataclass | | GenServer.call/cast | asyncio.Queue | | Process.monitor | asyncio.Task.add_done_callback | | Agent | AgentRunner + SymphonyAgent | | GenServer.handle_info | 消息处理协程 | --- *文档版本: 1.0* *最后更新: 2026-03-15*

讨论回复

1 条回复
✨步子哥 (steper) #1
03-15 02:49
https://github.com/linkerlin/symphony.py