WebSocket 实时通信:构建交互式应用
传统的 HTTP 请求-响应模型无法满足实时应用的需求。想象一下聊天室、股票行情、在线游戏这些场景,客户端需要即时收到服务器的消息,而不是不断轮询。
WebSocket 提供了一种在单个 TCP 连接上进行全双工通信的协议,让服务器可以主动向客户端推送数据。
HTTP vs WebSocket
HTTP 的局限性
客户端 → 请求 → 服务器
客户端 ← 响应 ← 服务器
(每次通信都需要客户端主动发起)
WebSocket 的优势
客户端 ←→ 持久连接 ←→ 服务器
(双方都可以随时发送数据)
特点:
- 持久连接,无需重复建立
- 全双工通信,双向数据传输
- 低延迟,无 HTTP 头部开销
- 服务器可以主动推送
WebSocket 协议
WebSocket 连接通过 HTTP 升级建立:
客户端请求:
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
服务器响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
建立连接后,双方可以通过 WebSocket 帧发送数据。
gorilla/websocket 库
Go 标准库不包含 WebSocket 支持,我们使用 gorilla/websocket:
go get github.com/gorilla/websocket
基础 Echo 服务器
package main
import (
"log"
"net/http"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true // 允许所有来源(生产环境应该限制)
},
}
func echoHandler(w http.ResponseWriter, r *http.Request) {
// 升级为 WebSocket 连接
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("upgrade error:", err)
return
}
defer conn.Close()
log.Println("Client connected")
// 循环读取和发送消息
for {
messageType, message, err := conn.ReadMessage()
if err != nil {
log.Println("read error:", err)
break
}
log.Printf("Received: %s", message)
// Echo 回客户端
if err := conn.WriteMessage(messageType, message); err != nil {
log.Println("write error:", err)
break
}
}
}
func main() {
http.HandleFunc("/ws", echoHandler)
log.Println("Server starting on :8080")
http.ListenAndServe(":8080", nil)
}
客户端代码
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Demo</title>
</head>
<body>
<input type="text" id="message" placeholder="Enter message">
<button onclick="sendMessage()">Send</button>
<div id="output"></div>
<script>
const ws = new WebSocket('ws://localhost:8080/ws');
ws.onopen = function() {
console.log('Connected');
appendMessage('Connected to server');
};
ws.onmessage = function(event) {
appendMessage('Server: ' + event.data);
};
ws.onclose = function() {
appendMessage('Disconnected');
};
function sendMessage() {
const input = document.getElementById('message');
const message = input.value;
ws.send(message);
appendMessage('You: ' + message);
input.value = '';
}
function appendMessage(msg) {
const output = document.getElementById('output');
output.innerHTML += '<p>' + msg + '</p>';
}
</script>
</body>
</html>
消息类型
WebSocket 支持多种消息类型:
const (
TextMessage = 1 // 文本消息
BinaryMessage = 2 // 二进制消息
CloseMessage = 8 // 关闭连接
PingMessage = 9 // 心跳检测
PongMessage = 10 // 心跳响应
)
// 发送文本
conn.WriteMessage(websocket.TextMessage, []byte("Hello"))
// 发送 JSON
data := map[string]string{"message": "Hello"}
jsonData, _ := json.Marshal(data)
conn.WriteMessage(websocket.TextMessage, jsonData)
// 发送二进制
conn.WriteMessage(websocket.BinaryMessage, imageData)
心跳检测
保持连接活跃,检测断开的连接:
const (
writeWait = 10 * time.Second
pongWait = 60 * time.Second
pingPeriod = (pongWait * 9) / 10
maxMessageSize = 512
)
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
c.hub.broadcast <- message
}
}
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
实战:多用户聊天室
package main
import (
"log"
"net/http"
"sync"
"github.com/gorilla/websocket"
)
// Client 表示一个聊天客户端
type Client struct {
hub *Hub
conn *websocket.Conn
send chan []byte
name string
}
// Hub 维护所有活跃的客户端
type Hub struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
mu sync.RWMutex
}
func newHub() *Hub {
return &Hub{
clients: make(map[*Client]bool),
broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
}
}
func (h *Hub) run() {
for {
select {
case client := <-h.register:
h.mu.Lock()
h.clients[client] = true
h.mu.Unlock()
log.Printf("Client %s joined, total: %d", client.name, len(h.clients))
// 广播加入消息
msg := []byte(client.name + " joined the chat")
h.broadcast <- msg
case client := <-h.unregister:
h.mu.Lock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
h.mu.Unlock()
log.Printf("Client %s left, total: %d", client.name, len(h.clients))
// 广播离开消息
msg := []byte(client.name + " left the chat")
h.broadcast <- msg
case message := <-h.broadcast:
h.mu.RLock()
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
h.mu.RUnlock()
}
}
}
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
name := r.URL.Query().Get("name")
if name == "" {
name = "Anonymous"
}
client := &Client{
hub: hub,
conn: conn,
send: make(chan []byte, 256),
name: name,
}
client.hub.register <- client
go client.writePump()
go client.readPump()
}
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
// 添加用户名前缀
fullMessage := []byte(c.name + ": " + string(message))
c.hub.broadcast <- fullMessage
}
}
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
func main() {
hub := newHub()
go hub.run()
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
serveWs(hub, w, r)
})
// 提供 HTML 页面
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, "chat.html")
})
log.Println("Chat server starting on :8080")
http.ListenAndServe(":8080", nil)
}
聊天室前端
<!DOCTYPE html>
<html>
<head>
<title>Chat Room</title>
<style>
#messages { height: 400px; overflow-y: scroll; border: 1px solid #ccc; padding: 10px; }
.message { margin: 5px 0; }
.system { color: #888; font-style: italic; }
</style>
</head>
<body>
<h1>Chat Room</h1>
<div id="messages"></div>
<input type="text" id="message" placeholder="Type your message...">
<button onclick="sendMessage()">Send</button>
<script>
const name = prompt("Enter your name:") || "Anonymous";
const ws = new WebSocket(`ws://localhost:8080/ws?name=${encodeURIComponent(name)}`);
ws.onmessage = function(event) {
const messages = document.getElementById('messages');
const div = document.createElement('div');
div.className = 'message';
const text = event.data;
if (text.includes('joined') || text.includes('left')) {
div.className += ' system';
}
div.textContent = text;
messages.appendChild(div);
messages.scrollTop = messages.scrollHeight;
};
function sendMessage() {
const input = document.getElementById('message');
const message = input.value.trim();
if (message) {
ws.send(message);
input.value = '';
}
}
document.getElementById('message').addEventListener('keypress', function(e) {
if (e.key === 'Enter') {
sendMessage();
}
});
</script>
</body>
</html>
房间功能
扩展支持多个聊天房间:
type Room struct {
name string
clients map[*Client]bool
hub *Hub
}
type Hub struct {
rooms map[string]*Room
register chan *Client
unregister chan *Client
broadcast chan *Message
}
type Message struct {
room string
message []byte
}
func (h *Hub) run() {
for {
select {
case client := <-h.register:
room := h.getOrCreateRoom(client.room)
room.clients[client] = true
case client := <-h.unregister:
if room, ok := h.rooms[client.room]; ok {
delete(room.clients, client)
close(client.send)
}
case msg := <-h.broadcast:
if room, ok := h.rooms[msg.room]; ok {
for client := range room.clients {
select {
case client.send <- msg.message:
default:
delete(room.clients, client)
close(client.send)
}
}
}
}
}
}
总结
WebSocket 为实时应用提供了强大的通信能力:
- 全双工通信:服务器和客户端都可以主动发送数据
- 低延迟:无需重复建立连接,无 HTTP 头部开销
- 持久连接:通过心跳保持连接活跃
- 消息类型:支持文本、二进制、控制帧等多种类型
常见应用场景:
- 聊天室和即时通讯
- 实时通知和推送
- 在线游戏
- 协作编辑
- 实时数据展示(股票、天气等)
记住:WebSocket 虽然强大,但不是所有场景都需要。简单的轮询或 Server-Sent Events(SSE)可能更适合某些场景。
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。