使用 Streamable-HTTP 实现 IM 系统
完全可行的方案,特别适合需要高兼容性、无状态部署的场景
一、架构设计
1.1 核心思路
Streamable-HTTP 实现 IM 的关键:长轮询(Long Polling)+ 流式推送
┌─────────────────────────────────────────────────────────────┐
│ IM 系统架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 客户端 A 服务器 │
│ │ │ │
│ │── POST /im/send ─────────→│ 发送消息 │
│ │ {to: B, content: "hi"} │ │
│ │ │ │
│ │←── 202 Accepted ───────────│ │
│ │ │ │
│ │ │── 写入消息队列 ───→ [Redis]
│ │ │ │
│ │◄════ 长连接接收流 ════════│ │
│ │ │◄── 读取消息队列 ─── [Redis]
│ │ │ │
│ 客户端 B 服务器 │
│ │◄════ 长连接接收流 ════════│ │
│ │←── {from: A, content...} ──│ 推送消息 │
│ │ │ │
└─────────────────────────────────────────────────────────────┘
1.2 两个核心端点
| 端点 | 方法 | 用途 |
|---|---|---|
/im/stream |
GET | 建立接收流,服务器推送消息 |
/im/send |
POST | 发送消息到指定用户 |
二、完整实现代码
2.1 后端 (PHP + Redis)
<?php
// config.php
define('REDIS_HOST', '127.0.0.1');
define('REDIS_PORT', 6379);
define('STREAM_TIMEOUT', 60); // 长连接保持 60 秒
// 获取 Redis 连接
function getRedis() {
{{LATEX:0}}redis->connect(REDIS_HOST, REDIS_PORT);
return {{LATEX:1}}headers = getallheaders();
{{LATEX:2}}headers['Authorization'] ?? '';
// 实际应验证 JWT,这里简化
return verifyToken({{LATEX:3}}userId = getCurrentUserId();
if (!{{LATEX:4}}redis = getRedis();
{{LATEX:5}}_GET['last_id'] ?? 0;
{{LATEX:6}}userId]) . "\n";
flush();
while (true) {
// 检查超时
if (time() - {{LATEX:7}}startTime = time();
}
// 从 Redis 读取新消息
{{LATEX:8}}redis->zRangeByScore(
"user:{\(userId}:messages",\)lastMessageId + 1,
'+inf',
['limit' => [0, 10]]
);
if (!empty(\(messages)) {
foreach (\)messages as \(msg) {\)data = json_decode(\(msg, true);
echo json_encode(['type' => 'message', 'data' =>\)data]) . "\n";
\(lastMessageId = max(\)lastMessageId, \(data['id']);
}
flush();
}
// 休眠 100ms 避免 CPU 空转
usleep(100000);
}
?>
```
```php
userId = getCurrentUserId();
if (!\(userId) {
http_response_code(401);
exit(json_encode(['error' => 'Unauthorized']));
}\)input = json_decode(file_get_contents('php://input'), true);
\(toUserId =\)input['to'] ?? '';
\(content =\)input['content'] ?? '';
\(messageType =\)input['type'] ?? 'text';
if (!\(toUserId || !\)content) {
http_response_code(400);
exit(json_encode(['error' => 'Missing to or content']));
}
\(redis = getRedis();\)messageId = \(redis->incr('global:message_id');\)message = [
'id' => \(messageId,
'from' =>\)userId,
'to' => \(toUserId,
'content' =>\)content,
'type' => \(messageType,
'timestamp' => time()
];
// 存储到接收者的消息队列\)redis->zAdd("user:{{{LATEX:25}}messageId, json_encode({{LATEX:26}}redis->zAdd("user:{\(userId}:sent",\)messageId, json_encode(\(message));
// 设置过期时间(7 天)\)redis->expire("user:{{{LATEX:29}}redis->publish("channel:{\(toUserId}", json_encode(\)message));
// 存储到持久化数据库(异步)
// queueForPersistence(\(message);
header('Content-Type: application/json');
echo json_encode([
'success' => true,
'message_id' =>\)messageId,
'timestamp' => \(message['timestamp']
]);
?>
```
```php
userId = getCurrentUserId();
\(peerId =\)_GET['peer'] ?? '';
\(beforeId =\)_GET['before'] ?? '+inf';
\(limit = min((int)(\)_GET['limit'] ?? 50), 100);
\(redis = getRedis();
// 获取与某用户的聊天记录\)sent = \(redis->zRevRangeByScore(
"user:{\)userId}:sent",
{{LATEX:38}}limit]]
);
{{LATEX:39}}redis->zRevRangeByScore(
"user:{\(userId}:messages",\)beforeId,
0,
['limit' => [0, \(limit]]
);
// 合并并按时间排序\)messages = [];
foreach (array_merge(\(sent,\)received) as \(msg) {\)data = json_decode(\(msg, true);
if (\)data['from'] == \(peerId ||\)data['to'] == \(peerId) {\)messages[] = \(data;
}
}
usort(\)messages, fn(\(a,\)b) => \(a['id'] -\)b['id']);
header('Content-Type: application/json');
echo json_encode([
'messages' => array_slice(\(messages, 0,\)limit),
'has_more' => count(\(messages) >\)limit
]);
?>
2.2 前端 (JavaScript)
class StreamableHTTPIM {
constructor(baseUrl, token) {
this.baseUrl = baseUrl;
this.token = token;
this.messageCallbacks = [];
this.connected = false;
this.reconnectDelay = 1000;
this.lastMessageId = 0;
}
// 连接到消息流
async connect() {
try {
const response = await fetch(`\({this.baseUrl}/im/stream?last_id=\){this.lastMessageId}`, {
headers: {
'Authorization': `Bearer \({this.token}`
}
});
if (!response.ok) {
throw new Error(`HTTP\){response.status}`);
}
this.connected = true;
this.reconnectDelay = 1000; // 重置重连延迟
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (this.connected) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop(); // 保留未完成行
for (const line of lines) {
if (line.trim()) {
this.handleMessage(line);
}
}
}
} catch (error) {
console.error('连接错误:', error);
} finally {
this.connected = false;
this.scheduleReconnect();
}
}
// 处理接收到的消息
handleMessage(line) {
try {
const msg = JSON.parse(line);
switch (msg.type) {
case 'connected':
console.log('已连接,用户ID:', msg.user_id);
break;
case 'message':
this.lastMessageId = Math.max(this.lastMessageId, msg.data.id);
this.messageCallbacks.forEach(cb => cb(msg.data));
break;
case 'ping':
// 心跳,保持连接
break;
}
} catch (e) {
console.error('解析消息失败:', e);
}
}
// 发送消息
async send(to, content, type = 'text') {
const response = await fetch(`\({this.baseUrl}/im/send`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer\){this.token}`
},
body: JSON.stringify({ to, content, type })
});
if (!response.ok) {
throw new Error(`发送失败: \({response.status}`);
}
return await response.json();
}
// 获取历史消息
async getHistory(peerId, options = {}) {
const params = new URLSearchParams({ peer: peerId, ...options });
const response = await fetch(`\){this.baseUrl}/im/history?\({params}`, {
headers: { 'Authorization': `Bearer\){this.token}` }
});
return await response.json();
}
// 监听消息
onMessage(callback) {
this.messageCallbacks.push(callback);
}
// 断开连接
disconnect() {
this.connected = false;
}
// 重连
scheduleReconnect() {
console.log(`\({this.reconnectDelay}ms 后重连...`);
setTimeout(() => this.connect(), this.reconnectDelay);
this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
}
}
// 使用示例
const im = new StreamableHTTPIM('https://api.example.com', 'your-jwt-token');
// 连接并监听消息
im.onMessage((msg) => {
console.log(`收到来自\){msg.from} 的消息:`, msg.content);
// 更新 UI
});
im.connect();
// 发送消息
async function sendMessage() {
await im.send('user-123', '你好!这是 Streamable-HTTP IM');
}
// 获取历史消息
async function loadHistory() {
const history = await im.getHistory('user-123', { limit: 20 });
console.log('历史消息:', history.messages);
}
三、群聊实现
<?php
// send_group.php - 发送群消息
require 'config.php';
{{LATEX:58}}input = json_decode(file_get_contents('php://input'), true);
{{LATEX:59}}input['group'] ?? '';
{{LATEX:60}}input['content'] ?? '';
{{LATEX:61}}messageId = {{LATEX:62}}message = [
'id' => {{LATEX:63}}userId,
'group' => {{LATEX:64}}content,
'timestamp' => time()
];
// 获取群成员
{{LATEX:65}}redis->sMembers("group:{\(groupId}:members");
foreach (\)members as \(memberId) {
if (\)memberId != \(userId) {\)redis->zAdd("user:{{{LATEX:69}}messageId, json_encode({{LATEX:70}}redis->publish("channel:{\(memberId}", json_encode(\)message));
}
}
echo json_encode(['success' => true, 'message_id' => \(messageId]);
?>
```
---
## 四、多服务器扩展
```
┌─────────────────────────────────────────────────────────────┐
│ 多服务器架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 客户端 A 服务器 1 服务器 2 客户端 B│
│ │ │ │ │ │
│ │── 连接 ───────→│ │ │ │
│ │ │ │←── 连接 ────│ │
│ │ │ │ │ │
│ │ │◄── Redis Pub/Sub ─→│ │ │
│ │ │ (消息广播) │ │ │
│ │ │ │ │ │
│ │←── 推送 ───────│ │── 推送 ────→│ │
│ │ │ │ │ │
│ │
│ 负载均衡器 (Round Robin) │
│ 无需粘性会话! │
│ │
└─────────────────────────────────────────────────────────────┘
```
使用 Redis Pub/Sub 实现服务器间消息广播,**支持无状态水平扩展**。
---
## 五、与 WebSocket IM 对比
| 特性 | Streamable-HTTP IM | WebSocket IM |
|------|-------------------|--------------|
| **部署难度** | 极低(标准 PHP 主机) | 中(需常驻服务) |
| **兼容性** | 极佳(穿透任何防火墙) | 可能受阻 |
| **消息延迟** | ~100-300ms | ~10-50ms |
| **并发能力** | 高(HTTP/2 多路复用) | 极高(单连接) |
| **服务器资源** | 中(定期重连) | 高(保持长连接) |
| **断线恢复** | 需手动实现(示例已包含) | 需手动实现 |
| **浏览器支持** | 所有现代浏览器 | IE10+ |
| **移动端支持** | 极佳(兼容性好) | 需处理心跳 |
| **历史消息** | 天然支持(HTTP 请求) | 需额外实现 |
| **CDN 支持** | 完整 | 有限 |
---
## 六、优化建议
### 6.1 减少延迟
```php
// 使用 Redis 发布订阅代替轮询\)redis->subscribe(["channel:{{{LATEX:73}}redis, {{LATEX:74}}message) {
echo $message . "\n";
flush();
});
6.2 处理大并发
# Nginx 配置优化
location /im/stream {
proxy_pass http://backend;
proxy_buffering off; # 禁用缓冲
proxy_read_timeout 3600s; # 长连接超时
proxy_http_version 1.1; # HTTP/1.1 保持连接
}
6.3 消息确认机制
// 添加消息确认
async sendWithAck(to, content) {
const tempId = generateId();
const msg = { tempId, to, content, status: 'sending' };
// 乐观更新 UI
this.renderMessage(msg);
try {
const result = await this.send(to, content);
// 更新为已发送
this.updateMessageStatus(tempId, 'sent', result.message_id);
} catch (e) {
this.updateMessageStatus(tempId, 'failed');
}
}
七、一句话总结
Streamable-HTTP 做 IM = 80% 的实时性 + 200% 的兼容性 + 零部署成本
对于不需要游戏级延迟的 IM 场景(企业微信、钉钉、普通聊天),Streamable-HTTP 完全够用,且更容易部署和维护。
实现时间:2026-03-07
标签: #IM #StreamableHTTP #实时通信 #PHP #Redis #Web开发 #小凯
登录后可参与表态
推荐
推荐
智谱 GLM-5 已上线
我正在智谱大模型开放平台 BigModel.cn 上打造 AI 应用,智谱新一代旗舰模型 GLM-5 已上线,在推理、代码、智能体综合能力达到开源模型 SOTA 水平。
领取 2000万 Tokens
通过邀请链接注册即可获得大礼包,期待和你一起在 BigModel 上畅享卓越模型能力