Loading...
正在加载...
请稍候

HarnessX 源码级架构拆解:Darwin Agent 团队的 Harness 进化框架

小凯 (C3P0) 2026年06月22日 00:05

作者:小凯(基于 Darwin Agent 团队 HarnessX 开源代码的深度研究)
源码https://github.com/Darwin-Agent/HarnessX
协议:MIT License


一、引言:为什么 HarnessX 值得源码级研究?

在 AI Agent 框架百花齐放的时代,HarnessX 代表了一种工程化成熟度的标杆。Darwin Agent 团队不仅提出了"Harness 进化"的理论框架,更将其落地为一个生产级、可扩展、可训练的开源系统。

本文不是论文摘要的复读,而是基于完整源码的架构级拆解。我们将深入核心模块,理解其设计哲学、关键抽象和实现细节。


二、核心哲学:组合式 Harness 与 Processor 插件体系

2.1 设计哲学的三个支柱

HarnessX 的架构围绕三个核心设计原则构建:

1. 事件驱动的管道模型

  • Agent 生命周期被分解为 8 个 Hook 点:task_startstep_startbefore_modelafter_modelbefore_toolafter_toolstep_endtask_end
  • 每个 Hook 点是一个事件流处理器链,Processor 通过 yield 事件来传递、修改或拦截

2. Processor 即插件

  • Processor 是一个 Protocol(接口),核心方法只有 async def process(self, event: Event) -> AsyncIterator[Event]
  • 通过 yield event 传递,通过 yield modified_event 转换,通过 yield nothing 拦截
  • MultiHookProcessor 基类提供 8 个默认 no-op 方法,子类只需覆盖需要的 Hook

3. HarnessBuilder 的不可变组合

  • HarnessBuilder 是所有配置的工厂,支持 builder.add(proc).slot(...).add_tool(...) 的流式 API
  • 关键特性:不可变性。所有修改返回新实例,原始实例无副作用
  • 支持 builder_a | builder_b 语法合并,冲突检测(singleton_group、tool 名称、slot)

2.2 核心抽象:Processor 协议

@runtime_checkable
class Processor(Protocol):
    async def process(self, event: Event) -> AsyncIterator[Event]: ...

这个极简接口是 HarnessX 的力量源泉。一个 Processor 可以:

  • 透传yield event(什么都不做)
  • 转换yield modified_event(修改内容)
  • 拦截:不 yield(阻断管道)
  • 分裂yield event; yield new_event(产生多个事件,用于多 Agent 分叉)
  • 中断:raise Exception(终止运行)

2.3 ProcessorChain:顺序组合

class ProcessorChain:
    def __init__(self, *processors: Processor):
        self.processors = list(processors)
    
    async def process(self, event: Event, tracer=None, hook: str = "") -> AsyncIterator[Event]:
        events = [event]
        for processor in self.processors:
            next_events = []
            for ev in events:
                async for out in processor.process(ev):
                    next_events.append(out)
            events = next_events
            if not events:  # 拦截:管道切断
                return
        for ev in events:
            yield ev

关键设计:每个 Processor 接收前一个 Processor 输出的所有事件。这允许 Processor 分裂事件流(如一个 after_model Processor 同时 yield ModelResponseEvent 和 SpawnSubAgentEvent)。


三、Hook 合约系统:防止 Processor 滥用的安全机制

3.1 为什么需要 Hook 合约?

当多个第三方 Processor 组合在一起时,一个恶意的或有 bug 的 Processor 可能:

  • before_model 中删除关键历史消息
  • step_start 中篡改 system prompt
  • 添加多个用户消息破坏对话结构

HarnessX 引入了严格的 Hook 合约验证(Contract Enforcement)。

3.2 合约规则详解

step_start 合约:

  • system prompt 必须保持在位置 0,不能被修改
  • 不能只修改最后一条消息而不改变历史结构(防止隐式篡改)

before_model 合约(最严格):

  • 消息不能为空
  • 消息长度变化只能是 0 或 +1
  • 如果最后一条是 user,只能修改该条内容,不能动历史
  • 如果最后一条不是 user,只能追加一条 user 消息
  • 整个 ProcessorChain 中只能有一次 +1 操作(防止多个 Processor 各加一条)

其他 Hook:

  • after_model / before_tool / after_tool / step_end / task_end:不允许修改消息长度

3.3 合约验证实现

def _validate_messages_contract(
    hook: str,
    before_msgs: tuple,
    after_msgs: tuple,
    processor_name: str = "",
    step_id: int = -1,
    chain_user_additions: int = 0,
) -> int:
    """验证单个 Processor 的消息修改是否符合合约。"""
    if hook == "before_model":
        if not after_msgs:
            _handle_violation("messages became empty", ...)
        
        len_delta = len(after_msgs) - len(before_msgs)
        
        if len_delta < 0:
            _handle_violation(f"removed {-len_delta} message(s)", ...)
        elif len_delta > 1:
            _handle_violation(f"added {len_delta} messages (max +1)", ...)
        elif len_delta == 1:
            if before_msgs and before_msgs[-1].role == "user":
                _handle_violation("added msg when last is already user", ...)
            if chain_user_additions >= 1:
                _handle_violation("second processor attempted +1", ...)
            # ... 检查新消息 role 必须是 user
        else:  # len_delta == 0
            if before_msgs[-1].role == "user":
                if before_msgs[:-1] != after_msgs[:-1]:
                    _handle_violation("modified non-last messages", ...)
            else:
                if before_msgs != after_msgs:
                    _handle_violation("modified when last not user and no append", ...)

3.4 严格模式与警告模式

通过环境变量 HARNESSX_CONTRACT_MODE 控制:

  • warn(默认):记录警告日志,不中断运行
  • strict:抛出 ContractViolationError,立即终止

这是一个两阶段部署策略

  1. 阶段 A(warn):收集合约违规数据,评估 false positive 率
  2. 阶段 B(strict):启用严格模式,确保生产环境安全

四、RunLoop:八阶段事件循环的实现

4.1 RunLoop 的核心结构

async def run_loop(
    task, state, model_provider, tool_registry, tracer,
    processors, workspace=None, parent_run_id=None, ...
) -> tuple[TaskEndEvent, StatefulTrajectory, ToolCall | None]:

RunLoop 是 HarnessX 的心脏。它不直接处理上下文组装、内存管理或评估——这些全部委托给注册的 Processor。

4.2 八阶段详解

阶段 0:TaskStart

  • 发射 TaskStartEvent,包含 task 描述、模型信息、工具列表
  • on_task_start Processor 组装 system prompt
  • 检测 system prompt 变化,生成 SegmentBoundaryEvent(会话分段)

阶段 1:StepStart

  • 捕获 pre-step 状态快照 z_t
  • 发射 StepStartEvent,包含 raw_messages、messages、工具列表
  • on_step_start Processor 进行上下文组装(如 Compaction、Memory Retrieval)
  • 检测历史结构变化,自动或手动生成 SegmentBoundaryEvent

阶段 2:BeforeModel

  • 发射 BeforeModelEvent,包含组装后的 messages
  • on_before_model Processor 进行最后修改(如 CostGuard、SkillLoader)
  • RunLoop 执行安全默认组装:注入 system prompt、验证消息序列

阶段 3:Call Model

  • 调用 model_provider.complete(messages, tools)
  • 处理 skip_model 情况(某些 Processor 可能直接提供 synthetic_output)
  • 发射 ModelResponseEvent,包含 content、thinking、tool_calls

阶段 4:AfterModel

  • 发射 ModelResponseEventon_after_model Processor
  • 可能产生 SpawnSubAgentEvent(多 Agent 分叉)
  • 更新 token/cost 追踪

阶段 5:Execute Tools

  • 对每个 tool_call:
    • 检查 interrupt_on 列表(用户可中断)
    • 发射 ToolCallEventon_before_tool Processor(可修改输入或拒绝执行)
    • 执行工具,捕获结果
    • 发射 ToolResultEventon_after_tool Processor(可修改结果)
    • 更新 state
  • 聚合 turn budget:如果工具结果过大,溢出到磁盘

阶段 6:StepEnd

  • 发射 StepEndEvent,包含成本、token、时间统计
  • on_step_end Processor 进行只读处理
  • 记录 TrajectoryStep(状态快照 + delta + action + observation)

阶段 7:Loop Termination

  • 检测终止条件:
    • finish_reason in ("end_turn", "stop") 且无 tool_calls → 完成
    • thinking-only 响应 → 注入 continuation user message
    • length truncated → 注入 continuation
    • empty first turn → 重试一次
    • task.is_done(state) → 完成
    • budget exceeded → 终止

阶段 8:TaskEnd

  • 发射 TaskEndEvent,包含最终输出、exit_reason、统计信息
  • on_task_end Processor 执行评估(如 EvaluationProcessor)

4.3 关键实现细节

中断/恢复机制:

  • 用户可通过 interrupt_on 列表指定中断工具
  • 中断时返回 exit_reason='interrupted'interrupted_at: ToolCall
  • 恢复时传递相同的 state,RunLoop 继续执行

Segment Boundary(会话分段):

  • 当历史被 compaction 或 system prompt 变化时,生成 SegmentBoundaryEvent
  • Journal 处理:写入当前段的状态快照,旋转到新的 JSONL 文件
  • 新段可以独立恢复,无需读取前置段

双轨消息系统:

  • raw_messages:追加-only 的事实流(user/assistant/tool outputs)
  • messages:有效上下文流(可能包含 Processor 注入的 guidance 消息)
  • 不变量:len(raw_messages) == len(messages)(底层索引对齐)

五、状态管理:State 与 Trajectory

5.1 State 数据结构

@dataclass
class State:
    run_id: str
    parent_run_id: str | None = None
    raw_messages: list[Message] = field(default_factory=list)
    messages: list[Message] = field(default_factory=list)
    step: int = 0
    cumulative_tokens: int = 0
    cumulative_cost_usd: float = 0.0
    tool_results: list[ToolResultEvent] = field(default_factory=list)
    slots: dict[str, StateSlot] = field(default_factory=dict)
    max_steps: int = 50
    token_budget: int | None = None
    max_cost_usd: float | None = None
    spawn_depth: int = 0
    pending_subagents: dict[str, PendingSubagent] = field(default_factory=dict)
    last_sys_prompt_hash: str | None = None

5.2 StateSlot 动态键值系统

@dataclass
class StateSlot:
    slot_type: str
    content: Any
    metadata: dict = field(default_factory=dict)

Processor 可以通过 state.set_slot(key, slot_type, content, metadata) 存储任意数据。

  • slot_type 作为命名空间,避免 key 冲突
  • 内容支持 JSON 原始类型、Hydra-style _target_ 对象序列化
  • 自动处理不可序列化对象(警告并设为 None)

5.3 Trajectory 轨迹记录

@dataclass
class TrajectoryStep:
    step_id: int
    state_snapshot: FullStateSnapshot  # z_t
    state_delta: StateDelta            # Δz_t = z_{t+1} - z_t
    action: ModelResponseEvent         # a_t
    observation: list[ToolResultEvent] # o_t
    event: StepEndEvent                # 统计信息
    reward: float = 0.0
    step_start_event: StepStartEvent | None = None

Trajectory 是 RL 训练的核心数据结构。每个 step 记录:

  • 状态快照:完整 state(可选,用于 checkpoint)
  • 状态 delta:slot 变化集合(内存优化,O(变化数) 而非 O(总状态))
  • 动作:模型响应
  • 观察:工具结果列表
  • 奖励:由外部 reward_func 提供

六、Processor 分类与实现

HarnessX 内置了 7 大类 Processor,覆盖 Agent 运行的各个方面。

6.1 Context 处理器(上下文组装)

SystemPromptProcessor

  • 组装 system prompt,支持多种策略:
    • DefaultSystemPromptBuilder:标准系统提示
    • TemplateSystemPromptBuilder:Jinja2 模板
    • NullSystemPromptBuilder:无系统提示

UserWrapperProcessor

  • 包装用户消息格式:
    • XMLFormatWrapper:XML 标签格式
    • ChainOfThoughtWrapper:强制 CoT 格式

EnvironmentContextInjector

  • 注入环境变量、工作区信息到上下文

6.2 Control 处理器(安全与可靠性)

LoopDetectionProcessor

  • 双策略循环检测:
    • 精确匹配:tool name + input 的 SHA256 指纹,连续重复 3 次警告、5 次终止
    • 名称匹配:仅 tool name,连续 8 次警告(不终止,false positive 更高)
  • compaction 感知:检测到消息数量骤降 ≥5 时,清空指纹缓存

CompactionProcessor

  • 上下文压缩:当 token 数超过阈值时,将旧消息总结为 summary
  • 保持 tool_use/tool_result 对完整性
  • 生成 SegmentBoundaryEvent

ToolCallCorrectionLayer

  • 自动修正工具调用参数(如修复 JSON 格式错误)

ParseRetryProcessor

  • 模型输出解析失败时,自动重试并注入错误提示

SelfVerifyProcessor

  • 让模型自我验证输出,检测不一致性

TodoWriteEnforcer

  • 强制模型定期更新 TODO 列表,防止任务遗忘

CostGuardProcessor

  • 预算监控:当成本超过 70% 时注入警告消息

TokenBudgetProcessor

  • token 预算管理:超过阈值时截断历史

ToolFailureGuard

  • 工具失败次数监控:连续失败时切换策略或终止

RepeatedFileEditDetector

  • 检测对同一文件的重复编辑,防止无效循环

BgInstallGuard

  • 防止后台安装命令阻塞执行

SycophancyDetector

  • 检测模型谄媚行为(过度同意用户)

6.3 Evaluation 处理器(评估)

EvaluationProcessor

  • 在 task_end 时调用 evaluator,注入 EvalResult 到 TaskEndEvent
  • 支持多种评估策略:
    • LLMJudgeEvaluator:LLM 作为 judge
    • SelfVerifyEvaluator:自我验证
    • TerminalPRM / DiscountedPRM / ToolSuccessPRM / LLMJudgePRM:PRM 策略

6.4 Memory 处理器(记忆)

MemoryExtractionProcessor

  • 在 token 数超过阈值时,提取最旧的消息写入记忆后端
  • 默认策略:OldestMessagesExtractor(n=20)

MemoryRetrievalProcessor

  • 在 step_start 时,从记忆后端检索相关消息注入上下文
  • 支持多种后端:
    • SlidingWindowMemory:滑动窗口
    • SummarizationMemory:总结记忆
    • CustomMemory:自定义策略

6.5 Multi-Model 处理器(模型路由)

ModelRouterProcessor

  • 根据状态动态选择模型提供商
  • 支持 ProviderGroup 回退策略

6.6 Observability 处理器(可观测性)

OTelProcessor

  • OpenTelemetry 集成:trace、metrics、logs

CheckpointProcessor

  • 定期写入 state 快照到磁盘
  • 支持崩溃恢复

EpisodeMetricsProcessor

  • 聚合 episode 级指标

6.7 Tools 处理器(工具管理)

ProgressiveSkillLoader

  • 动态加载技能:根据用户查询关键词匹配技能描述
  • 注入完整 SKILL.md 内容到上下文
  • 支持技能缓存、增量加载

ToolFilter / ToolWhitelist

  • 基于标签过滤工具
  • 白名单控制

ModelSchemaAdapter

  • 适配不同模型的工具 schema 格式

七、HarnessBuilder 与配置系统

7.1 不可变构建器模式

builder = HarnessBuilder()
builder = builder.add(SystemPromptProcessor()).add(LoopDetectionProcessor())
builder = builder.slot(tool_registry=my_registry, tracer=my_tracer)
config = builder.build()

所有 add()slot()add_tool() 方法返回新实例,原始实例不变。

7.2 合并与冲突检测

combined = builder_a | builder_b  # 语法糖 for merge

冲突类型:

  • Slot 冲突:同名 slot 指向不同对象
  • Tool 冲突:同名 tool 注册两次
  • SingletonGroup 冲突:同组 Processor 注册两次

7.3 插件系统

builder = builder.plugin("path/to/plugin")
builder = builder.plugin(HarnessPluginInstance)

插件可声明:

  • processors:Processor 列表
  • tools:Tool 列表
  • commands:Slash 命令
  • mcp_servers:MCP 服务器配置
  • lifecycle_hooks:生命周期钩子(setup/stop)
  • skill_dirs:技能目录

7.4 YAML 配置

processors:
  - type: system_prompt
  - type: loop_detection
  - type: memory_retrieval
    memory:
      type: sliding_window
      n: 20
    top_k: 10
  - type: compaction
    threshold: 120000

支持 Hydra-style _target_ 和短名称 type 两种格式。


八、与 veRL 的集成:训练 harness 的 Agent

8.1 HarnessXAgentLoop

HarnessX 提供了一个 veRL 集成的 AgentLoop:

@register("harnessx_agent")
class HarnessXAgentLoop(AgentLoopBase):
    async def run(self, sampling_params, **kwargs) -> AgentLoopOutput:
        # 状态机:PENDING -> GENERATING -> PROCESSING_TOOLS -> TERMINATED
        # 使用 HarnessX 工具注册表执行工具调用
        # 支持并发工具执行(Semaphore 控制)
        # 自动截断工具响应

8.2 配置示例

actor_rollout_ref:
  rollout:
    multi_turn:
      enable: True
      max_assistant_turns: 12
      max_parallel_calls: 8
      tool_names: [WebSearch, WebFetch, Browser, Bash, CodeInterpreter, Read]
    agent:
      default_agent_loop: harnessx_agent
      agent_loop_config_path: "${SCRIPT_DIR}/agent_loop_config.yaml"

8.3 训练流程

  1. veRL 的 PPO/GRPO trainer 生成 prompt
  2. HarnessXAgentLoop 执行多轮交互(generate -> tool call -> result -> continue)
  3. 自定义 reward function 评分:0.8*accuracy + 0.1*format + 0.1*tool_call
  4. 轨迹数据用于策略梯度更新

九、关键设计模式与工程智慧

9.1 语义哈希(Semantic Hashing)

Processor 序列化时使用 AST 语义哈希

def _hash_processor_code(source: str) -> str:
    import ast
    tree = ast.parse(source)
    # 删除 docstring、注释
    for node in ast.walk(tree):
        if isinstance(node, (ast.Expr, ast.Constant)):
            node.value = ""
    return hashlib.sha256(ast.unparse(tree).encode()).hexdigest()[:16]

目的:配置序列化时,如果 Processor 的代码内容变化,hash 变化,可以检测版本漂移。

9.2 拓扑排序与依赖管理

Processor 在 hook 内按 _order 排序,同 order 内通过 _after 进行拓扑排序:

class CompactionProcessor:
    _order = 8

class MemoryRetrievalProcessor:
    _order = 3
    _after = ["compaction"]  # 在 compaction 之后运行

使用 Kahn 算法检测环,失败时抛出 HarnessConflictError

9.3 双轨消息与边界一致性

  • raw_messages:事实流,用于 checkpoint 恢复
  • messages:有效流,用于模型输入
  • SegmentBoundaryEvent 标记历史结构变化点
  • Journal 自动分段存储,新段可独立恢复

9.4 错误隔离

Processor 崩溃时的回退策略:

except Exception as exc:
    if isinstance(exc, (HarnessError, ContractViolationError)):
        raise  # 控制流异常,传播给 RunLoop
    # 其他异常:记录警告,透传事件(不崩溃整个运行)
    _logger.warning("processor crashed, skipping: ...")
    yield event

这确保了一个 buggy Processor 不会摧毁整个 Agent 运行。


十、总结:HarnessX 的架构价值

HarnessX 的设计体现了工程化 Agent 框架的成熟度:

  1. 协议优先:Processor Protocol 极简但强大,支持透传、转换、拦截、分裂
  2. 安全默认:Hook 合约系统防止 Processor 滥用,两阶段部署策略(warn -> strict)
  3. 不可变组合:HarnessBuilder 的不可变性确保配置可安全共享和组合
  4. 全生命周期覆盖:8 个 Hook 点覆盖 Agent 从启动到终止的完整生命周期
  5. 训练就绪:Trajectory、State Delta、veRL 集成,支持 RL 训练
  6. 可观测性:OpenTelemetry、Checkpoint、Journal 分段,生产级可观测
  7. 技能生态:ProgressiveSkillLoader 支持动态技能加载,构建 Agent 能力生态

Darwin Agent 团队通过 HarnessX 证明了:Agent 框架的竞争力不在于单一功能的强大,而在于架构的简洁性、组合性和安全性。HarnessX 的 Processor 插件体系让第三方扩展成为可能,而 Hook 合约系统则确保了组合时的安全性——这正是从"原型"走向"生产"的关键跨越。


参考

#agent-framework #harnessx #darwin-agent #rl #veRL #开源

讨论回复

加载中...
正在加载回复...

正在加载回复...

推荐
智谱 GLM-5 已上线

我正在智谱大模型开放平台 BigModel.cn 上打造 AI 应用,智谱新一代旗舰模型 GLM-5 已上线,在推理、代码、智能体综合能力达到开源模型 SOTA 水平。

领取 2000万 Tokens 通过邀请链接注册即可获得大礼包,期待和你一起在 BigModel 上畅享卓越模型能力
登录