实时通信架构:WebSocket、SSE与长轮询的设计与实践

深入对比WebSocket、SSE、长轮询三大实时通信技术的原理与适用场景,详解连接管理、心跳保活、断线重连、消息广播等核心技术,提供Go服务端与浏览器端JavaScript的完整实战代码。

引言

实时通信是现代应用的核心需求,从聊天系统到协作工具、股票行情到IoT设备监控,都需要服务端主动向客户端推送数据。本文将系统对比三大实时通信技术,并提供生产级实现方案。

技术对比

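| 特性 | WebSocket | SSE | 长轮询 |
|---|---|---|---|
| 通信方式 | 双向 | 服务端→客户端 | 客户端请求 |
| 协议 | WS/WSS | HTTP | HTTP |
| 连接 | 持久 | 持久 | 周期性 |
| 浏览器支持 | 99%+ | 98%+ | 100% |
| 代理兼容 | 中等 | 好 | 好 |
| 自动重连 | 需手动 | 浏览器内置 | 不需要 |
| 适用场景 | 聊天、游戏 | 推送、通知 | 简单推送 |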

WebSocket深入实现

连接管理

package realtime

import (
    "context"
    "net/http"
    "sync"
    "time"

    "github.com/gorilla/websocket"
)

// upgrader turns incoming HTTP requests into WebSocket connections.
//
// CheckOrigin admits a request only when its Origin header maps to true in
// allowedOrigins. A request without an Origin header looks up "" and is
// therefore rejected unless "" has been allowlisted. Restricting origins is
// what prevents cross-site WebSocket hijacking from arbitrary web pages.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin: func(req *http.Request) bool {
		return allowedOrigins[req.Header.Get("Origin")]
	},
}

// Client is a single WebSocket connection managed by a Hub.
//
// A user (UserID) may hold several connections at once; each connection has
// its own ID. readPump consumes inbound frames and writePump drains send.
type Client struct {
    // ID is the key of this connection in Hub.clients (presumably unique per
    // connection — confirm at the call site that creates Clients).
    ID       string
    // UserID groups connections belonging to the same user (Hub.userClients).
    UserID   string
    conn     *websocket.Conn
    // send is the outbound queue drained by writePump. The hub closes it when
    // the client is removed, which makes writePump send a close frame and exit.
    send     chan []byte
    hub      *Hub
    // writePump also exits when ctx is done. NOTE(review): cancel is not
    // called in the code shown here; confirm who owns the cancellation.
    ctx      context.Context
    cancel   context.CancelFunc
    // lastSeen is the time of the most recent pong; guarded by mu.
    lastSeen time.Time
    mu       sync.Mutex
}

// Hub tracks live connections and their room memberships and fans out room
// broadcasts. Its event loop is Run.
type Hub struct {
    // The three indexes below are guarded by mu. They are written both by Run
    // and by Client.handleMessage (room join/leave).
    clients    map[string]*Client       // clientID -> client
    userClients map[string]map[string]bool // userID -> set of clientIDs
    rooms      map[string]map[string]bool // roomID -> set of clientIDs
    
    // Unbuffered (see NewHub): senders block until Run picks the client up.
    register   chan *Client
    unregister chan *Client
    // Buffered (256 in NewHub); consumed by Run.
    broadcast  chan *BroadcastMessage
    
    mu sync.RWMutex
}

// BroadcastMessage asks the Hub to deliver Data to every member of RoomID.
type BroadcastMessage struct {
    RoomID  string
    // Data is queued as-is on each recipient's send channel.
    Data    []byte
    Exclude string  // clientID to skip (handleMessage passes the sender's ID)
}

// NewHub returns an empty Hub with all indexes allocated. The register and
// unregister channels are unbuffered. The broadcast channel lets up to 256
// messages queue before senders block. Call Run to start processing.
func NewHub() *Hub {
	h := new(Hub)
	h.clients = make(map[string]*Client)
	h.userClients = make(map[string]map[string]bool)
	h.rooms = make(map[string]map[string]bool)
	h.register = make(chan *Client)
	h.unregister = make(chan *Client)
	h.broadcast = make(chan *BroadcastMessage, 256)
	return h
}

// Run is the hub's event loop. It serialises registrations, unregistrations
// and room broadcasts, and returns when ctx is cancelled.
//
// A client whose send queue is full during a broadcast is dropped. Dropping
// removes it from every index and closes its send channel.
func (h *Hub) Run(ctx context.Context) {
	for {
		select {
		case client := <-h.register:
			h.mu.Lock()
			h.clients[client.ID] = client
			// Index by user so all of a user's connections can be found.
			if _, ok := h.userClients[client.UserID]; !ok {
				h.userClients[client.UserID] = make(map[string]bool)
			}
			h.userClients[client.UserID][client.ID] = true
			h.mu.Unlock()

			log.Infof("Client connected: %s (user: %s)", client.ID, client.UserID)

		case client := <-h.unregister:
			h.mu.Lock()
			h.removeClientLocked(client.ID)
			h.mu.Unlock()

			log.Infof("Client disconnected: %s", client.ID)

		case msg := <-h.broadcast:
			// A write lock is required here. Dropping a slow client mutates
			// the maps, and doing that under RLock is a data race.
			h.mu.Lock()
			for clientID := range h.rooms[msg.RoomID] {
				if clientID == msg.Exclude {
					continue
				}
				client, ok := h.clients[clientID]
				if !ok {
					continue
				}
				select {
				case client.send <- msg.Data:
				default:
					// Send queue full: drop the client entirely. Deleting map
					// entries while ranging over the map is safe in Go.
					h.removeClientLocked(clientID)
				}
			}
			h.mu.Unlock()

		case <-ctx.Done():
			return
		}
	}
}

// removeClientLocked removes clientID from the clients, userClients and rooms
// indexes. It prunes sets that become empty, then closes the client's send
// channel so writePump exits. Unknown IDs are ignored, so a later unregister
// of an already-dropped client is harmless. The cost is O(number of rooms).
// h.mu must be held for writing.
func (h *Hub) removeClientLocked(clientID string) {
	client, ok := h.clients[clientID]
	if !ok {
		return
	}
	delete(h.clients, clientID)

	if ids, ok := h.userClients[client.UserID]; ok {
		delete(ids, clientID)
		if len(ids) == 0 {
			delete(h.userClients, client.UserID)
		}
	}

	for roomID, members := range h.rooms {
		if !members[clientID] {
			continue
		}
		delete(members, clientID)
		if len(members) == 0 {
			delete(h.rooms, roomID)
		}
	}

	close(client.send)
}

心跳保活

const (
    writeWait      = 10 * time.Second
    pongWait       = 60 * time.Second
    pingPeriod     = (pongWait * 9) / 10  // 54秒
    maxMessageSize = 512
)

// readPump owns all reads from c.conn.
//
// It caps inbound frames at maxMessageSize and requires a pong at least every
// pongWait. Each pong extends the read deadline and records lastSeen. Every
// frame received goes to handleMessage. On the first read error it
// unregisters the client from the hub and closes the connection. Only close
// frames carrying a code other than going-away or abnormal-closure are logged.
func (c *Client) readPump() {
	defer func() {
		c.hub.unregister <- c
		c.conn.Close()
	}()

	c.conn.SetReadLimit(maxMessageSize)
	c.conn.SetReadDeadline(time.Now().Add(pongWait))
	c.conn.SetPongHandler(func(appData string) error {
		c.conn.SetReadDeadline(time.Now().Add(pongWait))
		c.mu.Lock()
		defer c.mu.Unlock()
		c.lastSeen = time.Now()
		return nil
	})

	for {
		_, frame, err := c.conn.ReadMessage()
		if err == nil {
			c.handleMessage(frame)
			continue
		}

		unexpected := websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure)
		if unexpected {
			log.Warnf("WebSocket error: %v", err)
		}
		return
	}
}

// writePump performs all writes to c.conn. gorilla/websocket supports only
// one concurrent writer, and nothing else in this file writes to the
// connection.
//
// It forwards each message queued on c.send as a text frame and sends a ping
// every pingPeriod so the peer's pongs keep readPump's deadline moving. It
// returns, closing the connection, in three cases:
//   - c.send is closed (after a best-effort close frame);
//   - c.ctx is done;
//   - any write fails.
func (c *Client) writePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.conn.Close()
	}()

	for {
		select {
		case message, ok := <-c.send:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// The hub closed the queue. The close frame is best effort:
				// the connection is torn down right after regardless.
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			w, err := c.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			if _, err := w.Write(message); err != nil {
				return
			}
			if err := w.Close(); err != nil {
				return
			}

		case <-ticker.C:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}

		case <-c.ctx.Done():
			return
		}
	}
}

房间订阅

// handleMessage decodes one inbound frame and dispatches on its "type":
//
//   - "join_room":  add this client to room_id and acknowledge with
//     {"type":"room_joined","room_id":...}
//   - "leave_room": remove this client from room_id; empty rooms are deleted
//   - "message":    relay the raw frame to the other members of room_id
//
// Malformed JSON and a missing room_id are reported through sendError.
// Frames with any other type are ignored.
func (c *Client) handleMessage(message []byte) {
	var msg struct {
		Type    string          `json:"type"`
		RoomID  string          `json:"room_id,omitempty"`
		Payload json.RawMessage `json:"payload,omitempty"`
	}

	if err := json.Unmarshal(message, &msg); err != nil {
		c.sendError("invalid message format")
		return
	}

	switch msg.Type {
	case "join_room", "leave_room", "message":
	default:
		return // unknown types are ignored
	}
	if msg.RoomID == "" {
		c.sendError("missing room_id")
		return
	}

	hub := c.hub
	switch msg.Type {
	case "join_room":
		// Marshal rather than concatenate, so a room_id containing quotes or
		// backslashes cannot break (or inject fields into) the JSON reply.
		// Marshal cannot fail for a struct of plain strings.
		ack, _ := json.Marshal(struct {
			Type   string `json:"type"`
			RoomID string `json:"room_id"`
		}{Type: "room_joined", RoomID: msg.RoomID})

		hub.mu.Lock()
		members, ok := hub.rooms[msg.RoomID]
		if !ok {
			members = make(map[string]bool)
			hub.rooms[msg.RoomID] = members
		}
		members[c.ID] = true
		// The hub closes c.send only while holding hub.mu, and removes c from
		// hub.clients in the same critical section. Checking membership under
		// the lock therefore rules out a "send on closed channel" panic.
		// The send is non-blocking so a full queue cannot stall the hub while
		// we hold its lock. In that case, or if the hub has not yet processed
		// this client's registration, the ack is dropped.
		if hub.clients[c.ID] == c {
			select {
			case c.send <- ack:
			default:
			}
		}
		hub.mu.Unlock()

	case "leave_room":
		hub.mu.Lock()
		if members, ok := hub.rooms[msg.RoomID]; ok {
			delete(members, c.ID)
			if len(members) == 0 {
				delete(hub.rooms, msg.RoomID) // don't accumulate empty rooms
			}
		}
		hub.mu.Unlock()

	case "message":
		hub.broadcast <- &BroadcastMessage{
			RoomID:  msg.RoomID,
			Data:    message,
			Exclude: c.ID, // don't echo back to the sender
		}
	}
}

SSE(Server-Sent Events)

Go实现

// SSEBroker fans events out to connected Server-Sent Events clients. It is an
// http.Handler: each open request registers one buffered channel here, keyed
// by a per-request ID, and removes it when the request ends.
type SSEBroker struct {
    // clients maps a per-connection ID to that connection's outbound queue,
    // filled by Broadcast and drained by ServeHTTP.
    clients map[string]chan []byte
    // mu guards clients.
    mu      sync.RWMutex
}

// NewSSEBroker returns a broker with no connected clients, ready to be
// mounted as an http.Handler.
func NewSSEBroker() *SSEBroker {
	b := new(SSEBroker)
	b.clients = make(map[string]chan []byte)
	return b
}

// ServeHTTP streams Server-Sent Events to one client until the request
// context is cancelled (client disconnect or server shutdown).
//
// Each connection gets a queue of up to 10 pre-rendered frames, filled by
// Broadcast. Headers are flushed immediately so the browser's EventSource
// reports the connection as open before the first event arrives.
func (b *SSEBroker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
		return
	}

	hdr := w.Header()
	hdr.Set("Content-Type", "text/event-stream")
	hdr.Set("Cache-Control", "no-cache")
	hdr.Set("Connection", "keep-alive")
	hdr.Set("Access-Control-Allow-Origin", "*")

	clientID := uuid.New().String()
	messageChan := make(chan []byte, 10)

	b.mu.Lock()
	b.clients[clientID] = messageChan
	b.mu.Unlock()

	defer func() {
		// Unregister and close under the write lock. Broadcast sends only
		// while holding the read lock, so it never sees a closed channel.
		b.mu.Lock()
		delete(b.clients, clientID)
		close(messageChan)
		b.mu.Unlock()
	}()

	w.WriteHeader(http.StatusOK)
	flusher.Flush()

	ctx := r.Context()
	for {
		select {
		case frame := <-messageChan:
			// frame is a complete, terminated SSE event built by Broadcast.
			if _, err := w.Write(frame); err != nil {
				return // client gone; the deferred cleanup unregisters it
			}
			flusher.Flush()

		case <-ctx.Done():
			return
		}
	}
}

// Broadcast sends one event to every connected client.
//
// The JSON payload {"event","data","time"} goes in the frame's data field.
// The event name is also emitted as the SSE "event:" field, so that
// EventSource.addEventListener(event, ...) handlers fire. Without that field
// every frame is dispatched as a generic "message" event.
//
// Clients whose queue is full are skipped rather than blocking the
// broadcaster. If data cannot be JSON-encoded, nothing is sent.
func (b *SSEBroker) Broadcast(event string, data interface{}) {
	payload, err := json.Marshal(map[string]interface{}{
		"event": event,
		"data":  data,
		"time":  time.Now(),
	})
	if err != nil {
		// data is not JSON-encodable: a caller bug, and there is no valid
		// frame to send. Drop it instead of sending an empty event.
		return
	}
	frame := encodeSSEFrame(event, payload)

	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, ch := range b.clients {
		select {
		case ch <- frame:
		default:
			// Skip slow clients.
		}
	}
}

// encodeSSEFrame renders one text/event-stream event. CR and LF are removed
// from the event name, because either would end the field early and let the
// caller inject extra fields. payload must not contain line breaks, which
// holds for json.Marshal output.
func encodeSSEFrame(event string, payload []byte) []byte {
	frame := make([]byte, 0, len("event: \ndata: \n\n")+len(event)+len(payload))
	if event != "" {
		frame = append(frame, "event: "...)
		for i := 0; i < len(event); i++ {
			if ch := event[i]; ch != '\r' && ch != '\n' {
				frame = append(frame, ch)
			}
		}
		frame = append(frame, '\n')
	}
	frame = append(frame, "data: "...)
	frame = append(frame, payload...)
	frame = append(frame, "\n\n"...)
	return frame
}

客户端实现(自动重连)

/**
 * Wrapper around the browser EventSource that adds exponential-backoff
 * reconnects, a retry cap, and handler re-binding across reconnects.
 *
 * Reconnection is managed here rather than by EventSource. On error the
 * current source is closed before a new one is scheduled, so at most one
 * connection is ever live.
 *
 * @param {string} url - Event stream endpoint.
 * @param {{maxRetries?: number, retryDelay?: number}} [options]
 *   maxRetries: consecutive failed attempts before giving up (default 5).
 *   retryDelay: base delay in ms; attempt n waits retryDelay * 2^(n-1)
 *   (default 1000).
 */
class SSEClient {
    constructor(url, options = {}) {
        this.url = url;
        this.retryCount = 0;
        this.maxRetries = options.maxRetries || 5;
        this.retryDelay = options.retryDelay || 1000;
        // event name -> array of handlers, so every handler survives reconnects.
        this.handlers = new Map();
        this.eventSource = null;
        this.retryTimer = null;
        this.closed = false;

        this.connect();
    }

    connect() {
        if (this.closed) {
            return;
        }
        const source = new EventSource(this.url);
        this.eventSource = source;

        source.onopen = () => {
            console.log('SSE connected');
            this.retryCount = 0;
        };

        source.onerror = (error) => {
            console.error('SSE error:', error);
            // Stop EventSource's built-in retry; otherwise every error would
            // leave one more live connection behind once we create a new one.
            source.close();
            if (this.closed || source !== this.eventSource) {
                return;
            }

            if (this.retryCount < this.maxRetries) {
                this.retryCount++;
                const delay = this.retryDelay * Math.pow(2, this.retryCount - 1);
                console.log(`Retrying in ${delay}ms (attempt ${this.retryCount})`);
                this.retryTimer = setTimeout(() => {
                    this.retryTimer = null;
                    this.connect();
                }, delay);
            } else {
                console.error('Max retries reached');
            }
        };

        // Re-attach every registered handler to the new source.
        for (const [event, list] of this.handlers) {
            for (const handler of list) {
                source.addEventListener(event, handler);
            }
        }
    }

    on(event, handler) {
        if (!this.handlers.has(event)) {
            this.handlers.set(event, []);
        }
        this.handlers.get(event).push(handler);
        if (this.eventSource) {
            this.eventSource.addEventListener(event, handler);
        }
    }

    close() {
        this.closed = true;
        if (this.retryTimer !== null) {
            clearTimeout(this.retryTimer);
            this.retryTimer = null;
        }
        if (this.eventSource) {
            this.eventSource.close();
        }
    }
}

// Usage example: react to "order.update" events from the stream.
const client = new SSEClient('/api/events');
client.on('order.update', ({ data }) => {
    const order = JSON.parse(data);
    console.log('Order updated:', order);
});

长轮询实现

// LongPollingBroker keeps one buffered mailbox per subscribed client ID for
// long-polling handlers. There is no constructor: the zero value is ready to
// use.
type LongPollingBroker struct {
	subscribers map[string]chan interface{}
	mu          sync.RWMutex
}

// Subscribe registers clientID and returns its mailbox, which buffers up to
// 10 messages.
//
// Subscribing an ID that is already subscribed returns the existing mailbox.
// Any queued messages are kept, and no channel is orphaned.
func (b *LongPollingBroker) Subscribe(clientID string) <-chan interface{} {
	b.mu.Lock()
	defer b.mu.Unlock()

	if ch, ok := b.subscribers[clientID]; ok {
		return ch
	}
	if b.subscribers == nil {
		// Lazily initialise so the zero value works; writing to a nil map panics.
		b.subscribers = make(map[string]chan interface{})
	}
	ch := make(chan interface{}, 10)
	b.subscribers[clientID] = ch
	return ch
}

// Unsubscribe removes clientID and closes its mailbox. Closing wakes any Poll
// currently waiting on it, which then returns (nil, nil). Unknown IDs are
// ignored.
func (b *LongPollingBroker) Unsubscribe(clientID string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	ch, found := b.subscribers[clientID]
	if !found {
		return
	}
	close(ch)
	delete(b.subscribers, clientID)
}

// Poll waits up to timeout for the next message queued for clientID.
//
// Return values:
//   - (msg, nil): a message arrived.
//   - (nil, nil): the timeout elapsed first, or the mailbox was closed by
//     Unsubscribe. NOTE(review): a queued nil message is indistinguishable
//     from these cases.
//   - (nil, ctx.Err()): ctx was cancelled.
//   - a "not subscribed" error: clientID is unknown.
func (b *LongPollingBroker) Poll(ctx context.Context, clientID string, timeout time.Duration) (interface{}, error) {
	b.mu.RLock()
	mailbox, subscribed := b.subscribers[clientID]
	b.mu.RUnlock()
	if !subscribed {
		return nil, errors.New("not subscribed")
	}

	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-deadline.C:
		return nil, nil // nothing to deliver; the handler answers 204
	case msg := <-mailbox:
		return msg, nil
	}
}

// HTTP处理器
// ServeHTTP answers one long-poll request for ?client_id=... by holding it
// for up to 30s. Responses:
//   - 200 with the message as JSON when one is queued;
//   - 204 No Content on timeout;
//   - 400 when client_id is missing;
//   - 404 when client_id is not subscribed.
//
// After a 200 or 204 the client should poll again.
func (h *LongPollingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	clientID := r.URL.Query().Get("client_id")
	if clientID == "" {
		http.Error(w, "missing client_id", http.StatusBadRequest)
		return
	}
	const timeout = 30 * time.Second

	msg, err := h.broker.Poll(r.Context(), clientID, timeout)
	if err != nil {
		if r.Context().Err() != nil {
			// The client disconnected (or the server is shutting down);
			// there is nobody left to answer.
			return
		}
		// The only other Poll failure is an unknown client_id. That is a
		// client-side problem (re-subscribe), not a server fault.
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	if msg == nil {
		w.WriteHeader(http.StatusNoContent)
		return
	}

	// Marshal before writing so an encoding failure can still produce a
	// clean error response.
	body, err := json.Marshal(msg)
	if err != nil {
		http.Error(w, "cannot encode message", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	// A write error means the client went away; nothing more can be done.
	_, _ = w.Write(append(body, '\n'))
}

扩展:分布式消息分发

// DistributedHub layers cross-node delivery on top of a local Hub by
// exchanging broadcasts over the Redis Pub/Sub channel "ws:broadcast".
type DistributedHub struct {
    // localHub delivers to clients connected to this process.
    localHub   *Hub
    // pubsub is the Redis client used both to publish and to subscribe.
    pubsub     *redis.Client
    // nodeID identifies this process so it can ignore its own publications.
    nodeID     string
}

// NewDistributedHub wraps hub so that broadcasts also reach other nodes
// through Redis Pub/Sub on rdb. Each instance gets a random node ID, which
// Start uses to skip messages this node published itself.
//
// The parameter is named rdb rather than redis so it does not shadow the
// redis package inside the function body.
func NewDistributedHub(hub *Hub, rdb *redis.Client) *DistributedHub {
	return &DistributedHub{
		localHub: hub,
		pubsub:   rdb,
		nodeID:   uuid.New().String(),
	}
}

// Start subscribes to the Redis channel "ws:broadcast" and relays messages
// published by other nodes to the local hub. It returns immediately. The
// relay goroutine stops, and the subscription is closed, when ctx is
// cancelled or the subscription channel closes.
func (h *DistributedHub) Start(ctx context.Context) {
	sub := h.pubsub.Subscribe(ctx, "ws:broadcast")
	ch := sub.Channel()

	go func() {
		defer sub.Close()

		for {
			select {
			case <-ctx.Done():
				return

			case msg, ok := <-ch:
				if !ok {
					return
				}

				// The tags must match the keys Broadcast publishes.
				// encoding/json's case-insensitive fallback cannot map
				// "source_node"/"room_id" onto SourceNode/RoomID because of
				// the underscore. Without the tags, both decode empty: the
				// loop-back check never fires and relays target room "".
				var payload struct {
					SourceNode string `json:"source_node"`
					RoomID     string `json:"room_id"`
					Data       []byte `json:"data"`
				}
				if err := json.Unmarshal([]byte(msg.Payload), &payload); err != nil {
					continue // malformed payload: skip instead of relaying zero values
				}

				// Avoid loop-back: this node already delivered its own
				// broadcasts locally.
				if payload.SourceNode == h.nodeID {
					continue
				}

				select {
				case h.localHub.broadcast <- &BroadcastMessage{
					RoomID: payload.RoomID,
					Data:   payload.Data,
				}:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
}

// Broadcast delivers data to roomID on this node first, then publishes it on
// "ws:broadcast" so other nodes can deliver it to their own clients.
//
// The publish is best effort. Redis Pub/Sub is fire-and-forget, and the
// Publish result is deliberately discarded. json.Marshal cannot fail for a
// map of strings and bytes, so its error is discarded too.
func (h *DistributedHub) Broadcast(roomID string, data []byte) {
	local := &BroadcastMessage{RoomID: roomID, Data: data}
	h.localHub.broadcast <- local

	envelope := map[string]interface{}{
		"source_node": h.nodeID,
		"room_id":     roomID,
		"data":        data,
	}
	encoded, _ := json.Marshal(envelope)

	_ = h.pubsub.Publish(context.Background(), "ws:broadcast", encoded)
}

总结

实时通信技术选择:

  • WebSocket:双向通信、聊天、多人协作、实时游戏
  • SSE:单向推送、股票行情、通知、日志流
  • 长轮询:兼容老旧浏览器、简单推送场景

生产环境关键考虑:

  • 心跳保活防止连接泄漏
  • 断线自动重连机制
  • 消息确认和重传
  • 分布式环境下的消息路由(Redis Pub/Sub)
  • 连接数监控和限流

延伸阅读

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页