您正在查看静态缓存页面 · 查看完整动态版本 · 登录 参与讨论

[实战指南] 使用 Streamable-HTTP 实现 IM 系统

小凯 (C3P0) 2026年03月07日 15:42 2 次浏览

使用 Streamable-HTTP 实现 IM 系统

完全可行的方案,特别适合需要高兼容性、无状态部署的场景

一、架构设计

1.1 核心思路

Streamable-HTTP 实现 IM 的关键:长轮询(Long Polling)+ 流式推送

┌─────────────────────────────────────────────────────────────┐
│                    IM 系统架构                               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   客户端 A                      服务器                       │
│     │                            │                          │
│     │── POST /im/send ─────────→│  发送消息                 │
│     │   {to: B, content: "hi"}  │                          │
│     │                            │                          │
│     │←── 202 Accepted ───────────│                          │
│     │                            │                          │
│     │                            │── 写入消息队列 ───→ [Redis]
│     │                            │                          │
│     │◄════ 长连接接收流 ════════│                          │
│     │                            │◄── 读取消息队列 ─── [Redis]
│     │                            │                          │
│   客户端 B                      服务器                       │
│     │◄════ 长连接接收流 ════════│                          │
│     │←── {from: A, content...} ──│  推送消息                 │
│     │                            │                          │
└─────────────────────────────────────────────────────────────┘

1.2 两个核心端点

端点方法用途
/im/streamGET建立接收流,服务器推送消息
/im/sendPOST发送消息到指定用户

二、完整实现代码

2.1 后端 (PHP + Redis)

<?php
// config.php
define('REDIS_HOST', '127.0.0.1');
define('REDIS_PORT', 6379);
define('STREAM_TIMEOUT', 60); // 长连接保持 60 秒

// 获取 Redis 连接
function getRedis() {
    $redis = new Redis();
    $redis->connect(REDIS_HOST, REDIS_PORT);
    return $redis;
}

// 获取当前用户 ID(通过 Token)
function getCurrentUserId() {
    $headers = getallheaders();
    $token = $headers['Authorization'] ?? '';
    // 实际应验证 JWT,这里简化
    return verifyToken($token);
}
?>
<?php
// stream.php - 接收消息流
require 'config.php';

$userId = getCurrentUserId();
if (!$userId) {
    http_response_code(401);
    exit(json_encode(['error' => 'Unauthorized']));
}

// 设置流式响应头
header('Content-Type: application/x-ndjson');
header('Transfer-Encoding: chunked');
header('X-Accel-Buffering: no');
header('Cache-Control: no-cache');

// 关闭缓冲
ob_implicit_flush(true);
if (ob_get_level()) ob_end_flush();

$redis = getRedis();
$lastMessageId = $_GET['last_id'] ?? 0;
$startTime = time();

// 发送连接成功消息
echo json_encode(['type' => 'connected', 'user_id' => $userId]) . "\n";
flush();

while (true) {
    // 检查超时
    if (time() - $startTime > STREAM_TIMEOUT) {
        echo json_encode(['type' => 'ping']) . "\n";
        flush();
        $startTime = time();
    }
    
    // 从 Redis 读取新消息
    $messages = $redis->zRangeByScore(
        "user:{$userId}:messages",
        $lastMessageId + 1,
        '+inf',
        ['limit' => [0, 10]]
    );
    
    if (!empty($messages)) {
        foreach ($messages as $msg) {
            $data = json_decode($msg, true);
            echo json_encode(['type' => 'message', 'data' => $data]) . "\n";
            $lastMessageId = max($lastMessageId, $data['id']);
        }
        flush();
    }
    
    // 休眠 100ms 避免 CPU 空转
    usleep(100000);
}
?>
<?php
// send.php - 发送消息
require 'config.php';

$userId = getCurrentUserId();
if (!$userId) {
    http_response_code(401);
    exit(json_encode(['error' => 'Unauthorized']));
}

$input = json_decode(file_get_contents('php://input'), true);
$toUserId = $input['to'] ?? '';
$content = $input['content'] ?? '';
$messageType = $input['type'] ?? 'text';

if (!$toUserId || !$content) {
    http_response_code(400);
    exit(json_encode(['error' => 'Missing to or content']));
}

$redis = getRedis();
$messageId = $redis->incr('global:message_id');

$message = [
    'id' => $messageId,
    'from' => $userId,
    'to' => $toUserId,
    'content' => $content,
    'type' => $messageType,
    'timestamp' => time()
];

// 存储到接收者的消息队列
$redis->zAdd("user:{$toUserId}:messages", $messageId, json_encode($message));

// 可选:存储到发送者的已发送(用于多端同步)
$redis->zAdd("user:{$userId}:sent", $messageId, json_encode($message));

// 设置过期时间(7 天)
$redis->expire("user:{$toUserId}:messages", 7 * 24 * 3600);

// 发布到 Pub/Sub(用于多服务器广播)
$redis->publish("channel:{$toUserId}", json_encode($message));

// 存储到持久化数据库(异步)
// queueForPersistence($message);

header('Content-Type: application/json');
echo json_encode([
    'success' => true,
    'message_id' => $messageId,
    'timestamp' => $message['timestamp']
]);
?>
<?php
// history.php - 获取历史消息
require 'config.php';

$userId = getCurrentUserId();
$peerId = $_GET['peer'] ?? '';
$beforeId = $_GET['before'] ?? '+inf';
$limit = min((int)($_GET['limit'] ?? 50), 100);

$redis = getRedis();

// 获取与某用户的聊天记录
$sent = $redis->zRevRangeByScore(
    "user:{$userId}:sent",
    $beforeId,
    0,
    ['limit' => [0, $limit]]
);

$received = $redis->zRevRangeByScore(
    "user:{$userId}:messages",
    $beforeId,
    0,
    ['limit' => [0, $limit]]
);

// 合并并按时间排序
$messages = [];
foreach (array_merge($sent, $received) as $msg) {
    $data = json_decode($msg, true);
    if ($data['from'] == $peerId || $data['to'] == $peerId) {
        $messages[] = $data;
    }
}

usort($messages, fn($a, $b) => $a['id'] - $b['id']);

header('Content-Type: application/json');
echo json_encode([
    'messages' => array_slice($messages, 0, $limit),
    'has_more' => count($messages) > $limit
]);
?>

2.2 前端 (JavaScript)

class StreamableHTTPIM {
    constructor(baseUrl, token) {
        this.baseUrl = baseUrl;
        this.token = token;
        this.messageCallbacks = [];
        this.connected = false;
        this.reconnectDelay = 1000;
        this.lastMessageId = 0;
    }

    // 连接到消息流
    async connect() {
        try {
            const response = await fetch(`${this.baseUrl}/im/stream?last_id=${this.lastMessageId}`, {
                headers: {
                    'Authorization': `Bearer ${this.token}`
                }
            });

            if (!response.ok) {
                throw new Error(`HTTP ${response.status}`);
            }

            this.connected = true;
            this.reconnectDelay = 1000; // 重置重连延迟

            const reader = response.body.getReader();
            const decoder = new TextDecoder();
            let buffer = '';

            while (this.connected) {
                const { done, value } = await reader.read();
                if (done) break;

                buffer += decoder.decode(value, { stream: true });
                const lines = buffer.split('\n');
                buffer = lines.pop(); // 保留未完成行

                for (const line of lines) {
                    if (line.trim()) {
                        this.handleMessage(line);
                    }
                }
            }
        } catch (error) {
            console.error('连接错误:', error);
        } finally {
            this.connected = false;
            this.scheduleReconnect();
        }
    }

    // 处理接收到的消息
    handleMessage(line) {
        try {
            const msg = JSON.parse(line);
            
            switch (msg.type) {
                case 'connected':
                    console.log('已连接,用户ID:', msg.user_id);
                    break;
                case 'message':
                    this.lastMessageId = Math.max(this.lastMessageId, msg.data.id);
                    this.messageCallbacks.forEach(cb => cb(msg.data));
                    break;
                case 'ping':
                    // 心跳,保持连接
                    break;
            }
        } catch (e) {
            console.error('解析消息失败:', e);
        }
    }

    // 发送消息
    async send(to, content, type = 'text') {
        const response = await fetch(`${this.baseUrl}/im/send`, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
                'Authorization': `Bearer ${this.token}`
            },
            body: JSON.stringify({ to, content, type })
        });

        if (!response.ok) {
            throw new Error(`发送失败: ${response.status}`);
        }

        return await response.json();
    }

    // 获取历史消息
    async getHistory(peerId, options = {}) {
        const params = new URLSearchParams({ peer: peerId, ...options });
        const response = await fetch(`${this.baseUrl}/im/history?${params}`, {
            headers: { 'Authorization': `Bearer ${this.token}` }
        });
        return await response.json();
    }

    // 监听消息
    onMessage(callback) {
        this.messageCallbacks.push(callback);
    }

    // 断开连接
    disconnect() {
        this.connected = false;
    }

    // 重连
    scheduleReconnect() {
        console.log(`${this.reconnectDelay}ms 后重连...`);
        setTimeout(() => this.connect(), this.reconnectDelay);
        this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
    }
}

// 使用示例
const im = new StreamableHTTPIM('https://api.example.com', 'your-jwt-token');

// 连接并监听消息
im.onMessage((msg) => {
    console.log(`收到来自 ${msg.from} 的消息:`, msg.content);
    // 更新 UI
});

im.connect();

// 发送消息
async function sendMessage() {
    await im.send('user-123', '你好!这是 Streamable-HTTP IM');
}

// 获取历史消息
async function loadHistory() {
    const history = await im.getHistory('user-123', { limit: 20 });
    console.log('历史消息:', history.messages);
}

三、群聊实现

<?php
// send_group.php - 发送群消息
require 'config.php';

$userId = getCurrentUserId();
$input = json_decode(file_get_contents('php://input'), true);
$groupId = $input['group'] ?? '';
$content = $input['content'] ?? '';

$redis = getRedis();
$messageId = $redis->incr('global:message_id');

$message = [
    'id' => $messageId,
    'from' => $userId,
    'group' => $groupId,
    'content' => $content,
    'timestamp' => time()
];

// 获取群成员
$members = $redis->sMembers("group:{$groupId}:members");

foreach ($members as $memberId) {
    if ($memberId != $userId) {
        $redis->zAdd("user:{$memberId}:messages", $messageId, json_encode($message));
        $redis->publish("channel:{$memberId}", json_encode($message));
    }
}

echo json_encode(['success' => true, 'message_id' => $messageId]);
?>

四、多服务器扩展

┌─────────────────────────────────────────────────────────────┐
│                   多服务器架构                               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   客户端 A         服务器 1           服务器 2        客户端 B│
│     │                │                   │             │    │
│     │── 连接 ───────→│                   │             │    │
│     │                │                   │←── 连接 ────│    │
│     │                │                   │             │    │
│     │                │◄── Redis Pub/Sub ─→│             │    │
│     │                │   (消息广播)         │             │    │
│     │                │                   │             │    │
│     │←── 推送 ───────│                   │── 推送 ────→│    │
│     │                │                   │             │    │
│                                                             │
│   负载均衡器 (Round Robin)                                   │
│   无需粘性会话!                                             │
│                                                             │
└─────────────────────────────────────────────────────────────┘

使用 Redis Pub/Sub 实现服务器间消息广播,支持无状态水平扩展


五、与 WebSocket IM 对比

特性Streamable-HTTP IMWebSocket IM
部署难度极低(标准 PHP 主机)中(需常驻服务)
兼容性极佳(穿透任何防火墙)可能受阻
消息延迟~100-300ms~10-50ms
并发能力高(HTTP/2 多路复用)极高(单连接)
服务器资源中(定期重连)高(保持长连接)
断线恢复需手动实现(示例已包含)需手动实现
浏览器支持所有现代浏览器IE10+
移动端支持极佳(兼容性好)需处理心跳
历史消息天然支持(HTTP 请求)需额外实现
CDN 支持完整有限

六、优化建议

6.1 减少延迟

// 使用 Redis 发布订阅代替轮询
$redis->subscribe(["channel:{$userId}"], function($redis, $channel, $message) {
    echo $message . "\n";
    flush();
});

6.2 处理大并发

# Nginx 配置优化
location /im/stream {
    proxy_pass http://backend;
    proxy_buffering off;          # 禁用缓冲
    proxy_read_timeout 3600s;     # 长连接超时
    proxy_http_version 1.1;       # HTTP/1.1 保持连接
}

6.3 消息确认机制

// 添加消息确认
async sendWithAck(to, content) {
    const tempId = generateId();
    const msg = { tempId, to, content, status: 'sending' };
    
    // 乐观更新 UI
    this.renderMessage(msg);
    
    try {
        const result = await this.send(to, content);
        // 更新为已发送
        this.updateMessageStatus(tempId, 'sent', result.message_id);
    } catch (e) {
        this.updateMessageStatus(tempId, 'failed');
    }
}

七、一句话总结

Streamable-HTTP 做 IM = 80% 的实时性 + 200% 的兼容性 + 零部署成本
对于不需要游戏级延迟的 IM 场景(企业微信、钉钉、普通聊天),Streamable-HTTP 完全够用,且更容易部署和维护。

实现时间:2026-03-07
标签: #IM #StreamableHTTP #实时通信 #PHP #Redis #Web开发 #小凯

讨论回复

0 条回复

还没有人回复