> Wire 协议是 Kimi Code CLI 的核心通信机制, 连接着 Agent 的 "灵魂" (Soul) 和 "界面" (UI). 本文将从源码层面深入剖析其设计原理与实现细节.
---
## 架构概览: 为什么需要 Wire?
Kimi Code CLI 采用 **分层架构**:
```
┌─────────────────────────────────────────┐
│ UI 层 (前端) │
│ - Shell TUI / ACP Server / Print │
└──────────────┬──────────────────────────┘
│ Wire 协议
▼
┌─────────────────────────────────────────┐
│ Soul 层 (核心) │
│ - KimiSoul (Agent 主循环) │
│ - Toolset (工具执行) │
│ - Context (上下文管理) │
└─────────────────────────────────────────┘
```
**设计目标**:
1. **解耦**: Soul 不关心 UI 如何实现, UI 不关心 Soul 如何思考
2. **实时**: 支持流式输出, 用户能实时看到 Agent 的思考过程
3. **持久化**: 所有通信可记录到文件, 便于调试和回放
4. **多播**: 支持多个 UI 同时监听同一个 Soul (如 ACP + Shell)
---
## Wire 核心组件
### 1. Wire 类: 通信总线
```python
class Wire:
def __init__(self, *, file_backend: WireFile | None = None):
self._raw_queue = WireMessageQueue() # 原始消息队列
self._merged_queue = WireMessageQueue() # 合并后消息队列
self._soul_side = WireSoulSide(...) # Soul 端点
self._recorder = _WireRecorder(...) # 文件记录器 (可选)
```
**关键设计**:
- **双队列架构**: raw_queue 保留原始消息, merged_queue 合并连续消息
- **SPMC 模型**: Single-Producer (Soul) Multi-Consumer (UI)
- **可选持久化**: 通过 WireFile 将所有消息记录到 `wire.jsonl`
### 2. WireSoulSide: 发送端
```python
class WireSoulSide:
def send(self, msg: WireMessage) -> None:
# 1. 发送到原始队列
self._raw_queue.publish_nowait(msg)
# 2. 合并逻辑
if isinstance(msg, MergeableMixin):
if self._merge_buffer can merge with msg:
self._merge_buffer.merge_in_place(msg)
else:
self.flush() # 发送缓冲区的消息
self._merge_buffer = copy(msg)
else:
self.flush()
self._send_merged(msg)
```
**合并机制**: 连续的文本片段会合并成一条消息, 减少 UI 渲染次数.
### 3. WireUISide: 接收端
```python
class WireUISide:
async def receive(self) -> WireMessage:
msg = await self._queue.get()
return msg
```
UI 通过异步迭代接收消息, 支持背压 (backpressure).
---
## Wire 消息类型详解
Wire 协议定义了丰富的消息类型, 可分为 **事件 (Event)** 和 **请求 (Request)** 两大类.
### 事件类 (Event): 单向通知
#### 生命周期事件
| 消息类型 | 说明 | 触发时机 |
|---------|------|---------|
| `TurnBegin` | 新一轮对话开始 | 用户输入后 |
| `TurnEnd` | 对话结束 | Agent 完成响应 |
| `StepBegin` | 新步骤开始 | 每次 LLM 调用前 |
| `StepInterrupted` | 步骤中断 | 用户取消或出错 |
```python
class TurnBegin(BaseModel):
user_input: str | list[ContentPart]
class StepBegin(BaseModel):
n: int # 步骤序号
```
#### 上下文压缩事件
```python
class CompactionBegin(BaseModel):
# 上下文压缩开始
pass
class CompactionEnd(BaseModel):
# 上下文压缩结束
pass
```
当上下文接近上限时, Soul 会触发压缩, UI 可显示 "正在整理记忆..." 等提示.
#### 状态更新事件
```python
class StatusUpdate(BaseModel):
context_usage: float | None # 上下文使用率 (0-1)
token_usage: TokenUsage | None # Token 统计
message_id: str | None # 当前消息 ID
```
UI 可用此更新状态栏, 显示剩余上下文比例.
#### 内容事件 (来自 kosong)
```python
type ContentPart = TextPart | ImageURLPart | AudioURLPart | VideoURLPart | ThinkPart
class TextPart(BaseModel):
text: str
class ThinkPart(BaseModel):
# 思考过程 (仅某些模型支持)
text: str
```
#### 工具相关事件
```python
class ToolCall(BaseModel):
# LLM 请求调用工具
id: str
function: FunctionCall
class ToolResult(BaseModel):
# 工具执行结果
tool_call_id: str
output: str
is_error: bool
```
#### 子代理事件
```python
class SubagentEvent(BaseModel):
# 来自子代理的事件
task_tool_call_id: str # 关联的 Task 工具调用
event: Event # 子代理的事件
```
支持嵌套: 子代理的事件会通过 Wire 透传到父 UI.
### 请求类 (Request): 需要响应
#### 审批请求
```python
class ApprovalRequest(BaseModel):
id: str
tool_call_id: str
sender: str # 哪个 Agent 发起的
action: str # 动作类型 (如 "write_file")
description: str # 人类可读的描述
display: list[DisplayBlock] # 可视化展示
# 内部 Future, 用于异步等待响应
_future: asyncio.Future[ApprovalResponse.Kind] | None
async def wait(self) -> ApprovalResponse.Kind:
# 等待用户审批
return await self._get_future()
def resolve(self, response: ApprovalResponse.Kind) -> None:
# 用户做出选择
self._get_future().set_result(response)
```
**设计亮点**: 使用 asyncio.Future 实现异步等待, UI 和 Soul 解耦.
#### 工具调用请求 (ACP 模式)
```python
class ToolCallRequest(BaseModel):
# 工具调用请求 (当工具在客户端执行时)
id: str
name: str
arguments: str | None # JSON 格式
```
---
## Wire 文件格式: wire.jsonl
Wire 支持将通信记录持久化到文件, 格式为 **JSON Lines**.
### 文件结构
```jsonl
{"type": "metadata", "protocol_version": "2.0"}
{"timestamp": 1708451234.567, "message": {"type": "TurnBegin", "payload": {...}}}
{"timestamp": 1708451234.789, "message": {"type": "StepBegin", "payload": {...}}}
{"timestamp": 1708451235.012, "message": {"type": "TextPart", "payload": {...}}}
...
```
### 消息信封
所有消息都包装在 `WireMessageEnvelope` 中:
```python
class WireMessageEnvelope(BaseModel):
type: str # 消息类型名 (如 "TurnBegin")
payload: dict # 消息内容
@classmethod
def from_wire_message(cls, msg: WireMessage) -> Self:
# 根据消息类名自动序列化
return cls(type=msg.__class__.__name__, payload=msg.model_dump())
def to_wire_message(self) -> WireMessage:
# 根据 type 字段反序列化
msg_type = _NAME_TO_WIRE_MESSAGE_TYPE[self.type]
return msg_type.model_validate(self.payload)
```
**优势**: 类型安全 + 可扩展. 新增消息类型只需定义 Pydantic 模型.
---
## 消息合并机制详解
### 为什么要合并?
LLM 输出是流式的:
```
"Hello" -> "Hello world" -> "Hello world!" -> ...
```
如果每条都发送给 UI, 会造成频繁渲染. Wire 会合并连续的 `TextPart`:
```python
case MergeableMixin():
if self._merge_buffer is None:
self._merge_buffer = copy.deepcopy(msg)
elif self._merge_buffer.merge_in_place(msg):
pass # 合并成功, 继续缓冲
else:
self.flush() # 无法合并, 发送缓冲
self._merge_buffer = copy.deepcopy(msg)
```
### 合并规则
来自 `kosong.message.MergeableMixin`:
```python
class TextPart(MergeableMixin):
def merge_in_place(self, other: "TextPart") -> bool:
if not isinstance(other, TextPart):
return False
self.text += other.text # 简单拼接
return True
```
**什么情况下不合并?**
- 消息类型不同 (如 TextPart 后接 ToolCall)
- 消息包含非合并字段 (如不同的 metadata)
- 显式调用 `flush()`
---
## 实际通信流程示例
### 场景: 用户让 Agent 写一个文件
```
用户输入: "创建一个 hello.py, 输出 Hello World"
┌─────────────────────────────────────────────────────────────┐
│ 1. TurnBegin
│ {user_input: "创建一个 hello.py..."}
├─────────────────────────────────────────────────────────────┤
│ 2. StepBegin {n: 1}
├─────────────────────────────────────────────────────────────┤
│ 3. TextPart {text: "我来为你创建..."} (合并缓冲中)
├─────────────────────────────────────────────────────────────┤
│ 4. ToolCall {id: "call_1", function: {name: "write_file", ...}}
│ (非合并消息, 触发 flush, 前面的 TextPart 发送)
├─────────────────────────────────────────────────────────────┤
│ 5. ApprovalRequest
│ {action: "write_file", description: "创建文件 hello.py", ...}
│ Soul 等待 UI 响应...
├─────────────────────────────────────────────────────────────┤
│ 6. ApprovalResponse {response: "approve"}
│ 用户确认后 UI 发送
├─────────────────────────────────────────────────────────────┤
│ 7. ToolResult {tool_call_id: "call_1", output: "文件已创建"}
├─────────────────────────────────────────────────────────────┤
│ 8. StepBegin {n: 2}
├─────────────────────────────────────────────────────────────┤
│ 9. TextPart {text: "已完成!"}
├─────────────────────────────────────────────────────────────┤
│ 10. TurnEnd
└─────────────────────────────────────────────────────────────┘
```
---
## UI 实现模式
### Shell UI 模式
```python
async def shell_ui_loop(wire: Wire):
ui_side = wire.ui_side(merge=True) # 使用合并队列
async for msg in ui_side.receive_iter():
match msg:
case TextPart(text=text):
print(text, end="", flush=True)
case ApprovalRequest():
response = await ask_user(msg.description)
msg.resolve(response)
case ToolResult():
print(f"[工具结果] {msg.output}")
```
### ACP Server 模式
```python
async def acp_ui_loop(wire: Wire):
ui_side = wire.ui_side(merge=False) # 原始消息, 不合并
async for msg in ui_side.receive_iter():
# 转换为 ACP 协议格式
acp_msg = convert_to_acp(msg)
await websocket.send(acp_msg)
```
---
## 扩展 Wire 协议
### 添加自定义消息类型
```python
# 1. 定义消息模型
class MyCustomEvent(BaseModel):
custom_data: str
# 2. 添加到 Event 联合类型
type Event = (
...
| MyCustomEvent # 添加到这里
)
# 3. Soul 发送
wire_send(MyCustomEvent(custom_data="hello"))
# 4. UI 处理
match msg:
case MyCustomEvent(custom_data=data):
handle_custom(data)
```
---
## 与其他协议的对比
| 特性 | Wire | MCP | LSP |
|------|------|-----|-----|
| 设计目标 | Soul-UI 通信 | 工具调用标准 | 编辑器协议 |
| 传输方式 | 内存队列 | stdio/sse | stdio/tcp |
| 消息格式 | Pydantic 模型 | JSON-RPC | JSON-RPC |
| 持久化 | 内置支持 | 无 | 无 |
| 流式支持 | 原生 | 需封装 | 需封装 |
| 多客户端 | 广播队列 | 单连接 | 单连接 |
**Wire 的独特之处**:
- 专为 Agent 场景设计, 包含审批、工具调用、上下文压缩等语义
- 内置消息合并, 优化流式输出性能
- 支持消息持久化, 便于调试和回放
---
## 调试技巧
### 查看 wire.jsonl
```bash
# 实时查看通信记录
tail -f ~/.kimi/sessions/*/wire.jsonl | jq .
# 统计消息类型
cat wire.jsonl | jq -r '.message.type' | sort | uniq -c
```
### 启用调试日志
```bash
export KIMI_DEBUG=1
kimi
# 查看日志: ~/.kimi/logs/kimi.log
```
---
## 总结
Wire 协议是 Kimi Code CLI 的 **神经系统**:
1. **架构清晰**: Soul 专注思考, UI 专注展示, Wire 负责连接
2. **类型安全**: 全 Pydantic 模型, 编译期检查 + 运行时验证
3. **性能优化**: 消息合并减少渲染, 异步队列支持背压
4. **可观测**: 完整记录通信过程, 便于调试和审计
5. **可扩展**: 添加新消息类型只需定义模型
理解 Wire 协议, 就能理解 Kimi CLI 的核心设计哲学: **分层、解耦、可观测**.
---
## 参考
- 源码: `src/kimi_cli/wire/`
- 类型定义: `src/kimi_cli/wire/types.py`
- 文件格式: `src/kimi_cli/wire/file.py`
- 使用示例: `src/kimi_cli/ui/` (各种 UI 实现)
登录后可参与表态
讨论回复
0 条回复还没有人回复,快来发表你的看法吧!