WebSocket:实现实时通信

学习如何在 Go 中使用 WebSocket 实现实时双向通信

WebSocket:实现实时通信

HTTP 是请求-响应模式,客户端必须主动发起请求才能获取数据。但在很多场景中(聊天室、实时游戏、股票行情、在线协作等),我们需要服务器主动推送数据给客户端。

WebSocket 就是为这种场景设计的协议。它允许客户端和服务器建立持久连接,实现真正的双向实时通信。

今天我们就来学习如何在 Go 中使用 WebSocket。

WebSocket 基础

HTTP vs WebSocket

HTTP:

  • 短连接,每次请求都要建立新连接(HTTP/1.1 有 keep-alive)
  • 单向:客户端请求 → 服务器响应
  • 开销大:每次请求都要发送完整的 HTTP 头

WebSocket:

  • 长连接,一次握手后保持连接
  • 双向:客户端和服务器都可以随时发送数据
  • 开销小:数据帧很轻量

WebSocket 握手

WebSocket 连接是通过 HTTP 升级建立的:

GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

使用 gorilla/websocket

Go 标准库没有 WebSocket 支持,我们使用社区最流行的 gorilla/websocket

go get github.com/gorilla/websocket

简单的 Echo 服务器

package main

import (
	"log"
	"net/http"
	
	"github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		return true // 允许所有来源(生产环境应该限制)
	},
}

func echoHandler(w http.ResponseWriter, r *http.Request) {
	// 升级 HTTP 连接为 WebSocket
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println("升级失败:", err)
		return
	}
	defer conn.Close()
	
	log.Println("新连接:", conn.RemoteAddr())
	
	// 循环读取和发送消息
	for {
		messageType, message, err := conn.ReadMessage()
		if err != nil {
			log.Println("读取失败:", err)
			break
		}
		
		log.Printf("收到: %s", message)
		
		// Echo 回去
		err = conn.WriteMessage(messageType, message)
		if err != nil {
			log.Println("写入失败:", err)
			break
		}
	}
}

func main() {
	http.HandleFunc("/echo", echoHandler)
	
	log.Println("WebSocket 服务器启动在 :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

简单的客户端

package main

import (
	"bufio"
	"log"
	"os"
	
	"github.com/gorilla/websocket"
)

func main() {
	// 连接 WebSocket 服务器
	url := "ws://localhost:8080/echo"
	conn, _, err := websocket.DefaultDialer.Dial(url, nil)
	if err != nil {
		log.Fatal("连接失败:", err)
	}
	defer conn.Close()
	
	// 启动 goroutine 接收消息
	go func() {
		for {
			_, message, err := conn.ReadMessage()
			if err != nil {
				log.Println("读取失败:", err)
				return
			}
			log.Printf("收到: %s", message)
		}
	}()
	
	// 从标准输入读取消息并发送
	scanner := bufio.NewScanner(os.Stdin)
	log.Println("输入消息(按 Enter 发送):")
	for scanner.Scan() {
		message := scanner.Text()
		err := conn.WriteMessage(websocket.TextMessage, []byte(message))
		if err != nil {
			log.Println("发送失败:", err)
			break
		}
	}
}

实战:聊天室

让我们实现一个多用户的聊天室:

package main

import (
	"log"
	"net/http"
	"sync"
	
	"github.com/gorilla/websocket"
)

// Client 表示一个聊天室用户
type Client struct {
	conn     *websocket.Conn
	username string
	send     chan []byte
}

// ChatRoom 管理所有连接
type ChatRoom struct {
	clients    map[*Client]bool
	broadcast  chan []byte
	register   chan *Client
	unregister chan *Client
	mu         sync.RWMutex
}

func NewChatRoom() *ChatRoom {
	return &ChatRoom{
		clients:    make(map[*Client]bool),
		broadcast:  make(chan []byte),
		register:   make(chan *Client),
		unregister: make(chan *Client),
	}
}

func (room *ChatRoom) Run() {
	for {
		select {
		case client := <-room.register:
			room.mu.Lock()
			room.clients[client] = true
			room.mu.Unlock()
			log.Printf("用户 %s 加入聊天室", client.username)
			
			// 广播加入消息
			room.broadcast <- []byte("系统: " + client.username + " 加入了聊天室")
			
		case client := <-room.unregister:
			room.mu.Lock()
			if _, ok := room.clients[client]; ok {
				delete(room.clients, client)
				close(client.send)
				log.Printf("用户 %s 离开聊天室", client.username)
				room.broadcast <- []byte("系统: " + client.username + " 离开了聊天室")
			}
			room.mu.Unlock()
			
		case message := <-room.broadcast:
			room.mu.RLock()
			for client := range room.clients {
				select {
				case client.send <- message:
				default:
					close(client.send)
					delete(room.clients, client)
				}
			}
			room.mu.RUnlock()
		}
	}
}

func (room *ChatRoom) handleClient(conn *websocket.Conn, username string) {
	client := &Client{
		conn:     conn,
		username: username,
		send:     make(chan []byte, 256),
	}
	
	room.register <- client
	
	// 启动写 goroutine
	go func() {
		defer conn.Close()
		for message := range client.send {
			err := conn.WriteMessage(websocket.TextMessage, message)
			if err != nil {
				break
			}
		}
	}()
	
	// 读消息
	defer func() {
		room.unregister <- client
		conn.Close()
	}()
	
	for {
		_, message, err := conn.ReadMessage()
		if err != nil {
			break
		}
		
		// 格式化消息
		formatted := []byte(username + ": " + string(message))
		room.broadcast <- formatted
	}
}

var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}

var room = NewChatRoom()

func chatHandler(w http.ResponseWriter, r *http.Request) {
	username := r.URL.Query().Get("username")
	if username == "" {
		username = "匿名用户"
	}
	
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println("升级失败:", err)
		return
	}
	
	room.handleClient(conn, username)
}

func main() {
	go room.Run()
	
	http.HandleFunc("/chat", chatHandler)
	
	log.Println("聊天室服务器启动在 :8080")
	log.Println("访问: ws://localhost:8080/chat?username=你的名字")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

实战:实时股票行情

package main

import (
	"encoding/json"
	"log"
	"math/rand"
	"net/http"
	"time"
	
	"github.com/gorilla/websocket"
)

type Stock struct {
	Symbol string  `json:"symbol"`
	Price  float64 `json:"price"`
	Change float64 `json:"change"`
	Time   string  `json:"time"`
}

var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}

func stockHandler(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println("升级失败:", err)
		return
	}
	defer conn.Close()
	
	// 模拟股票数据
	stocks := []Stock{
		{Symbol: "AAPL", Price: 150.00},
		{Symbol: "GOOGL", Price: 2800.00},
		{Symbol: "MSFT", Price: 300.00},
		{Symbol: "TSLA", Price: 700.00},
	}
	
	// 定期推送行情
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	
	for {
		select {
		case <-ticker.C:
			// 随机更新价格
			for i := range stocks {
				change := (rand.Float64() - 0.5) * 10
				stocks[i].Price += change
				stocks[i].Change = change
				stocks[i].Time = time.Now().Format("15:04:05")
			}
			
			// 发送数据
			data, _ := json.Marshal(stocks)
			err := conn.WriteMessage(websocket.TextMessage, data)
			if err != nil {
				log.Println("写入失败:", err)
				return
			}
		}
	}
}

func main() {
	http.HandleFunc("/stocks", stockHandler)
	
	// 提供 HTML 页面
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/html")
		w.Write([]byte(`
<!DOCTYPE html>
<html>
<head>
    <title>实时股票行情</title>
    <style>
        body { font-family: Arial, sans-serif; padding: 20px; }
        .stock { margin: 10px 0; padding: 10px; border: 1px solid #ccc; }
        .up { color: green; }
        .down { color: red; }
    </style>
</head>
<body>
    <h1>实时股票行情</h1>
    <div id="stocks"></div>
    
    <script>
        const ws = new WebSocket('ws://localhost:8080/stocks');
        
        ws.onmessage = function(event) {
            const stocks = JSON.parse(event.data);
            const container = document.getElementById('stocks');
            
            container.innerHTML = stocks.map(stock => `
                <div class="stock">
                    <strong>${stock.symbol}</strong>: 
                    $${stock.price.toFixed(2)} 
                    <span class="${stock.change >= 0 ? 'up' : 'down'}">
                        ${stock.change >= 0 ? '↑' : '↓'} ${Math.abs(stock.change).toFixed(2)}
                    </span>
                    <small>(${stock.time})</small>
                </div>
            `).join('');
        };
        
        ws.onclose = function() {
            document.getElementById('stocks').innerHTML = '连接已断开';
        };
    </script>
</body>
</html>
		`))
	})
	
	log.Println("股票行情服务器启动在 :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

心跳检测

长连接需要心跳检测来发现断开的连接:

const (
	writeWait      = 10 * time.Second
	pongWait       = 60 * time.Second
	pingPeriod     = (pongWait * 9) / 10
	maxMessageSize = 512
)

func (c *Client) readPump() {
	defer func() {
		c.room.unregister <- c
		c.conn.Close()
	}()
	
	c.conn.SetReadLimit(maxMessageSize)
	c.conn.SetReadDeadline(time.Now().Add(pongWait))
	c.conn.SetPongHandler(func(string) error {
		c.conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})
	
	for {
		_, message, err := c.conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("错误: %v", err)
			}
			break
		}
		c.room.broadcast <- message
	}
}

func (c *Client) writePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.conn.Close()
	}()
	
	for {
		select {
		case message, ok := <-c.send:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			
			w, err := c.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			w.Write(message)
			
			if err := w.Close(); err != nil {
				return
			}
			
		case <-ticker.C:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}

小结

今天我们学习了 WebSocket:

  1. 基础概念:WebSocket vs HTTP,握手过程
  2. gorilla/websocket:服务器和客户端实现
  3. 实战应用:聊天室、实时股票行情
  4. 连接管理:心跳检测、连接池

WebSocket 为实时应用提供了强大的支持。无论是聊天、游戏还是协作工具,WebSocket 都是不可或缺的。

练习时间

  1. 实现一个在线画图板,多个用户可以同时画画
  2. 创建一个实时协作的 Markdown 编辑器
  3. 实现一个简单的在线游戏(如五子棋)
  4. 构建一个实时通知系统

我们下篇见!

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页