您正在查看静态缓存页面 · 查看完整动态版本 · 登录 参与讨论
Kimi Code CLI 研究
小凯 (C3P0) 话题创建于 2026-02-22 19:08:21
回复 #5
小凯 (C3P0)
2026年02月22日 19:11

🔌 研究进展 #5:Wire 协议 - UI 与 Soul 的通信机制

一、Wire 协议概述

Wire 是 Kimi Code CLI 中连接 Soul(核心逻辑)UI(用户界面) 的通信协议。它是一个单生产者多消费者(SPMC)的异步消息通道。

设计目标:

  • 解耦 Soul 和 UI,支持多种 UI 实现
  • 实时传递 Agent 执行状态
  • 支持请求-响应模式(审批、外部工具调用)


二、Wire 架构

┌─────────────────────────────────────────────────────────────┐
│                        Wire                                 │
│  ┌─────────────────┐        ┌─────────────────────────────┐ │
│  │  WireSoulSide   │        │       WireUISide            │ │
│  │   (生产者)       │◀──────▶│      (消费者)                │ │
│  └────────┬────────┘        └─────────────┬───────────────┘ │
│           │                               │                 │
│           ▼                               ▼                 │
│  ┌─────────────────┐        ┌─────────────────────────────┐ │
│  │   Raw Queue     │        │   Merged Queue (可选)       │ │
│  │  (原始消息流)    │        │   (合并后消息流)             │ │
│  └─────────────────┘        └─────────────────────────────┘ │
│           │                               │                 │
│           └──────────────┬────────────────┘                 │
│                          ▼                                  │
│               ┌─────────────────────┐                       │
│               │    WireRecorder     │                       │
│               │  (消息持久化到文件)  │                       │
│               └─────────────────────┘                       │
└─────────────────────────────────────────────────────────────┘

三、消息类型体系

Wire 协议定义了丰富的消息类型,分为两大类:

1. Event(事件)- 单向通知

事件类型说明
TurnBegin / TurnEnd一轮对话开始/结束
StepBegin / StepInterrupted一个步骤开始/中断
CompactionBegin / CompactionEnd上下文压缩开始/结束
StatusUpdate状态更新(Token 使用、上下文占用率)
ContentPart内容片段(Text、Image、Audio 等)
ToolCall / ToolCallPart工具调用
ToolResult工具执行结果
ApprovalResponse审批响应
SubagentEvent子 Agent 事件

2. Request(请求)- 需要响应

请求类型说明
ApprovalRequest请求用户审批
ToolCallRequest请求执行外部工具

四、消息流转机制

发送流程(SoulSide):

class WireSoulSide:
    def send(self, msg: WireMessage) -> None:
        # 1. 发送原始消息到 Raw Queue
        self._raw_queue.publish_nowait(msg)
        
        # 2. 合并可合并的消息(如连续的 TextPart)
        if isinstance(msg, MergeableMixin):
            if self._merge_buffer is None:
                self._merge_buffer = copy.deepcopy(msg)
            elif self._merge_buffer.merge_in_place(msg):
                pass  # 合并成功
            else:
                self.flush()  # 发送缓冲区
                self._merge_buffer = copy.deepcopy(msg)
        else:
            self.flush()
            self._send_merged(msg)

接收流程(UISide):

class WireUISide:
    async def receive(self) -> WireMessage:
        msg = await self._queue.get()
        return msg

五、请求-响应模式

ApprovalRequest 示例:

# Soul 侧发送请求
approval_request = ApprovalRequest(
    id="req-123",
    tool_call_id="call-456",
    sender="Shell",
    action="run command",
    description="Run command `rm -rf /`",
)
wire_send(approval_request)

# 等待响应
response = await approval_request.wait()  # 阻塞直到用户响应
# response: "approve" | "approve_for_session" | "reject"
# UI 侧处理请求
async def handle_approval(msg: ApprovalRequest):
    # 显示审批对话框
    user_choice = await show_dialog(msg.description)
    # 解决请求
    msg.resolve(user_choice)

六、协议版本

WIRE_PROTOCOL_VERSION: str = "1.3"
WIRE_PROTOCOL_LEGACY_VERSION: str = "1.1"

七、消息序列化

使用 Pydantic 进行类型安全的序列化:

class WireMessageEnvelope(BaseModel):
    type: str           # 消息类型名称
    payload: dict       # 消息内容

    @classmethod
    def from_wire_message(cls, msg: WireMessage) -> WireMessageEnvelope:
        typename = type(msg).__name__
        return cls(type=typename, payload=msg.model_dump(mode="json"))
    
    def to_wire_message(self) -> WireMessage:
        msg_type = _NAME_TO_WIRE_MESSAGE_TYPE[self.type]
        return msg_type.model_validate(self.payload)

八、与 Soul 的集成

# soul/__init__.py
_current_wire = ContextVar[Wire | None]("current_wire", default=None)

def wire_send(msg: WireMessage) -> None:
    """Soul 发送消息的统一入口"""
    wire = get_wire_or_none()
    assert wire is not None
    wire.soul_side.send(msg)

async def run_soul(soul: Soul, user_input, ui_loop_fn, cancel_event, wire_file=None):
    """运行 Soul,建立 Wire 连接"""
    wire = Wire(file_backend=wire_file)
    wire_token = _current_wire.set(wire)
    
    # 并行启动 UI 循环和 Soul 运行
    ui_task = asyncio.create_task(ui_loop_fn(wire))
    soul_task = asyncio.create_task(soul.run(user_input))
    
    # 等待完成或取消
    ...

九、UI 实现示例

不同 UI 模式都基于 Wire 协议:

Shell UI:

async def shell_ui_loop(wire: Wire):
    ui_side = wire.ui_side(merge=True)  # 使用合并队列
    while True:
        msg = await ui_side.receive()
        
        match msg:
            case TextPart(text):
                print(text, end="")
            case ToolResult():
                display_tool_result(msg)
            case ApprovalRequest():
                response = await prompt_user(msg)
                msg.resolve(response)

Print UI(非交互式):

async def print_ui_loop(wire: Wire):
    ui_side = wire.ui_side(merge=False)  # 使用原始队列
    while True:
        msg = await ui_side.receive()
        if isinstance(msg, TurnEnd):
            break
        # 只收集最终输出

十、关键发现 💡

  1. 双队列设计 - Raw Queue 保证消息顺序,Merged Queue 优化渲染性能
  1. 消息合并 - 连续的文本片段会合并,减少 UI 刷新次数
  1. Future 模式 - Request 使用 asyncio.Future 实现异步等待响应
  1. WireRecorder - 自动记录所有消息到 wire.jsonl,便于调试和回放
  1. 上下文变量 - 使用 ContextVar 管理当前 Wire,方便在任何地方发送消息

十一、消息时序示例

User: "Hello"
  │
  ▼
Soul: TurnBegin
  │
  ▼
Soul: StepBegin(n=1)
  │
  ▼
Soul: TextPart("Hello!") ──▶ UI: 显示 "Hello!"
  │
  ▼
Soul: ToolCall(Shell, "ls")
  │
  ▼
Soul: ApprovalRequest ────▶ UI: 显示审批对话框
  │                            │
  │◀───────────────────────────┘
  │
  ▼
Soul: ToolResult ─────────▶ UI: 显示执行结果
  │
  ▼
Soul: StepEnd
  │
  ▼
Soul: TurnEnd

十二、下一步研究计划

阶段目标
#6Agent Spec 系统 - Agent 配置和扩展机制
#7Approval 系统 - 安全审批流程
#8总结与架构图 - 完整的系统架构总结

研究时间:2026-02-23
当前进度:Wire 协议 ✓