想象一下,如果你的大脑和嘴巴之间没有神经系统连接,会发生什么?你想说"你好",但嘴巴却接不到信号;你想思考,但思维无法传达给外界。这听起来很荒谬,对吧?但在软件世界里,这种"神经断裂"的问题却比比皆是。
在传统的命令行工具中,逻辑和界面往往是紧紧耦合在一起的。就像一个人说话时,大脑直接控制声带振动,中间没有任何缓冲。这种设计简单直接,但有一个致命缺陷:你无法让大脑和界面独立进化。
想象一下这样的场景:
Kimi Code CLI 的开发者们面临的就是这样一个问题。他们的解决方案,就是今天要讲的 Wire 协议。
Wire 协议是 Kimi Code CLI 的"神经系统"——它负责在"大脑"(Soul,核心逻辑)和"感官/肢体"(UI,各种界面)之间传递信号。
这个比喻很重要。就像人类的神经系统可以连接大脑和各种器官(眼睛、手、脚),Wire 协议也可以连接 Soul 和各种 UI(终端、IDE 插件、Web 界面)。
让我们像解剖学家一样,一层层剥开 Wire 协议的外壳。
首先,神经系统需要一种"语言"。Wire 协议定义了一套完整的消息类型,位于 wire/types.py。
这些消息可以分为三大类:
1. 事件(Event)—— 单向通知
就像你的视觉神经告诉大脑"看到一只猫",事件是 Soul 向 UI 发送的单向消息:
TurnBegin / TurnEnd:一轮对话的开始和结束StepBegin / StepInterrupted:一个思考步骤的开始和中断ContentPart:AI 生成的内容片段(文字、思考过程)ToolCall / ToolResult:工具调用及其结果StatusUpdate:状态更新(比如上下文使用量)就像大脑问手"能摸到那个杯子吗?",请求是 Soul 向 UI 发出的、需要等待回应的消息:
ApprovalRequest:"我要执行这个操作,你同意吗?"ToolCallRequest:"请帮我执行这个外部工具"QuestionRequest:"我有几个问题要问你"ApprovalResponse:用户的批准/拒绝决定QuestionResponse:用户对问题的回答有了语言,还需要"神经纤维"来传递信号。这就是 Wire 类(在 wire/__init__.py 中)。
class Wire:
"""
A spmc channel for communication between the soul and the UI during a soul run.
"""
注意注释中的 spmc——Single Producer, Multiple Consumer(单生产者多消费者)。这是一个关键设计:
WireRecorder),就能完整记录整个交互过程。
Wire 内部使用了广播队列(BroadcastQueue),确保每个消费者都能收到完整的消息流。
消息需要在不同进程、甚至不同机器之间传递,这就涉及序列化。Wire 协议使用了两种主要的传输格式:
1. 内部传输:Python 对象
在单个进程内部,消息就是 Python 对象,通过 asyncio 队列传递,效率最高。
2. 外部传输:JSON-RPC
当 Soul 和 UI 运行在不同进程(比如 VS Code 插件通过 stdio 启动 Kimi CLI)时,使用 JSON-RPC 2.0 协议封装消息。这在 wire/jsonrpc.py 和 wire/server.py 中实现。
# JSON-RPC 消息示例
{
"jsonrpc": "2.0",
"method": "event",
"params": {
"type": "TextPart",
"payload": {"text": "Hello, world!"}
}
}
理解了 Wire 协议的结构,我们来聊聊它背后的设计思想。
这是 Wire 协议最核心的目标。
Soul(KimiSoul 类)只关心一件事:如何与 LLM 交互、如何调用工具、如何管理上下文。它完全不知道消息最终会被谁消费——是终端、是 IDE、还是 Web 界面。
UI(Shell、visualize 等)只关心一件事:如何把消息呈现给用户。它完全不知道消息是怎么产生的——是 AI 生成的、是工具返回的、还是从文件回放出来的。
这种解耦带来了巨大的灵活性:
AI 生成内容是一个流式过程——字是一个个蹦出来的,不是一次性出现的。Wire 协议完全支持这种流式传输:
TextPart 消息可以源源不断地发送因为所有交互都通过 Wire 消息表达,录制就等于保存消息日志。
WireFile 类(wire/file.py)实现了消息的持久化:
# wire.jsonl 文件示例
{"type": "metadata", "protocol_version": "1.3"}
{"timestamp": 1709123456.789, "message": {"type": "TurnBegin", "payload": {...}}}
{"timestamp": 1709123456.790, "message": {"type": "TextPart", "payload": {...}}}
...
这带来了几个神奇的能力:
Wire 协议有一个精妙的优化:消息合并。
AI 生成文字时,可能会产生大量的 TextPart 消息(每个 token 一个消息)。如果 UI 每次都重新渲染,性能会很差。
WireSoulSide 使用了一个合并缓冲区(_merge_buffer),连续的内容消息会被合并成一条,减少 UI 的刷新次数。
这就像神经系统不会把每一个神经冲动都报告给大脑,而是会做一些预处理和聚合。
让我们追踪一条消息从产生到消费的完整旅程。
Step 1: Soul 产生消息
# 在 KimiSoul.run() 中
wire_send(TurnBegin(user_input="你好"))
wire_send 是一个全局函数,它从上下文变量中获取当前的 Wire 实例,然后发送消息。
Step 2: Wire 内部处理
# WireSoulSide.send()
self._raw_queue.publish_nowait(msg) # 发送原始消息
# 合并逻辑...
self._merged_queue.publish_nowait(merged_msg) # 发送合并后的消息
消息被同时发送到原始队列(用于录制)和合并队列(用于 UI 显示)。
Step 3: 录制(可选)
如果配置了 WireFile,_WireRecorder 会把消息追加到文件:
async def _record(self, msg: WireMessage) -> None:
await self._wire_file.append_message(msg)
Step 4: UI 消费消息
# 在 visualize() 中
msg = await wire.receive()
Shell UI 的 visualize 函数是一个无限循环,不断从 Wire 接收消息并渲染。
Step 5: 渲染
match msg:
case TurnBegin():
self.flush_content()
case TextPart():
self.append_content(msg)
# ...
使用 Python 3.10+ 的模式匹配,不同类型的消息触发不同的渲染逻辑。
Kimi Code CLI 不仅是一个命令行工具,还可以作为 ACP(Agent Communication Protocol)服务器运行,供 IDE 调用。
这就是 WireServer 类(wire/server.py)的职责。
initialize 消息,协商协议版本和能力prompt 消息,触发一轮对话event 方法向 IDE 推送消息流request 方法,IDE 返回响应steer 消息,实时干预cancel 消息中断当前对话回顾 Wire 协议的设计,我们可以学到几个重要的软件架构原则:
在写代码之前,先定义清晰的协议。Wire 协议的消息类型就是 Soul 和 UI 之间的"契约"。一旦协议稳定,两边可以独立开发、独立测试。
Soul → Wire → UI,数据流向清晰。请求虽然需要响应,但也是通过 Wire 发送响应消息,不破坏单向流的简洁性。
AI 应用天然是异步和流式的。Wire 协议基于 asyncio,消息可以源源不断地流动,不需要等待完整结果。
所有的交互都通过消息表达,天然可记录、可回放、可调试。这是构建可靠 AI 系统的关键。
通过 JSON-RPC 封装,Wire 协议可以跨越进程边界,支持本地 CLI、远程服务器、IDE 插件等多种部署形态。
Wire 协议的名字很形象——它就像一根导线,连接了 AI 的"大脑"和"世界"。
但这根导线不是简单的管道,它是一个精心设计的神经系统:
本文基于 Kimi Code CLI 开源代码分析撰写。如果你对 Wire 协议感兴趣,欢迎阅读源码:src/kimi_cli/wire/ 目录下有完整的实现。
#Kimi #Wire #Agent #Cli #架构
还没有人回复