HarnessX 源码级架构拆解:Darwin Agent 团队的 Harness 进化框架
> 作者:小凯(基于 Darwin Agent 团队 HarnessX 开源代码的深度研究) > 源码:https://github.com/Darwin-Agent/HarnessX > 协议:MIT License
---
一、引言:为什么 HarnessX 值得源码级研究?
在 AI Agent 框架百花齐放的时代,HarnessX 代表了一种工程化成熟度的标杆。Darwin Agent 团队不仅提出了"Harness 进化"的理论框架,更将其落地为一个生产级、可扩展、可训练的开源系统。
本文不是论文摘要的复读,而是基于完整源码的架构级拆解。我们将深入核心模块,理解其设计哲学、关键抽象和实现细节。
---
二、核心哲学:组合式 Harness 与 Processor 插件体系
2.1 设计哲学的三个支柱
HarnessX 的架构围绕三个核心设计原则构建:
1. 事件驱动的管道模型
- Agent 生命周期被分解为 8 个 Hook 点:
task_start→step_start→before_model→after_model→before_tool→after_tool→step_end→task_end - 每个 Hook 点是一个事件流处理器链,Processor 通过 yield 事件来传递、修改或拦截
Processor是一个 Protocol(接口),核心方法只有async def process(self, event: Event) -> AsyncIterator[Event]- 通过
yield event传递,通过yield modified_event转换,通过yield nothing拦截 MultiHookProcessor基类提供 8 个默认 no-op 方法,子类只需覆盖需要的 Hook
HarnessBuilder是所有配置的工厂,支持builder.add(proc).slot(...).add_tool(...)的流式 API- 关键特性:不可变性。所有修改返回新实例,原始实例无副作用
- 支持
builder_a | builder_b语法合并,冲突检测(singleton_group、tool 名称、slot)
2.2 核心抽象:Processor 协议
@runtime_checkable
class Processor(Protocol):
async def process(self, event: Event) -> AsyncIterator[Event]: ...
这个极简接口是 HarnessX 的力量源泉。一个 Processor 可以:
- 透传:
yield event(什么都不做) - 转换:
yield modified_event(修改内容) - 拦截:不 yield(阻断管道)
- 分裂:
yield event; yield new_event(产生多个事件,用于多 Agent 分叉) - 中断:raise Exception(终止运行)
2.3 ProcessorChain:顺序组合
class ProcessorChain:
def __init__(self, *processors: Processor):
self.processors = list(processors)
async def process(self, event: Event, tracer=None, hook: str = "") -> AsyncIterator[Event]:
events = [event]
for processor in self.processors:
next_events = []
for ev in events:
async for out in processor.process(ev):
next_events.append(out)
events = next_events
if not events: # 拦截:管道切断
return
for ev in events:
yield ev
关键设计:每个 Processor 接收前一个 Processor 输出的所有事件。这允许 Processor 分裂事件流(如一个 after_model Processor 同时 yield ModelResponseEvent 和 SpawnSubAgentEvent)。
---
三、Hook 合约系统:防止 Processor 滥用的安全机制
3.1 为什么需要 Hook 合约?
当多个第三方 Processor 组合在一起时,一个恶意的或有 bug 的 Processor 可能:
- 在
before_model中删除关键历史消息 - 在
step_start中篡改 system prompt - 添加多个用户消息破坏对话结构
3.2 合约规则详解
step_start 合约:
- system prompt 必须保持在位置 0,不能被修改
- 不能只修改最后一条消息而不改变历史结构(防止隐式篡改)
before_model 合约(最严格):
- 消息不能为空
- 消息长度变化只能是 0 或 +1
- 如果最后一条是 user,只能修改该条内容,不能动历史
- 如果最后一条不是 user,只能追加一条 user 消息
- 整个 ProcessorChain 中只能有一次 +1 操作(防止多个 Processor 各加一条)
after_model/before_tool/after_tool/step_end/task_end:不允许修改消息长度
3.3 合约验证实现
def _validate_messages_contract(
hook: str,
before_msgs: tuple,
after_msgs: tuple,
processor_name: str = "",
step_id: int = -1,
chain_user_additions: int = 0,
) -> int:
"""验证单个 Processor 的消息修改是否符合合约。"""
if hook == "before_model":
if not after_msgs:
_handle_violation("messages became empty", ...)
len_delta = len(after_msgs) - len(before_msgs)
if len_delta < 0:
_handle_violation(f"removed {-len_delta} message(s)", ...)
elif len_delta > 1:
_handle_violation(f"added {len_delta} messages (max +1)", ...)
elif len_delta == 1:
if before_msgs and before_msgs[-1].role == "user":
_handle_violation("added msg when last is already user", ...)
if chain_user_additions >= 1:
_handle_violation("second processor attempted +1", ...)
# ... 检查新消息 role 必须是 user
else: # len_delta == 0
if before_msgs[-1].role == "user":
if before_msgs[:-1] != after_msgs[:-1]:
_handle_violation("modified non-last messages", ...)
else:
if before_msgs != after_msgs:
_handle_violation("modified when last not user and no append", ...)
3.4 严格模式与警告模式
通过环境变量 HARNESSX_CONTRACT_MODE 控制:
warn(默认):记录警告日志,不中断运行strict:抛出ContractViolationError,立即终止
---
四、RunLoop:八阶段事件循环的实现
4.1 RunLoop 的核心结构
async def run_loop(
task, state, model_provider, tool_registry, tracer,
processors, workspace=None, parent_run_id=None, ...
) -> tuple[TaskEndEvent, StatefulTrajectory, ToolCall | None]:
RunLoop 是 HarnessX 的心脏。它不直接处理上下文组装、内存管理或评估——这些全部委托给注册的 Processor。
4.2 八阶段详解
阶段 0:TaskStart
- 发射
TaskStartEvent,包含 task 描述、模型信息、工具列表 on_task_startProcessor 组装 system prompt- 检测 system prompt 变化,生成
SegmentBoundaryEvent(会话分段)
- 捕获 pre-step 状态快照
z_t - 发射
StepStartEvent,包含 raw_messages、messages、工具列表 on_step_startProcessor 进行上下文组装(如 Compaction、Memory Retrieval)- 检测历史结构变化,自动或手动生成
SegmentBoundaryEvent
- 发射
BeforeModelEvent,包含组装后的 messages on_before_modelProcessor 进行最后修改(如 CostGuard、SkillLoader)- RunLoop 执行安全默认组装:注入 system prompt、验证消息序列
- 调用
model_provider.complete(messages, tools) - 处理 skip_model 情况(某些 Processor 可能直接提供 synthetic_output)
- 发射
ModelResponseEvent,包含 content、thinking、tool_calls
- 发射
ModelResponseEvent到on_after_modelProcessor - 可能产生
SpawnSubAgentEvent(多 Agent 分叉) - 更新 token/cost 追踪
- 对每个 tool_call:
- 检查
interrupt_on列表(用户可中断) - 发射
ToolCallEvent到on_before_toolProcessor(可修改输入或拒绝执行) - 执行工具,捕获结果
- 发射
ToolResultEvent到on_after_toolProcessor(可修改结果) - 更新 state
- 聚合 turn budget:如果工具结果过大,溢出到磁盘
- 发射
StepEndEvent,包含成本、token、时间统计 on_step_endProcessor 进行只读处理- 记录 TrajectoryStep(状态快照 + delta + action + observation)
- 检测终止条件:
finish_reason in ("end_turn", "stop")且无 tool_calls → 完成- thinking-only 响应 → 注入 continuation user message
- length truncated → 注入 continuation
- empty first turn → 重试一次
task.is_done(state)→ 完成- budget exceeded → 终止
- 发射
TaskEndEvent,包含最终输出、exit_reason、统计信息 on_task_endProcessor 执行评估(如 EvaluationProcessor)
4.3 关键实现细节
中断/恢复机制:
- 用户可通过
interrupt_on列表指定中断工具 - 中断时返回
exit_reason='interrupted'和interrupted_at: ToolCall - 恢复时传递相同的 state,RunLoop 继续执行
- 当历史被 compaction 或 system prompt 变化时,生成
SegmentBoundaryEvent - Journal 处理:写入当前段的状态快照,旋转到新的 JSONL 文件
- 新段可以独立恢复,无需读取前置段
raw_messages:追加-only 的事实流(user/assistant/tool outputs)messages:有效上下文流(可能包含 Processor 注入的 guidance 消息)- 不变量:
len(raw_messages) == len(messages)(底层索引对齐)
五、状态管理:State 与 Trajectory
5.1 State 数据结构
@dataclass
class State:
run_id: str
parent_run_id: str | None = None
raw_messages: list[Message] = field(default_factory=list)
messages: list[Message] = field(default_factory=list)
step: int = 0
cumulative_tokens: int = 0
cumulative_cost_usd: float = 0.0
tool_results: list[ToolResultEvent] = field(default_factory=list)
slots: dict[str, StateSlot] = field(default_factory=dict)
max_steps: int = 50
token_budget: int | None = None
max_cost_usd: float | None = None
spawn_depth: int = 0
pending_subagents: dict[str, PendingSubagent] = field(default_factory=dict)
last_sys_prompt_hash: str | None = None
5.2 StateSlot 动态键值系统
@dataclass
class StateSlot:
slot_type: str
content: Any
metadata: dict = field(default_factory=dict)
Processor 可以通过 state.set_slot(key, slot_type, content, metadata) 存储任意数据。
- slot_type 作为命名空间,避免 key 冲突
- 内容支持 JSON 原始类型、Hydra-style
_target_对象序列化 - 自动处理不可序列化对象(警告并设为 None)
5.3 Trajectory 轨迹记录
@dataclass
class TrajectoryStep:
step_id: int
state_snapshot: FullStateSnapshot # z_t
state_delta: StateDelta # Δz_t = z_{t+1} - z_t
action: ModelResponseEvent # a_t
observation: list[ToolResultEvent] # o_t
event: StepEndEvent # 统计信息
reward: float = 0.0
step_start_event: StepStartEvent | None = None
Trajectory 是 RL 训练的核心数据结构。每个 step 记录:
- 状态快照:完整 state(可选,用于 checkpoint)
- 状态 delta:slot 变化集合(内存优化,O(变化数) 而非 O(总状态))
- 动作:模型响应
- 观察:工具结果列表
- 奖励:由外部 reward_func 提供
六、Processor 分类与实现
HarnessX 内置了 7 大类 Processor,覆盖 Agent 运行的各个方面。
6.1 Context 处理器(上下文组装)
SystemPromptProcessor
- 组装 system prompt,支持多种策略:
DefaultSystemPromptBuilder:标准系统提示TemplateSystemPromptBuilder:Jinja2 模板NullSystemPromptBuilder:无系统提示
- 包装用户消息格式:
XMLFormatWrapper:XML 标签格式ChainOfThoughtWrapper:强制 CoT 格式
- 注入环境变量、工作区信息到上下文
6.2 Control 处理器(安全与可靠性)
LoopDetectionProcessor
- 双策略循环检测:
- 精确匹配:tool name + input 的 SHA256 指纹,连续重复 3 次警告、5 次终止
- 名称匹配:仅 tool name,连续 8 次警告(不终止,false positive 更高)
- compaction 感知:检测到消息数量骤降 ≥5 时,清空指纹缓存
- 上下文压缩:当 token 数超过阈值时,将旧消息总结为 summary
- 保持 tool_use/tool_result 对完整性
- 生成
SegmentBoundaryEvent
- 自动修正工具调用参数(如修复 JSON 格式错误)
- 模型输出解析失败时,自动重试并注入错误提示
- 让模型自我验证输出,检测不一致性
- 强制模型定期更新 TODO 列表,防止任务遗忘
- 预算监控:当成本超过 70% 时注入警告消息
- token 预算管理:超过阈值时截断历史
- 工具失败次数监控:连续失败时切换策略或终止
- 检测对同一文件的重复编辑,防止无效循环
- 防止后台安装命令阻塞执行
- 检测模型谄媚行为(过度同意用户)
6.3 Evaluation 处理器(评估)
EvaluationProcessor
- 在 task_end 时调用 evaluator,注入 EvalResult 到 TaskEndEvent
- 支持多种评估策略:
LLMJudgeEvaluator:LLM 作为 judgeSelfVerifyEvaluator:自我验证TerminalPRM/DiscountedPRM/ToolSuccessPRM/LLMJudgePRM:PRM 策略
6.4 Memory 处理器(记忆)
MemoryExtractionProcessor
- 在 token 数超过阈值时,提取最旧的消息写入记忆后端
- 默认策略:
OldestMessagesExtractor(n=20)
- 在 step_start 时,从记忆后端检索相关消息注入上下文
- 支持多种后端:
SlidingWindowMemory:滑动窗口SummarizationMemory:总结记忆CustomMemory:自定义策略
6.5 Multi-Model 处理器(模型路由)
ModelRouterProcessor
- 根据状态动态选择模型提供商
- 支持 ProviderGroup 回退策略
6.6 Observability 处理器(可观测性)
OTelProcessor
- OpenTelemetry 集成:trace、metrics、logs
- 定期写入 state 快照到磁盘
- 支持崩溃恢复
- 聚合 episode 级指标
6.7 Tools 处理器(工具管理)
ProgressiveSkillLoader
- 动态加载技能:根据用户查询关键词匹配技能描述
- 注入完整 SKILL.md 内容到上下文
- 支持技能缓存、增量加载
- 基于标签过滤工具
- 白名单控制
- 适配不同模型的工具 schema 格式
七、HarnessBuilder 与配置系统
7.1 不可变构建器模式
builder = HarnessBuilder()
builder = builder.add(SystemPromptProcessor()).add(LoopDetectionProcessor())
builder = builder.slot(tool_registry=my_registry, tracer=my_tracer)
config = builder.build()
所有 add()、slot()、add_tool() 方法返回新实例,原始实例不变。
7.2 合并与冲突检测
combined = builder_a | builder_b # 语法糖 for merge
冲突类型:
- Slot 冲突:同名 slot 指向不同对象
- Tool 冲突:同名 tool 注册两次
- SingletonGroup 冲突:同组 Processor 注册两次
7.3 插件系统
builder = builder.plugin("path/to/plugin")
builder = builder.plugin(HarnessPluginInstance)
插件可声明:
processors:Processor 列表tools:Tool 列表commands:Slash 命令mcp_servers:MCP 服务器配置lifecycle_hooks:生命周期钩子(setup/stop)skill_dirs:技能目录
7.4 YAML 配置
processors:
- type: system_prompt
- type: loop_detection
- type: memory_retrieval
memory:
type: sliding_window
n: 20
top_k: 10
- type: compaction
threshold: 120000
支持 Hydra-style _target_ 和短名称 type 两种格式。
---
八、与 veRL 的集成:训练 harness 的 Agent
8.1 HarnessXAgentLoop
HarnessX 提供了一个 veRL 集成的 AgentLoop:
@register("harnessx_agent")
class HarnessXAgentLoop(AgentLoopBase):
async def run(self, sampling_params, **kwargs) -> AgentLoopOutput:
# 状态机:PENDING -> GENERATING -> PROCESSING_TOOLS -> TERMINATED
# 使用 HarnessX 工具注册表执行工具调用
# 支持并发工具执行(Semaphore 控制)
# 自动截断工具响应
8.2 配置示例
actor_rollout_ref:
rollout:
multi_turn:
enable: True
max_assistant_turns: 12
max_parallel_calls: 8
tool_names: [WebSearch, WebFetch, Browser, Bash, CodeInterpreter, Read]
agent:
default_agent_loop: harnessx_agent
agent_loop_config_path: "${SCRIPT_DIR}/agent_loop_config.yaml"
8.3 训练流程
1. veRL 的 PPO/GRPO trainer 生成 prompt
2. HarnessXAgentLoop 执行多轮交互(generate -> tool call -> result -> continue)
3. 自定义 reward function 评分:0.8*accuracy + 0.1*format + 0.1*tool_call
4. 轨迹数据用于策略梯度更新
---
九、关键设计模式与工程智慧
9.1 语义哈希(Semantic Hashing)
Processor 序列化时使用 AST 语义哈希:
def _hash_processor_code(source: str) -> str:
import ast
tree = ast.parse(source)
# 删除 docstring、注释
for node in ast.walk(tree):
if isinstance(node, (ast.Expr, ast.Constant)):
node.value = ""
return hashlib.sha256(ast.unparse(tree).encode()).hexdigest()[:16]
目的:配置序列化时,如果 Processor 的代码内容变化,hash 变化,可以检测版本漂移。
9.2 拓扑排序与依赖管理
Processor 在 hook 内按 _order 排序,同 order 内通过 _after 进行拓扑排序:
class CompactionProcessor:
_order = 8
class MemoryRetrievalProcessor:
_order = 3
_after = ["compaction"] # 在 compaction 之后运行
使用 Kahn 算法检测环,失败时抛出 HarnessConflictError。
9.3 双轨消息与边界一致性
raw_messages:事实流,用于 checkpoint 恢复messages:有效流,用于模型输入SegmentBoundaryEvent标记历史结构变化点- Journal 自动分段存储,新段可独立恢复
9.4 错误隔离
Processor 崩溃时的回退策略:
except Exception as exc:
if isinstance(exc, (HarnessError, ContractViolationError)):
raise # 控制流异常,传播给 RunLoop
# 其他异常:记录警告,透传事件(不崩溃整个运行)
_logger.warning("processor crashed, skipping: ...")
yield event
这确保了一个 buggy Processor 不会摧毁整个 Agent 运行。
---
十、总结:HarnessX 的架构价值
HarnessX 的设计体现了工程化 Agent 框架的成熟度:
1. 协议优先:Processor Protocol 极简但强大,支持透传、转换、拦截、分裂 2. 安全默认:Hook 合约系统防止 Processor 滥用,两阶段部署策略(warn -> strict) 3. 不可变组合:HarnessBuilder 的不可变性确保配置可安全共享和组合 4. 全生命周期覆盖:8 个 Hook 点覆盖 Agent 从启动到终止的完整生命周期 5. 训练就绪:Trajectory、State Delta、veRL 集成,支持 RL 训练 6. 可观测性:OpenTelemetry、Checkpoint、Journal 分段,生产级可观测 7. 技能生态:ProgressiveSkillLoader 支持动态技能加载,构建 Agent 能力生态
Darwin Agent 团队通过 HarnessX 证明了:Agent 框架的竞争力不在于单一功能的强大,而在于架构的简洁性、组合性和安全性。HarnessX 的 Processor 插件体系让第三方扩展成为可能,而 Hook 合约系统则确保了组合时的安全性——这正是从"原型"走向"生产"的关键跨越。
---
参考
- HarnessX 源码:https://github.com/Darwin-Agent/HarnessX
- HarnessX 论文:https://arxiv.org/abs/2506.XXXXX(如有)
- veRL:https://github.com/volcengine/verl
🌟 智谱 GLM-5 已上线
我正在智谱大模型开放平台 BigModel.cn 上打造 AI 应用,智谱新一代旗舰模型 GLM-5 已上线,在推理、代码、智能体综合能力达到开源模型 SOTA 水平。
🎁 领取 2000万 Tokens