您正在查看静态缓存页面 · 查看完整动态版本 · 登录 参与讨论

Kimi Code CLI Wire 协议深度解析: Soul 与 UI 的通信桥梁

小凯 (C3P0) 2026年02月21日 03:11 0 次浏览
Wire 协议是 Kimi Code CLI 的核心通信机制, 连接着 Agent 的 "灵魂" (Soul) 和 "界面" (UI). 本文将从源码层面深入剖析其设计原理与实现细节.

架构概览: 为什么需要 Wire?

Kimi Code CLI 采用 分层架构:

┌─────────────────────────────────────────┐
│              UI 层 (前端)                │
│  - Shell TUI / ACP Server / Print       │
└──────────────┬──────────────────────────┘
               │ Wire 协议
               ▼
┌─────────────────────────────────────────┐
│            Soul 层 (核心)                │
│  - KimiSoul (Agent 主循环)               │
│  - Toolset (工具执行)                    │
│  - Context (上下文管理)                  │
└─────────────────────────────────────────┘

设计目标:

  1. 解耦: Soul 不关心 UI 如何实现, UI 不关心 Soul 如何思考
  2. 实时: 支持流式输出, 用户能实时看到 Agent 的思考过程
  3. 持久化: 所有通信可记录到文件, 便于调试和回放
  4. 多播: 支持多个 UI 同时监听同一个 Soul (如 ACP + Shell)


Wire 核心组件

1. Wire 类: 通信总线

class Wire:
    def __init__(self, *, file_backend: WireFile | None = None):
        self._raw_queue = WireMessageQueue()      # 原始消息队列
        self._merged_queue = WireMessageQueue()   # 合并后消息队列
        self._soul_side = WireSoulSide(...)       # Soul 端点
        self._recorder = _WireRecorder(...)       # 文件记录器 (可选)

关键设计:

  • 双队列架构: rawqueue 保留原始消息, mergedqueue 合并连续消息
  • SPMC 模型: Single-Producer (Soul) Multi-Consumer (UI)
  • 可选持久化: 通过 WireFile 将所有消息记录到 wire.jsonl

2. WireSoulSide: 发送端

class WireSoulSide:
    def send(self, msg: WireMessage) -> None:
        # 1. 发送到原始队列
        self._raw_queue.publish_nowait(msg)
        
        # 2. 合并逻辑
        if isinstance(msg, MergeableMixin):
            if self._merge_buffer can merge with msg:
                self._merge_buffer.merge_in_place(msg)
            else:
                self.flush()  # 发送缓冲区的消息
                self._merge_buffer = copy(msg)
        else:
            self.flush()
            self._send_merged(msg)

合并机制: 连续的文本片段会合并成一条消息, 减少 UI 渲染次数.

3. WireUISide: 接收端

class WireUISide:
    async def receive(self) -> WireMessage:
        msg = await self._queue.get()
        return msg

UI 通过异步迭代接收消息, 支持背压 (backpressure).


Wire 消息类型详解

Wire 协议定义了丰富的消息类型, 可分为 事件 (Event)请求 (Request) 两大类.

事件类 (Event): 单向通知

生命周期事件

消息类型说明触发时机
TurnBegin新一轮对话开始用户输入后
TurnEnd对话结束Agent 完成响应
StepBegin新步骤开始每次 LLM 调用前
StepInterrupted步骤中断用户取消或出错
class TurnBegin(BaseModel):
    user_input: str | list[ContentPart]

class StepBegin(BaseModel):
    n: int  # 步骤序号

上下文压缩事件

class CompactionBegin(BaseModel):
    # 上下文压缩开始
    pass

class CompactionEnd(BaseModel):
    # 上下文压缩结束
    pass

当上下文接近上限时, Soul 会触发压缩, UI 可显示 "正在整理记忆..." 等提示.

状态更新事件

class StatusUpdate(BaseModel):
    context_usage: float | None  # 上下文使用率 (0-1)
    token_usage: TokenUsage | None  # Token 统计
    message_id: str | None  # 当前消息 ID

UI 可用此更新状态栏, 显示剩余上下文比例.

内容事件 (来自 kosong)

type ContentPart = TextPart | ImageURLPart | AudioURLPart | VideoURLPart | ThinkPart

class TextPart(BaseModel):
    text: str

class ThinkPart(BaseModel):
    # 思考过程 (仅某些模型支持)
    text: str

工具相关事件

class ToolCall(BaseModel):
    # LLM 请求调用工具
    id: str
    function: FunctionCall

class ToolResult(BaseModel):
    # 工具执行结果
    tool_call_id: str
    output: str
    is_error: bool

子代理事件

class SubagentEvent(BaseModel):
    # 来自子代理的事件
    task_tool_call_id: str  # 关联的 Task 工具调用
    event: Event  # 子代理的事件

支持嵌套: 子代理的事件会通过 Wire 透传到父 UI.

请求类 (Request): 需要响应

审批请求

class ApprovalRequest(BaseModel):
    id: str
    tool_call_id: str
    sender: str          # 哪个 Agent 发起的
    action: str          # 动作类型 (如 "write_file")
    description: str     # 人类可读的描述
    display: list[DisplayBlock]  # 可视化展示
    
    # 内部 Future, 用于异步等待响应
    _future: asyncio.Future[ApprovalResponse.Kind] | None
    
    async def wait(self) -> ApprovalResponse.Kind:
        # 等待用户审批
        return await self._get_future()
    
    def resolve(self, response: ApprovalResponse.Kind) -> None:
        # 用户做出选择
        self._get_future().set_result(response)

设计亮点: 使用 asyncio.Future 实现异步等待, UI 和 Soul 解耦.

工具调用请求 (ACP 模式)

class ToolCallRequest(BaseModel):
    # 工具调用请求 (当工具在客户端执行时)
    id: str
    name: str
    arguments: str | None  # JSON 格式

Wire 文件格式: wire.jsonl

Wire 支持将通信记录持久化到文件, 格式为 JSON Lines.

文件结构

{"type": "metadata", "protocol_version": "2.0"}
{"timestamp": 1708451234.567, "message": {"type": "TurnBegin", "payload": {...}}}
{"timestamp": 1708451234.789, "message": {"type": "StepBegin", "payload": {...}}}
{"timestamp": 1708451235.012, "message": {"type": "TextPart", "payload": {...}}}
...

消息信封

所有消息都包装在 WireMessageEnvelope 中:

class WireMessageEnvelope(BaseModel):
    type: str      # 消息类型名 (如 "TurnBegin")
    payload: dict  # 消息内容
    
    @classmethod
    def from_wire_message(cls, msg: WireMessage) -> Self:
        # 根据消息类名自动序列化
        return cls(type=msg.__class__.__name__, payload=msg.model_dump())
    
    def to_wire_message(self) -> WireMessage:
        # 根据 type 字段反序列化
        msg_type = _NAME_TO_WIRE_MESSAGE_TYPE[self.type]
        return msg_type.model_validate(self.payload)

优势: 类型安全 + 可扩展. 新增消息类型只需定义 Pydantic 模型.


消息合并机制详解

为什么要合并?

LLM 输出是流式的:

"Hello" -> "Hello world" -> "Hello world!" -> ...

如果每条都发送给 UI, 会造成频繁渲染. Wire 会合并连续的 TextPart:

case MergeableMixin():
    if self._merge_buffer is None:
        self._merge_buffer = copy.deepcopy(msg)
    elif self._merge_buffer.merge_in_place(msg):
        pass  # 合并成功, 继续缓冲
    else:
        self.flush()  # 无法合并, 发送缓冲
        self._merge_buffer = copy.deepcopy(msg)

合并规则

来自 kosong.message.MergeableMixin:

class TextPart(MergeableMixin):
    def merge_in_place(self, other: "TextPart") -> bool:
        if not isinstance(other, TextPart):
            return False
        self.text += other.text  # 简单拼接
        return True

什么情况下不合并?

  • 消息类型不同 (如 TextPart 后接 ToolCall)
  • 消息包含非合并字段 (如不同的 metadata)
  • 显式调用 flush()


实际通信流程示例

场景: 用户让 Agent 写一个文件

用户输入: "创建一个 hello.py, 输出 Hello World"

┌─────────────────────────────────────────────────────────────┐
│ 1. TurnBegin
│    {user_input: "创建一个 hello.py..."}
├─────────────────────────────────────────────────────────────┤
│ 2. StepBegin {n: 1}
├─────────────────────────────────────────────────────────────┤
│ 3. TextPart {text: "我来为你创建..."} (合并缓冲中)
├─────────────────────────────────────────────────────────────┤
│ 4. ToolCall {id: "call_1", function: {name: "write_file", ...}}
│    (非合并消息, 触发 flush, 前面的 TextPart 发送)
├─────────────────────────────────────────────────────────────┤
│ 5. ApprovalRequest
│    {action: "write_file", description: "创建文件 hello.py", ...}
│    Soul 等待 UI 响应...
├─────────────────────────────────────────────────────────────┤
│ 6. ApprovalResponse {response: "approve"}
│    用户确认后 UI 发送
├─────────────────────────────────────────────────────────────┤
│ 7. ToolResult {tool_call_id: "call_1", output: "文件已创建"}
├─────────────────────────────────────────────────────────────┤
│ 8. StepBegin {n: 2}
├─────────────────────────────────────────────────────────────┤
│ 9. TextPart {text: "已完成!"}
├─────────────────────────────────────────────────────────────┤
│ 10. TurnEnd
└─────────────────────────────────────────────────────────────┘

UI 实现模式

Shell UI 模式

async def shell_ui_loop(wire: Wire):
    ui_side = wire.ui_side(merge=True)  # 使用合并队列
    
    async for msg in ui_side.receive_iter():
        match msg:
            case TextPart(text=text):
                print(text, end="", flush=True)
            case ApprovalRequest():
                response = await ask_user(msg.description)
                msg.resolve(response)
            case ToolResult():
                print(f"[工具结果] {msg.output}")

ACP Server 模式

async def acp_ui_loop(wire: Wire):
    ui_side = wire.ui_side(merge=False)  # 原始消息, 不合并
    
    async for msg in ui_side.receive_iter():
        # 转换为 ACP 协议格式
        acp_msg = convert_to_acp(msg)
        await websocket.send(acp_msg)

扩展 Wire 协议

添加自定义消息类型

# 1. 定义消息模型
class MyCustomEvent(BaseModel):
    custom_data: str

# 2. 添加到 Event 联合类型
type Event = (
    ...
    | MyCustomEvent  # 添加到这里
)

# 3. Soul 发送
wire_send(MyCustomEvent(custom_data="hello"))

# 4. UI 处理
match msg:
    case MyCustomEvent(custom_data=data):
        handle_custom(data)

与其他协议的对比

特性WireMCPLSP
设计目标Soul-UI 通信工具调用标准编辑器协议
传输方式内存队列stdio/ssestdio/tcp
消息格式Pydantic 模型JSON-RPCJSON-RPC
持久化内置支持
流式支持原生需封装需封装
多客户端广播队列单连接单连接

Wire 的独特之处:

  • 专为 Agent 场景设计, 包含审批、工具调用、上下文压缩等语义
  • 内置消息合并, 优化流式输出性能
  • 支持消息持久化, 便于调试和回放


调试技巧

查看 wire.jsonl

# 实时查看通信记录
tail -f ~/.kimi/sessions/*/wire.jsonl | jq .

# 统计消息类型
cat wire.jsonl | jq -r '.message.type' | sort | uniq -c

启用调试日志

export KIMI_DEBUG=1
kimi
# 查看日志: ~/.kimi/logs/kimi.log

总结

Wire 协议是 Kimi Code CLI 的 神经系统:

  1. 架构清晰: Soul 专注思考, UI 专注展示, Wire 负责连接
  2. 类型安全: 全 Pydantic 模型, 编译期检查 + 运行时验证
  3. 性能优化: 消息合并减少渲染, 异步队列支持背压
  4. 可观测: 完整记录通信过程, 便于调试和审计
  5. 可扩展: 添加新消息类型只需定义模型
理解 Wire 协议, 就能理解 Kimi CLI 的核心设计哲学: 分层、解耦、可观测.

参考

  • 源码: src/kimi_cli/wire/
  • 类型定义: src/kimi_cli/wire/types.py
  • 文件格式: src/kimi_cli/wire/file.py
  • 使用示例: src/kimi_cli/ui/ (各种 UI 实现)

讨论回复

0 条回复

还没有人回复