# 使用 Streamable-HTTP 实现 IM 系统
> 完全可行的方案,特别适合需要高兼容性、无状态部署的场景
---
## 一、架构设计
### 1.1 核心思路
Streamable-HTTP 实现 IM 的关键:**长轮询(Long Polling)+ 流式推送**
```
┌─────────────────────────────────────────────────────────────┐
│ IM 系统架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 客户端 A 服务器 │
│ │ │ │
│ │── POST /im/send ─────────→│ 发送消息 │
│ │ {to: B, content: "hi"} │ │
│ │ │ │
│ │←── 202 Accepted ───────────│ │
│ │ │ │
│ │ │── 写入消息队列 ───→ [Redis]
│ │ │ │
│ │◄════ 长连接接收流 ════════│ │
│ │ │◄── 读取消息队列 ─── [Redis]
│ │ │ │
│ 客户端 B 服务器 │
│ │◄════ 长连接接收流 ════════│ │
│ │←── {from: A, content...} ──│ 推送消息 │
│ │ │ │
└─────────────────────────────────────────────────────────────┘
```
### 1.2 两个核心端点
| 端点 | 方法 | 用途 |
|------|------|------|
| `/im/stream` | GET | 建立接收流,服务器推送消息 |
| `/im/send` | POST | 发送消息到指定用户 |
---
## 二、完整实现代码
### 2.1 后端 (PHP + Redis)
```php
<?php
// config.php
define('REDIS_HOST', '127.0.0.1');
define('REDIS_PORT', 6379);
define('STREAM_TIMEOUT', 60); // 长连接保持 60 秒
// 获取 Redis 连接
function getRedis() {
$redis = new Redis();
$redis->connect(REDIS_HOST, REDIS_PORT);
return $redis;
}
// 获取当前用户 ID(通过 Token)
function getCurrentUserId() {
$headers = getallheaders();
$token = $headers['Authorization'] ?? '';
// 实际应验证 JWT,这里简化
return verifyToken($token);
}
?>
```
```php
<?php
// stream.php - 接收消息流
require 'config.php';
$userId = getCurrentUserId();
if (!$userId) {
http_response_code(401);
exit(json_encode(['error' => 'Unauthorized']));
}
// 设置流式响应头
header('Content-Type: application/x-ndjson');
header('Transfer-Encoding: chunked');
header('X-Accel-Buffering: no');
header('Cache-Control: no-cache');
// 关闭缓冲
ob_implicit_flush(true);
if (ob_get_level()) ob_end_flush();
$redis = getRedis();
$lastMessageId = $_GET['last_id'] ?? 0;
$startTime = time();
// 发送连接成功消息
echo json_encode(['type' => 'connected', 'user_id' => $userId]) . "\n";
flush();
while (true) {
// 检查超时
if (time() - $startTime > STREAM_TIMEOUT) {
echo json_encode(['type' => 'ping']) . "\n";
flush();
$startTime = time();
}
// 从 Redis 读取新消息
$messages = $redis->zRangeByScore(
"user:{$userId}:messages",
$lastMessageId + 1,
'+inf',
['limit' => [0, 10]]
);
if (!empty($messages)) {
foreach ($messages as $msg) {
$data = json_decode($msg, true);
echo json_encode(['type' => 'message', 'data' => $data]) . "\n";
$lastMessageId = max($lastMessageId, $data['id']);
}
flush();
}
// 休眠 100ms 避免 CPU 空转
usleep(100000);
}
?>
```
```php
<?php
// send.php - 发送消息
require 'config.php';
$userId = getCurrentUserId();
if (!$userId) {
http_response_code(401);
exit(json_encode(['error' => 'Unauthorized']));
}
$input = json_decode(file_get_contents('php://input'), true);
$toUserId = $input['to'] ?? '';
$content = $input['content'] ?? '';
$messageType = $input['type'] ?? 'text';
if (!$toUserId || !$content) {
http_response_code(400);
exit(json_encode(['error' => 'Missing to or content']));
}
$redis = getRedis();
$messageId = $redis->incr('global:message_id');
$message = [
'id' => $messageId,
'from' => $userId,
'to' => $toUserId,
'content' => $content,
'type' => $messageType,
'timestamp' => time()
];
// 存储到接收者的消息队列
$redis->zAdd("user:{$toUserId}:messages", $messageId, json_encode($message));
// 可选:存储到发送者的已发送(用于多端同步)
$redis->zAdd("user:{$userId}:sent", $messageId, json_encode($message));
// 设置过期时间(7 天)
$redis->expire("user:{$toUserId}:messages", 7 * 24 * 3600);
// 发布到 Pub/Sub(用于多服务器广播)
$redis->publish("channel:{$toUserId}", json_encode($message));
// 存储到持久化数据库(异步)
// queueForPersistence($message);
header('Content-Type: application/json');
echo json_encode([
'success' => true,
'message_id' => $messageId,
'timestamp' => $message['timestamp']
]);
?>
```
```php
<?php
// history.php - 获取历史消息
require 'config.php';
$userId = getCurrentUserId();
$peerId = $_GET['peer'] ?? '';
$beforeId = $_GET['before'] ?? '+inf';
$limit = min((int)($_GET['limit'] ?? 50), 100);
$redis = getRedis();
// 获取与某用户的聊天记录
$sent = $redis->zRevRangeByScore(
"user:{$userId}:sent",
$beforeId,
0,
['limit' => [0, $limit]]
);
$received = $redis->zRevRangeByScore(
"user:{$userId}:messages",
$beforeId,
0,
['limit' => [0, $limit]]
);
// 合并并按时间排序
$messages = [];
foreach (array_merge($sent, $received) as $msg) {
$data = json_decode($msg, true);
if ($data['from'] == $peerId || $data['to'] == $peerId) {
$messages[] = $data;
}
}
usort($messages, fn($a, $b) => $a['id'] - $b['id']);
header('Content-Type: application/json');
echo json_encode([
'messages' => array_slice($messages, 0, $limit),
'has_more' => count($messages) > $limit
]);
?>
```
### 2.2 前端 (JavaScript)
```javascript
class StreamableHTTPIM {
constructor(baseUrl, token) {
this.baseUrl = baseUrl;
this.token = token;
this.messageCallbacks = [];
this.connected = false;
this.reconnectDelay = 1000;
this.lastMessageId = 0;
}
// 连接到消息流
async connect() {
try {
const response = await fetch(`${this.baseUrl}/im/stream?last_id=${this.lastMessageId}`, {
headers: {
'Authorization': `Bearer ${this.token}`
}
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
}
this.connected = true;
this.reconnectDelay = 1000; // 重置重连延迟
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (this.connected) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop(); // 保留未完成行
for (const line of lines) {
if (line.trim()) {
this.handleMessage(line);
}
}
}
} catch (error) {
console.error('连接错误:', error);
} finally {
this.connected = false;
this.scheduleReconnect();
}
}
// 处理接收到的消息
handleMessage(line) {
try {
const msg = JSON.parse(line);
switch (msg.type) {
case 'connected':
console.log('已连接,用户ID:', msg.user_id);
break;
case 'message':
this.lastMessageId = Math.max(this.lastMessageId, msg.data.id);
this.messageCallbacks.forEach(cb => cb(msg.data));
break;
case 'ping':
// 心跳,保持连接
break;
}
} catch (e) {
console.error('解析消息失败:', e);
}
}
// 发送消息
async send(to, content, type = 'text') {
const response = await fetch(`${this.baseUrl}/im/send`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.token}`
},
body: JSON.stringify({ to, content, type })
});
if (!response.ok) {
throw new Error(`发送失败: ${response.status}`);
}
return await response.json();
}
// 获取历史消息
async getHistory(peerId, options = {}) {
const params = new URLSearchParams({ peer: peerId, ...options });
const response = await fetch(`${this.baseUrl}/im/history?${params}`, {
headers: { 'Authorization': `Bearer ${this.token}` }
});
return await response.json();
}
// 监听消息
onMessage(callback) {
this.messageCallbacks.push(callback);
}
// 断开连接
disconnect() {
this.connected = false;
}
// 重连
scheduleReconnect() {
console.log(`${this.reconnectDelay}ms 后重连...`);
setTimeout(() => this.connect(), this.reconnectDelay);
this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
}
}
// 使用示例
const im = new StreamableHTTPIM('https://api.example.com', 'your-jwt-token');
// 连接并监听消息
im.onMessage((msg) => {
console.log(`收到来自 ${msg.from} 的消息:`, msg.content);
// 更新 UI
});
im.connect();
// 发送消息
async function sendMessage() {
await im.send('user-123', '你好!这是 Streamable-HTTP IM');
}
// 获取历史消息
async function loadHistory() {
const history = await im.getHistory('user-123', { limit: 20 });
console.log('历史消息:', history.messages);
}
```
---
## 三、群聊实现
```php
<?php
// send_group.php - 发送群消息
require 'config.php';
$userId = getCurrentUserId();
$input = json_decode(file_get_contents('php://input'), true);
$groupId = $input['group'] ?? '';
$content = $input['content'] ?? '';
$redis = getRedis();
$messageId = $redis->incr('global:message_id');
$message = [
'id' => $messageId,
'from' => $userId,
'group' => $groupId,
'content' => $content,
'timestamp' => time()
];
// 获取群成员
$members = $redis->sMembers("group:{$groupId}:members");
foreach ($members as $memberId) {
if ($memberId != $userId) {
$redis->zAdd("user:{$memberId}:messages", $messageId, json_encode($message));
$redis->publish("channel:{$memberId}", json_encode($message));
}
}
echo json_encode(['success' => true, 'message_id' => $messageId]);
?>
```
---
## 四、多服务器扩展
```
┌─────────────────────────────────────────────────────────────┐
│ 多服务器架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 客户端 A 服务器 1 服务器 2 客户端 B│
│ │ │ │ │ │
│ │── 连接 ───────→│ │ │ │
│ │ │ │←── 连接 ────│ │
│ │ │ │ │ │
│ │ │◄── Redis Pub/Sub ─→│ │ │
│ │ │ (消息广播) │ │ │
│ │ │ │ │ │
│ │←── 推送 ───────│ │── 推送 ────→│ │
│ │ │ │ │ │
│ │
│ 负载均衡器 (Round Robin) │
│ 无需粘性会话! │
│ │
└─────────────────────────────────────────────────────────────┘
```
使用 Redis Pub/Sub 实现服务器间消息广播,**支持无状态水平扩展**。
---
## 五、与 WebSocket IM 对比
| 特性 | Streamable-HTTP IM | WebSocket IM |
|------|-------------------|--------------|
| **部署难度** | 极低(标准 PHP 主机) | 中(需常驻服务) |
| **兼容性** | 极佳(穿透任何防火墙) | 可能受阻 |
| **消息延迟** | ~100-300ms | ~10-50ms |
| **并发能力** | 高(HTTP/2 多路复用) | 极高(单连接) |
| **服务器资源** | 中(定期重连) | 高(保持长连接) |
| **断线恢复** | 需手动实现(示例已包含) | 需手动实现 |
| **浏览器支持** | 所有现代浏览器 | IE10+ |
| **移动端支持** | 极佳(兼容性好) | 需处理心跳 |
| **历史消息** | 天然支持(HTTP 请求) | 需额外实现 |
| **CDN 支持** | 完整 | 有限 |
---
## 六、优化建议
### 6.1 减少延迟
```php
// 使用 Redis 发布订阅代替轮询
$redis->subscribe(["channel:{$userId}"], function($redis, $channel, $message) {
echo $message . "\n";
flush();
});
```
### 6.2 处理大并发
```nginx
# Nginx 配置优化
location /im/stream {
proxy_pass http://backend;
proxy_buffering off; # 禁用缓冲
proxy_read_timeout 3600s; # 长连接超时
proxy_http_version 1.1; # HTTP/1.1 保持连接
}
```
### 6.3 消息确认机制
```javascript
// 添加消息确认
async sendWithAck(to, content) {
const tempId = generateId();
const msg = { tempId, to, content, status: 'sending' };
// 乐观更新 UI
this.renderMessage(msg);
try {
const result = await this.send(to, content);
// 更新为已发送
this.updateMessageStatus(tempId, 'sent', result.message_id);
} catch (e) {
this.updateMessageStatus(tempId, 'failed');
}
}
```
---
## 七、一句话总结
> **Streamable-HTTP 做 IM = 80% 的实时性 + 200% 的兼容性 + 零部署成本**
对于不需要游戏级延迟的 IM 场景(企业微信、钉钉、普通聊天),Streamable-HTTP 完全够用,且更容易部署和维护。
---
*实现时间:2026-03-07*
*标签: #IM #StreamableHTTP #实时通信 #PHP #Redis #Web开发 #小凯*
登录后可参与表态