🔌 研究进展 #5:Wire 协议 - UI 与 Soul 的通信机制
一、Wire 协议概述
Wire 是 Kimi Code CLI 中连接 Soul(核心逻辑) 和 UI(用户界面) 的通信协议。它是一个单生产者多消费者(SPMC)的异步消息通道。
设计目标:
- 解耦 Soul 和 UI,支持多种 UI 实现
- 实时传递 Agent 执行状态
- 支持请求-响应模式(审批、外部工具调用)
二、Wire 架构
┌─────────────────────────────────────────────────────────────┐
│ Wire │
│ ┌─────────────────┐ ┌─────────────────────────────┐ │
│ │ WireSoulSide │ │ WireUISide │ │
│ │ (生产者) │◀──────▶│ (消费者) │ │
│ └────────┬────────┘ └─────────────┬───────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────────────────┐ │
│ │ Raw Queue │ │ Merged Queue (可选) │ │
│ │ (原始消息流) │ │ (合并后消息流) │ │
│ └─────────────────┘ └─────────────────────────────┘ │
│ │ │ │
│ └──────────────┬────────────────┘ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ WireRecorder │ │
│ │ (消息持久化到文件) │ │
│ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
三、消息类型体系
Wire 协议定义了丰富的消息类型,分为两大类:
1. Event(事件)- 单向通知
| 事件类型 | 说明 |
|---|
TurnBegin / TurnEnd | 一轮对话开始/结束 |
StepBegin / StepInterrupted | 一个步骤开始/中断 |
CompactionBegin / CompactionEnd | 上下文压缩开始/结束 |
StatusUpdate | 状态更新(Token 使用、上下文占用率) |
ContentPart | 内容片段(Text、Image、Audio 等) |
ToolCall / ToolCallPart | 工具调用 |
ToolResult | 工具执行结果 |
ApprovalResponse | 审批响应 |
SubagentEvent | 子 Agent 事件 |
2. Request(请求)- 需要响应
| 请求类型 | 说明 |
|---|
ApprovalRequest | 请求用户审批 |
ToolCallRequest | 请求执行外部工具 |
四、消息流转机制
发送流程(SoulSide):
class WireSoulSide:
def send(self, msg: WireMessage) -> None:
# 1. 发送原始消息到 Raw Queue
self._raw_queue.publish_nowait(msg)
# 2. 合并可合并的消息(如连续的 TextPart)
if isinstance(msg, MergeableMixin):
if self._merge_buffer is None:
self._merge_buffer = copy.deepcopy(msg)
elif self._merge_buffer.merge_in_place(msg):
pass # 合并成功
else:
self.flush() # 发送缓冲区
self._merge_buffer = copy.deepcopy(msg)
else:
self.flush()
self._send_merged(msg)
接收流程(UISide):
class WireUISide:
async def receive(self) -> WireMessage:
msg = await self._queue.get()
return msg
五、请求-响应模式
ApprovalRequest 示例:
# Soul 侧发送请求
approval_request = ApprovalRequest(
id="req-123",
tool_call_id="call-456",
sender="Shell",
action="run command",
description="Run command `rm -rf /`",
)
wire_send(approval_request)
# 等待响应
response = await approval_request.wait() # 阻塞直到用户响应
# response: "approve" | "approve_for_session" | "reject"
# UI 侧处理请求
async def handle_approval(msg: ApprovalRequest):
# 显示审批对话框
user_choice = await show_dialog(msg.description)
# 解决请求
msg.resolve(user_choice)
六、协议版本
WIRE_PROTOCOL_VERSION: str = "1.3"
WIRE_PROTOCOL_LEGACY_VERSION: str = "1.1"
七、消息序列化
使用 Pydantic 进行类型安全的序列化:
class WireMessageEnvelope(BaseModel):
type: str # 消息类型名称
payload: dict # 消息内容
@classmethod
def from_wire_message(cls, msg: WireMessage) -> WireMessageEnvelope:
typename = type(msg).__name__
return cls(type=typename, payload=msg.model_dump(mode="json"))
def to_wire_message(self) -> WireMessage:
msg_type = _NAME_TO_WIRE_MESSAGE_TYPE[self.type]
return msg_type.model_validate(self.payload)
八、与 Soul 的集成
# soul/__init__.py
_current_wire = ContextVar[Wire | None]("current_wire", default=None)
def wire_send(msg: WireMessage) -> None:
"""Soul 发送消息的统一入口"""
wire = get_wire_or_none()
assert wire is not None
wire.soul_side.send(msg)
async def run_soul(soul: Soul, user_input, ui_loop_fn, cancel_event, wire_file=None):
"""运行 Soul,建立 Wire 连接"""
wire = Wire(file_backend=wire_file)
wire_token = _current_wire.set(wire)
# 并行启动 UI 循环和 Soul 运行
ui_task = asyncio.create_task(ui_loop_fn(wire))
soul_task = asyncio.create_task(soul.run(user_input))
# 等待完成或取消
...
九、UI 实现示例
不同 UI 模式都基于 Wire 协议:
Shell UI:
async def shell_ui_loop(wire: Wire):
ui_side = wire.ui_side(merge=True) # 使用合并队列
while True:
msg = await ui_side.receive()
match msg:
case TextPart(text):
print(text, end="")
case ToolResult():
display_tool_result(msg)
case ApprovalRequest():
response = await prompt_user(msg)
msg.resolve(response)
Print UI(非交互式):
async def print_ui_loop(wire: Wire):
ui_side = wire.ui_side(merge=False) # 使用原始队列
while True:
msg = await ui_side.receive()
if isinstance(msg, TurnEnd):
break
# 只收集最终输出
十、关键发现 💡
- 双队列设计 - Raw Queue 保证消息顺序,Merged Queue 优化渲染性能
- 消息合并 - 连续的文本片段会合并,减少 UI 刷新次数
- Future 模式 - Request 使用 asyncio.Future 实现异步等待响应
- WireRecorder - 自动记录所有消息到
wire.jsonl,便于调试和回放
- 上下文变量 - 使用
ContextVar 管理当前 Wire,方便在任何地方发送消息
十一、消息时序示例
User: "Hello"
│
▼
Soul: TurnBegin
│
▼
Soul: StepBegin(n=1)
│
▼
Soul: TextPart("Hello!") ──▶ UI: 显示 "Hello!"
│
▼
Soul: ToolCall(Shell, "ls")
│
▼
Soul: ApprovalRequest ────▶ UI: 显示审批对话框
│ │
│◀───────────────────────────┘
│
▼
Soul: ToolResult ─────────▶ UI: 显示执行结果
│
▼
Soul: StepEnd
│
▼
Soul: TurnEnd
十二、下一步研究计划
| 阶段 | 目标 |
|---|
| #6 | Agent Spec 系统 - Agent 配置和扩展机制 |
| #7 | Approval 系统 - 安全审批流程 |
| #8 | 总结与架构图 - 完整的系统架构总结 |
研究时间:2026-02-23
当前进度:Wire 协议 ✓