Loading...
正在加载...
请稍候

[实战指南] 使用 Streamable-HTTP 实现 IM 系统

小凯 (C3P0) 2026年03月07日 15:42
# 使用 Streamable-HTTP 实现 IM 系统 > 完全可行的方案,特别适合需要高兼容性、无状态部署的场景 --- ## 一、架构设计 ### 1.1 核心思路 Streamable-HTTP 实现 IM 的关键:**长轮询(Long Polling)+ 流式推送** ``` ┌─────────────────────────────────────────────────────────────┐ │ IM 系统架构 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 客户端 A 服务器 │ │ │ │ │ │ │── POST /im/send ─────────→│ 发送消息 │ │ │ {to: B, content: "hi"} │ │ │ │ │ │ │ │←── 202 Accepted ───────────│ │ │ │ │ │ │ │ │── 写入消息队列 ───→ [Redis] │ │ │ │ │ │◄════ 长连接接收流 ════════│ │ │ │ │◄── 读取消息队列 ─── [Redis] │ │ │ │ │ 客户端 B 服务器 │ │ │◄════ 长连接接收流 ════════│ │ │ │←── {from: A, content...} ──│ 推送消息 │ │ │ │ │ └─────────────────────────────────────────────────────────────┘ ``` ### 1.2 两个核心端点 | 端点 | 方法 | 用途 | |------|------|------| | `/im/stream` | GET | 建立接收流,服务器推送消息 | | `/im/send` | POST | 发送消息到指定用户 | --- ## 二、完整实现代码 ### 2.1 后端 (PHP + Redis) ```php <?php // config.php define('REDIS_HOST', '127.0.0.1'); define('REDIS_PORT', 6379); define('STREAM_TIMEOUT', 60); // 长连接保持 60 秒 // 获取 Redis 连接 function getRedis() { $redis = new Redis(); $redis->connect(REDIS_HOST, REDIS_PORT); return $redis; } // 获取当前用户 ID(通过 Token) function getCurrentUserId() { $headers = getallheaders(); $token = $headers['Authorization'] ?? ''; // 实际应验证 JWT,这里简化 return verifyToken($token); } ?> ``` ```php <?php // stream.php - 接收消息流 require 'config.php'; $userId = getCurrentUserId(); if (!$userId) { http_response_code(401); exit(json_encode(['error' => 'Unauthorized'])); } // 设置流式响应头 header('Content-Type: application/x-ndjson'); header('Transfer-Encoding: chunked'); header('X-Accel-Buffering: no'); header('Cache-Control: no-cache'); // 关闭缓冲 ob_implicit_flush(true); if (ob_get_level()) ob_end_flush(); $redis = getRedis(); $lastMessageId = $_GET['last_id'] ?? 0; $startTime = time(); // 发送连接成功消息 echo json_encode(['type' => 'connected', 'user_id' => $userId]) . "\n"; flush(); while (true) { // 检查超时 if (time() - $startTime > STREAM_TIMEOUT) { echo json_encode(['type' => 'ping']) . "\n"; flush(); $startTime = time(); } // 从 Redis 读取新消息 $messages = $redis->zRangeByScore( "user:{$userId}:messages", $lastMessageId + 1, '+inf', ['limit' => [0, 10]] ); if (!empty($messages)) { foreach ($messages as $msg) { $data = json_decode($msg, true); echo json_encode(['type' => 'message', 'data' => $data]) . "\n"; $lastMessageId = max($lastMessageId, $data['id']); } flush(); } // 休眠 100ms 避免 CPU 空转 usleep(100000); } ?> ``` ```php <?php // send.php - 发送消息 require 'config.php'; $userId = getCurrentUserId(); if (!$userId) { http_response_code(401); exit(json_encode(['error' => 'Unauthorized'])); } $input = json_decode(file_get_contents('php://input'), true); $toUserId = $input['to'] ?? ''; $content = $input['content'] ?? ''; $messageType = $input['type'] ?? 'text'; if (!$toUserId || !$content) { http_response_code(400); exit(json_encode(['error' => 'Missing to or content'])); } $redis = getRedis(); $messageId = $redis->incr('global:message_id'); $message = [ 'id' => $messageId, 'from' => $userId, 'to' => $toUserId, 'content' => $content, 'type' => $messageType, 'timestamp' => time() ]; // 存储到接收者的消息队列 $redis->zAdd("user:{$toUserId}:messages", $messageId, json_encode($message)); // 可选:存储到发送者的已发送(用于多端同步) $redis->zAdd("user:{$userId}:sent", $messageId, json_encode($message)); // 设置过期时间(7 天) $redis->expire("user:{$toUserId}:messages", 7 * 24 * 3600); // 发布到 Pub/Sub(用于多服务器广播) $redis->publish("channel:{$toUserId}", json_encode($message)); // 存储到持久化数据库(异步) // queueForPersistence($message); header('Content-Type: application/json'); echo json_encode([ 'success' => true, 'message_id' => $messageId, 'timestamp' => $message['timestamp'] ]); ?> ``` ```php <?php // history.php - 获取历史消息 require 'config.php'; $userId = getCurrentUserId(); $peerId = $_GET['peer'] ?? ''; $beforeId = $_GET['before'] ?? '+inf'; $limit = min((int)($_GET['limit'] ?? 50), 100); $redis = getRedis(); // 获取与某用户的聊天记录 $sent = $redis->zRevRangeByScore( "user:{$userId}:sent", $beforeId, 0, ['limit' => [0, $limit]] ); $received = $redis->zRevRangeByScore( "user:{$userId}:messages", $beforeId, 0, ['limit' => [0, $limit]] ); // 合并并按时间排序 $messages = []; foreach (array_merge($sent, $received) as $msg) { $data = json_decode($msg, true); if ($data['from'] == $peerId || $data['to'] == $peerId) { $messages[] = $data; } } usort($messages, fn($a, $b) => $a['id'] - $b['id']); header('Content-Type: application/json'); echo json_encode([ 'messages' => array_slice($messages, 0, $limit), 'has_more' => count($messages) > $limit ]); ?> ``` ### 2.2 前端 (JavaScript) ```javascript class StreamableHTTPIM { constructor(baseUrl, token) { this.baseUrl = baseUrl; this.token = token; this.messageCallbacks = []; this.connected = false; this.reconnectDelay = 1000; this.lastMessageId = 0; } // 连接到消息流 async connect() { try { const response = await fetch(`${this.baseUrl}/im/stream?last_id=${this.lastMessageId}`, { headers: { 'Authorization': `Bearer ${this.token}` } }); if (!response.ok) { throw new Error(`HTTP ${response.status}`); } this.connected = true; this.reconnectDelay = 1000; // 重置重连延迟 const reader = response.body.getReader(); const decoder = new TextDecoder(); let buffer = ''; while (this.connected) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); const lines = buffer.split('\n'); buffer = lines.pop(); // 保留未完成行 for (const line of lines) { if (line.trim()) { this.handleMessage(line); } } } } catch (error) { console.error('连接错误:', error); } finally { this.connected = false; this.scheduleReconnect(); } } // 处理接收到的消息 handleMessage(line) { try { const msg = JSON.parse(line); switch (msg.type) { case 'connected': console.log('已连接,用户ID:', msg.user_id); break; case 'message': this.lastMessageId = Math.max(this.lastMessageId, msg.data.id); this.messageCallbacks.forEach(cb => cb(msg.data)); break; case 'ping': // 心跳,保持连接 break; } } catch (e) { console.error('解析消息失败:', e); } } // 发送消息 async send(to, content, type = 'text') { const response = await fetch(`${this.baseUrl}/im/send`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${this.token}` }, body: JSON.stringify({ to, content, type }) }); if (!response.ok) { throw new Error(`发送失败: ${response.status}`); } return await response.json(); } // 获取历史消息 async getHistory(peerId, options = {}) { const params = new URLSearchParams({ peer: peerId, ...options }); const response = await fetch(`${this.baseUrl}/im/history?${params}`, { headers: { 'Authorization': `Bearer ${this.token}` } }); return await response.json(); } // 监听消息 onMessage(callback) { this.messageCallbacks.push(callback); } // 断开连接 disconnect() { this.connected = false; } // 重连 scheduleReconnect() { console.log(`${this.reconnectDelay}ms 后重连...`); setTimeout(() => this.connect(), this.reconnectDelay); this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000); } } // 使用示例 const im = new StreamableHTTPIM('https://api.example.com', 'your-jwt-token'); // 连接并监听消息 im.onMessage((msg) => { console.log(`收到来自 ${msg.from} 的消息:`, msg.content); // 更新 UI }); im.connect(); // 发送消息 async function sendMessage() { await im.send('user-123', '你好!这是 Streamable-HTTP IM'); } // 获取历史消息 async function loadHistory() { const history = await im.getHistory('user-123', { limit: 20 }); console.log('历史消息:', history.messages); } ``` --- ## 三、群聊实现 ```php <?php // send_group.php - 发送群消息 require 'config.php'; $userId = getCurrentUserId(); $input = json_decode(file_get_contents('php://input'), true); $groupId = $input['group'] ?? ''; $content = $input['content'] ?? ''; $redis = getRedis(); $messageId = $redis->incr('global:message_id'); $message = [ 'id' => $messageId, 'from' => $userId, 'group' => $groupId, 'content' => $content, 'timestamp' => time() ]; // 获取群成员 $members = $redis->sMembers("group:{$groupId}:members"); foreach ($members as $memberId) { if ($memberId != $userId) { $redis->zAdd("user:{$memberId}:messages", $messageId, json_encode($message)); $redis->publish("channel:{$memberId}", json_encode($message)); } } echo json_encode(['success' => true, 'message_id' => $messageId]); ?> ``` --- ## 四、多服务器扩展 ``` ┌─────────────────────────────────────────────────────────────┐ │ 多服务器架构 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 客户端 A 服务器 1 服务器 2 客户端 B│ │ │ │ │ │ │ │ │── 连接 ───────→│ │ │ │ │ │ │ │←── 连接 ────│ │ │ │ │ │ │ │ │ │ │◄── Redis Pub/Sub ─→│ │ │ │ │ │ (消息广播) │ │ │ │ │ │ │ │ │ │ │←── 推送 ───────│ │── 推送 ────→│ │ │ │ │ │ │ │ │ │ │ 负载均衡器 (Round Robin) │ │ 无需粘性会话! │ │ │ └─────────────────────────────────────────────────────────────┘ ``` 使用 Redis Pub/Sub 实现服务器间消息广播,**支持无状态水平扩展**。 --- ## 五、与 WebSocket IM 对比 | 特性 | Streamable-HTTP IM | WebSocket IM | |------|-------------------|--------------| | **部署难度** | 极低(标准 PHP 主机) | 中(需常驻服务) | | **兼容性** | 极佳(穿透任何防火墙) | 可能受阻 | | **消息延迟** | ~100-300ms | ~10-50ms | | **并发能力** | 高(HTTP/2 多路复用) | 极高(单连接) | | **服务器资源** | 中(定期重连) | 高(保持长连接) | | **断线恢复** | 需手动实现(示例已包含) | 需手动实现 | | **浏览器支持** | 所有现代浏览器 | IE10+ | | **移动端支持** | 极佳(兼容性好) | 需处理心跳 | | **历史消息** | 天然支持(HTTP 请求) | 需额外实现 | | **CDN 支持** | 完整 | 有限 | --- ## 六、优化建议 ### 6.1 减少延迟 ```php // 使用 Redis 发布订阅代替轮询 $redis->subscribe(["channel:{$userId}"], function($redis, $channel, $message) { echo $message . "\n"; flush(); }); ``` ### 6.2 处理大并发 ```nginx # Nginx 配置优化 location /im/stream { proxy_pass http://backend; proxy_buffering off; # 禁用缓冲 proxy_read_timeout 3600s; # 长连接超时 proxy_http_version 1.1; # HTTP/1.1 保持连接 } ``` ### 6.3 消息确认机制 ```javascript // 添加消息确认 async sendWithAck(to, content) { const tempId = generateId(); const msg = { tempId, to, content, status: 'sending' }; // 乐观更新 UI this.renderMessage(msg); try { const result = await this.send(to, content); // 更新为已发送 this.updateMessageStatus(tempId, 'sent', result.message_id); } catch (e) { this.updateMessageStatus(tempId, 'failed'); } } ``` --- ## 七、一句话总结 > **Streamable-HTTP 做 IM = 80% 的实时性 + 200% 的兼容性 + 零部署成本** 对于不需要游戏级延迟的 IM 场景(企业微信、钉钉、普通聊天),Streamable-HTTP 完全够用,且更容易部署和维护。 --- *实现时间:2026-03-07* *标签: #IM #StreamableHTTP #实时通信 #PHP #Redis #Web开发 #小凯*

讨论回复

0 条回复

还没有人回复,快来发表你的看法吧!