Go 与消息队列:Kafka 和 RabbitMQ 实战
凌晨三点,你的电商系统正在做促销活动。突然,订单量暴增 10 倍。数据库扛不住了,CPU 飙到 100%,订单服务开始超时,支付回调丢失,库存扣减不一致……整个系统像多米诺骨牌一样倒下了。
问题出在哪里?答案是:你的服务之间耦合太紧了。订单服务同步调用支付、库存、通知,任何一个环节变慢都会拖垮整条链路。
这就是消息队列大显身手的地方。它就像邮局——你不需要亲自跑到朋友家门口送信,只要把信投进邮筒,邮局会负责送达。发送方和接收方完全解耦,即使接收方暂时不在,消息也不会丢失。
今天,我们就来深入学习 Go 语言中两个最流行的消息队列:Kafka 和 RabbitMQ,从基础概念到生产级实战。
消息队列的核心概念
在写代码之前,我们先建立对消息队列的整体认知。
为什么需要消息队列?
消息队列解决三大核心问题:
1. 异步处理
没有消息队列时,用户下单的流程是这样的:
用户请求 → 创建订单(50ms) → 扣减库存(80ms) → 发起支付(120ms) → 发送通知(60ms) → 返回响应
总耗时:310ms
引入消息队列后:
用户请求 → 创建订单(50ms) → 发送消息到队列(5ms) → 返回响应
总耗时:55ms
后台异步消费:
Worker A: 扣减库存
Worker B: 发起支付
Worker C: 发送通知
响应时间从 310ms 降到 55ms,用户体验直接起飞。
2. 流量削峰
促销活动时每秒涌入 10000 个订单,但数据库每秒只能处理 2000 次写入。没有消息队列,系统直接崩溃。有了消息队列,请求先进队列,后台按数据库能承受的速率慢慢消费。
3. 系统解耦
订单服务只管往队列里扔消息,不关心谁在消费、消费多快、是否在线。库存服务、支付服务、通知服务各自独立运行,独立扩缩容。
消息模型
消息队列有两种基本模型:
点对点模型(Queue):一条消息只被一个消费者消费。
Producer ──> [Message Queue] ──> Consumer A
Consumer B (竞争消费,只有一个能拿到)
发布/订阅模型(Pub/Sub):一条消息被多个订阅者消费。
Publisher ──> [Topic] ──> Subscriber A (收到完整副本)
Subscriber B (收到完整副本)
Subscriber C (收到完整副本)
Kafka 和 RabbitMQ 都支持这两种模型,但设计哲学不同:
| 特性 | Kafka | RabbitMQ |
|---|---|---|
| 消息存储 | 持久化日志,可重放 | 消费后删除 |
| 消费模型 | 拉(Pull) | 推(Push) |
| 吞吐量 | 极高(百万/秒) | 高(万级/秒) |
| 延迟 | 毫秒级 | 微秒级 |
| 消息顺序 | 分区内有序 | 单队列有序 |
| 典型场景 | 日志、事件流、大数据 | 任务队列、工作流 |
Kafka:分布式事件流平台
Kafka 最初由 LinkedIn 开发,如今已成为大数据和事件驱动架构的基石。它不是简单的消息队列,而是一个分布式事件流平台。
Kafka 核心概念
┌─────────────────────────────┐
│ Kafka Cluster │
│ │
Producer ──写入──> │ Topic: orders │
│ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │Partition│ │Partition│ │Partition│ │
│ │ 0 │ │ 1 │ │ 2 │ │
│ └──────┘ └──────┘ └──────┘ │
│ │
Consumer Group ──读取──> │
┌─────────┐ │ │
│Consumer A│──读──> │ Partition 0 │
│Consumer B│──读──> │ Partition 1 │
│Consumer C│──读──> │ Partition 2 │
└─────────┘ │ │
└─────────────────────────────┘
- Topic:消息的逻辑分类(类似数据库表)
- Partition:Topic 的物理分片,是并行度的基本单位
- Offset:每条消息在 Partition 中的唯一序号
- Consumer Group:一组消费者共同消费一个 Topic
安装和启动 Kafka
# 使用 Docker 快速启动(推荐)
docker-compose up -d
# docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_NUM_PARTITIONS: 3
depends_on:
- zookeeper
Kafka 生产者:发送消息
Go 生态中最优秀的 Kafka 客户端是 segmentio/kafka-go,它的设计简洁、API 友好。
go get github.com/segmentio/kafka-go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
)
// OrderEvent 订单事件
type OrderEvent struct {
OrderID string `json:"order_id"`
UserID string `json:"user_id"`
ProductID string `json:"product_id"`
Amount float64 `json:"amount"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
}
// KafkaProducer 封装 Kafka 生产者
type KafkaProducer struct {
writer *kafka.Writer
}
func NewKafkaProducer(brokers []string, topic string) *KafkaProducer {
return &KafkaProducer{
writer: &kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: topic,
Balancer: &kafka.LeastBytes{}, // 按字节数负载均衡到分区
// 生产环境推荐配置
BatchSize: 100, // 批量发送
BatchTimeout: 50 * time.Millisecond,
MaxAttempts: 3, // 最大重试次数
RequiredAcks: kafka.RequireAll, // 所有副本确认
Compression: kafka.Snappy, // 消息压缩
},
}
}
func (p *KafkaProducer) Send(ctx context.Context, key string, event OrderEvent) error {
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal event: %w", err)
}
msg := kafka.Message{
Key: []byte(key), // Key 相同的消息会进入同一个分区,保证顺序
Value: data,
Time: time.Now(),
Headers: []kafka.Header{
{Key: "event-type", Value: []byte("order.created")},
{Key: "version", Value: []byte("1.0")},
},
}
if err := p.writer.WriteMessages(ctx, msg); err != nil {
return fmt.Errorf("failed to write message: %w", err)
}
log.Printf("📨 Sent order event: %s (key: %s)", event.OrderID, key)
return nil
}
func (p *KafkaProducer) Close() error {
return p.writer.Close()
}
func main() {
producer := NewKafkaProducer(
[]string{"localhost:9092"},
"orders",
)
defer producer.Close()
ctx := context.Background()
// 模拟发送订单事件
orders := []OrderEvent{
{OrderID: "ORD-001", UserID: "user-1", ProductID: "prod-A", Amount: 99.9, Status: "created", CreatedAt: time.Now()},
{OrderID: "ORD-002", UserID: "user-2", ProductID: "prod-B", Amount: 199.0, Status: "created", CreatedAt: time.Now()},
{OrderID: "ORD-003", UserID: "user-1", ProductID: "prod-C", Amount: 59.9, Status: "created", CreatedAt: time.Now()},
}
for _, order := range orders {
if err := producer.Send(ctx, order.UserID, order); err != nil {
log.Printf("Failed to send: %v", err)
}
time.Sleep(100 * time.Millisecond)
}
log.Println("All orders sent!")
}
Kafka 消费者:处理消息
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
)
type OrderEvent struct {
OrderID string `json:"order_id"`
UserID string `json:"user_id"`
ProductID string `json:"product_id"`
Amount float64 `json:"amount"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
}
// KafkaConsumer 封装 Kafka 消费者
type KafkaConsumer struct {
reader *kafka.Reader
}
func NewKafkaConsumer(brokers []string, topic, groupID string) *KafkaConsumer {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
GroupID: groupID,
MinBytes: 10e3, // 最小批量大小 10KB
MaxBytes: 10e6, // 最大批量大小 10MB
CommitInterval: time.Second, // 每秒提交一次偏移量
StartOffset: kafka.LastOffset, // 从最新消息开始消费
// 生产环境:从最早消息开始,避免丢失
// StartOffset: kafka.FirstOffset,
})
return &KafkaConsumer{reader: r}
}
func (c *KafkaConsumer) Consume(ctx context.Context, handler func(OrderEvent) error) error {
for {
msg, err := c.reader.FetchMessage(ctx)
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
log.Printf("Error fetching message: %v", err)
continue
}
var event OrderEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
log.Printf("Failed to unmarshal message: %v", err)
// 提交偏移量,跳过坏消息(生产环境应发送到死信队列)
c.reader.CommitMessages(ctx, msg)
continue
}
// 处理消息
log.Printf("📬 Processing order: %s (partition: %d, offset: %d)",
event.OrderID, msg.Partition, msg.Offset)
if err := handler(event); err != nil {
log.Printf("Failed to process order %s: %v", event.OrderID, err)
// 重试逻辑或发送到死信队列
// 这里先不提交偏移量,下次会重新消费
continue
}
// 处理成功,提交偏移量
if err := c.reader.CommitMessages(ctx, msg); err != nil {
log.Printf("Failed to commit offset: %v", err)
}
}
}
func (c *KafkaConsumer) Close() error {
return c.reader.Close()
}
func main() {
consumer := NewKafkaConsumer(
[]string{"localhost:9092"},
"orders",
"inventory-service", // 消费者组 ID
)
defer consumer.Close()
ctx := context.Background()
log.Println("Inventory service started, waiting for orders...")
err := consumer.Consume(ctx, func(event OrderEvent) error {
log.Printf("📦 Deducting stock for product %s (order: %s)",
event.ProductID, event.OrderID)
// 模拟库存扣减
time.Sleep(50 * time.Millisecond)
log.Printf("✅ Stock deducted for order %s", event.OrderID)
return nil
})
if err != nil {
log.Fatalf("Consumer error: %v", err)
}
}
多消费者组并行消费
Kafka 的一个强大特性是多个消费者组可以独立消费同一个 Topic:
package main
import (
"context"
"log"
"os"
"os/signal"
"sync"
"syscall"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 优雅关闭
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
var wg sync.WaitGroup
// 启动库存扣减消费者
wg.Add(1)
go func() {
defer wg.Done()
consumer := NewKafkaConsumer(
[]string{"localhost:9092"},
"orders",
"inventory-group", // 独立的消费者组
)
defer consumer.Close()
consumer.Consume(ctx, func(event OrderEvent) error {
log.Printf("[Inventory] Processing order %s", event.OrderID)
return deductStock(event)
})
}()
// 启动支付消费者
wg.Add(1)
go func() {
defer wg.Done()
consumer := NewKafkaConsumer(
[]string{"localhost:9092"},
"orders",
"payment-group", // 不同的消费者组
)
defer consumer.Close()
consumer.Consume(ctx, func(event OrderEvent) error {
log.Printf("[Payment] Processing order %s", event.OrderID)
return processPayment(event)
})
}()
// 启动通知消费者
wg.Add(1)
go func() {
defer wg.Done()
consumer := NewKafkaConsumer(
[]string{"localhost:9092"},
"orders",
"notification-group",
)
defer consumer.Close()
consumer.Consume(ctx, func(event OrderEvent) error {
log.Printf("[Notification] Sending confirmation for order %s", event.OrderID)
return sendNotification(event)
})
}()
// 等待关闭信号
<-sigChan
log.Println("Shutting down consumers...")
cancel()
wg.Wait()
log.Println("All consumers stopped")
}
Kafka 事件溯源模式
Kafka 的消息是持久化的,支持重新消费。这使得事件溯源(Event Sourcing)成为可能——你的系统状态可以通过重放所有事件来重建。
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
)
// AccountEvent 账户事件
type AccountEvent struct {
EventID string `json:"event_id"`
AccountID string `json:"account_id"`
Type string `json:"type"` // deposit, withdrawal, transfer
Amount float64 `json:"amount"`
Timestamp time.Time `json:"timestamp"`
}
// AccountState 账户状态(通过事件重建)
type AccountState struct {
AccountID string `json:"account_id"`
Balance float64 `json:"balance"`
Version int `json:"version"` // 事件版本号
}
// EventStore 基于 Kafka 的事件存储
type EventStore struct {
writer *kafka.Writer
}
func NewEventStore(brokers []string) *EventStore {
return &EventStore{
writer: &kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: "account-events",
Balancer: &kafka.Hash{}, // 按 key 哈希分区
RequiredAcks: kafka.RequireAll,
},
}
}
func (es *EventStore) Append(ctx context.Context, event AccountEvent) error {
data, _ := json.Marshal(event)
return es.writer.WriteMessages(ctx, kafka.Message{
Key: []byte(event.AccountID),
Value: data,
})
}
// RebuildState 从事件流重建账户状态
func (es *EventStore) RebuildState(ctx context.Context, brokers []string, accountID string) (*AccountState, error) {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: "account-events",
Partition: 0, // 简化示例,实际应根据分区策略确定
StartOffset: kafka.FirstOffset, // 从第一条消息开始
})
defer reader.Close()
state := &AccountState{
AccountID: accountID,
Balance: 0,
}
for {
msg, err := reader.ReadMessage(ctx)
if err != nil {
break // 读完所有消息
}
var event AccountEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
continue
}
// 只处理目标账户的事件
if event.AccountID != accountID {
continue
}
// 应用事件到状态
switch event.Type {
case "deposit":
state.Balance += event.Amount
case "withdrawal":
state.Balance -= event.Amount
}
state.Version++
log.Printf("Applied event: %s %s %.2f (balance: %.2f)",
event.Type, event.AccountID, event.Amount, state.Balance)
}
return state, nil
}
func main() {
ctx := context.Background()
brokers := []string{"localhost:9092"}
store := NewEventStore(brokers)
// 记录一系列账户事件
events := []AccountEvent{
{EventID: "evt-1", AccountID: "acc-001", Type: "deposit", Amount: 1000.00, Timestamp: time.Now()},
{EventID: "evt-2", AccountID: "acc-001", Type: "withdrawal", Amount: 200.00, Timestamp: time.Now()},
{EventID: "evt-3", AccountID: "acc-001", Type: "deposit", Amount: 500.00, Timestamp: time.Now()},
{EventID: "evt-4", AccountID: "acc-001", Type: "withdrawal", Amount: 100.00, Timestamp: time.Now()},
}
for _, event := range events {
if err := store.Append(ctx, event); err != nil {
log.Printf("Failed to append event: %v", err)
}
}
// 等待消息写入完成
time.Sleep(2 * time.Second)
// 重建账户状态
state, err := store.RebuildState(ctx, brokers, "acc-001")
if err != nil {
log.Fatal(err)
}
fmt.Printf("\nRebuilt state: %+v\n", state)
// 输出:Balance: 1200.00, Version: 4
// (1000 - 200 + 500 - 100 = 1200)
}
RabbitMQ:灵活的消息中间件
如果说 Kafka 是一个高性能的日志流平台,那么 RabbitMQ 就是一个功能丰富的消息路由引擎。RabbitMQ 基于 AMQP(Advanced Message Queuing Protocol)协议,提供了灵活的消息路由、多种交换机类型和完善的消息确认机制。
AMQP 协议核心概念
┌──────────────────────────────┐
│ RabbitMQ Broker │
│ │
Producer ──消息──> Exchange ──路由──> Queue A ──> Consumer 1│
│ Queue B ──> Consumer 2│
│ │
Binding Key Ack/Nack
└──────────────────────────────┘
- Exchange(交换机):接收消息并根据规则路由到队列
- Queue(队列):存储消息直到被消费者消费
- Binding(绑定):Exchange 和 Queue 之间的关联规则
- Routing Key(路由键):消息携带的路由标识
RabbitMQ 的四种交换机
Direct Exchange: routing_key = "order.pay" ──> 精确匹配绑定键
Fanout Exchange: 忽略 routing_key ──> 广播到所有绑定的队列
Topic Exchange: routing_key = "order.*" ──> 通配符匹配
Headers Exchange: 根据消息头属性路由(很少使用)
安装和启动 RabbitMQ
# Docker 启动(带管理界面)
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management
# 管理界面:http://localhost:15672
# 用户名/密码:guest/guest
RabbitMQ 生产者
Go 中常用的 RabbitMQ 客户端是 amqp091-go(官方维护的 AMQP 客户端)。
go get github.com/rabbitmq/amqp091-go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// RabbitProducer RabbitMQ 生产者
type RabbitProducer struct {
conn *amqp.Connection
channel *amqp.Channel
}
func NewRabbitProducer(url string) (*RabbitProducer, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, fmt.Errorf("failed to connect: %w", err)
}
ch, err := conn.Channel()
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to open channel: %w", err)
}
return &RabbitProducer{conn: conn, channel: ch}, nil
}
// DeclareExchange 声明交换机
func (p *RabbitProducer) DeclareExchange(name, kind string) error {
return p.channel.ExchangeDeclare(
name, // 交换机名称
kind, // 类型:direct, fanout, topic, headers
true, // durable: 持久化
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
}
// Publish 发送消息
func (p *RabbitProducer) Publish(ctx context.Context, exchange, routingKey string, body interface{}) error {
data, err := json.Marshal(body)
if err != nil {
return err
}
return p.channel.PublishWithContext(ctx,
exchange, // 交换机
routingKey, // 路由键
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: data,
DeliveryMode: amqp.Persistent, // 持久化消息
Timestamp: time.Now(),
MessageId: fmt.Sprintf("msg-%d", time.Now().UnixNano()),
Headers: amqp.Table{
"producer": "order-service",
"retry-count": int32(0),
},
},
)
}
func (p *RabbitProducer) Close() {
p.channel.Close()
p.conn.Close()
}
func main() {
producer, err := NewRabbitProducer("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatal(err)
}
defer producer.Close()
ctx := context.Background()
// 声明交换机
producer.DeclareExchange("orders", "topic")
// 发送不同类型的订单事件
events := []struct {
RoutingKey string
Body map[string]interface{}
}{
{
RoutingKey: "order.created",
Body: map[string]interface{}{"order_id": "ORD-001", "amount": 99.9},
},
{
RoutingKey: "order.paid",
Body: map[string]interface{}{"order_id": "ORD-001", "payment_id": "PAY-001"},
},
{
RoutingKey: "order.shipped",
Body: map[string]interface{}{"order_id": "ORD-001", "tracking": "SF1234567"},
},
}
for _, event := range events {
if err := producer.Publish(ctx, "orders", event.RoutingKey, event.Body); err != nil {
log.Printf("Failed to publish: %v", err)
} else {
log.Printf("📨 Published: %s", event.RoutingKey)
}
}
}
RabbitMQ 消费者
package main
import (
"encoding/json"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// RabbitConsumer RabbitMQ 消费者
type RabbitConsumer struct {
conn *amqp.Connection
channel *amqp.Channel
}
func NewRabbitConsumer(url string) (*RabbitConsumer, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, err
}
ch, err := conn.Channel()
if err != nil {
conn.Close()
return nil, err
}
// 设置预取数量:一次只给消费者一条未确认的消息
ch.Qos(1, 0, false)
return &RabbitConsumer{conn: conn, channel: ch}, nil
}
// Consume 开始消费
func (c *RabbitConsumer) Consume(queueName string, handler func([]byte) error) error {
msgs, err := c.channel.Consume(
queueName, // 队列名
"", // 消费者标签
false, // auto-ack: 手动确认
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return err
}
for msg := range msgs {
log.Printf("📬 Received message from %s: %s", queueName, msg.RoutingKey)
if err := handler(msg.Body); err != nil {
log.Printf("❌ Processing failed: %v", err)
// 拒绝消息,重新入队
msg.Nack(false, true)
continue
}
// 确认消息
msg.Ack(false)
}
return nil
}
func (c *RabbitConsumer) Close() {
c.channel.Close()
c.conn.Close()
}
func main() {
consumer, err := NewRabbitConsumer("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
// 声明队列并绑定到交换机
queue, err := consumer.channel.QueueDeclare(
"inventory-queue", // 队列名
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil,
)
if err != nil {
log.Fatal(err)
}
// 绑定到 orders 交换机,只接收 order.created 事件
consumer.channel.QueueBind(
queue.Name,
"order.created", // 只接收订单创建事件
"orders", // 交换机
false,
nil,
)
log.Println("Inventory consumer waiting for messages...")
consumer.Consume(queue.Name, func(body []byte) error {
var event map[string]interface{}
json.Unmarshal(body, &event)
log.Printf("📦 Deducting stock for order: %v", event["order_id"])
time.Sleep(100 * time.Millisecond) // 模拟处理
log.Printf("✅ Stock deducted successfully")
return nil
})
}
消息模式实战
工作队列模式(Work Queues)
多个 Worker 竞争消费同一个队列,实现任务的分布式处理:
package main
import (
"fmt"
"log"
"math/rand"
"os"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func startWorker(id int, url string) {
conn, err := amqp.Dial(url)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
}
defer ch.Close()
// 公平调度:一次只分发一条消息
ch.Qos(1, 0, false)
msgs, err := ch.Consume("task-queue", "", false, false, false, false, nil)
if err != nil {
log.Fatal(err)
}
log.Printf("Worker %d started", id)
for msg := range msgs {
task := string(msg.Body)
log.Printf("Worker %d processing: %s", id, task)
// 模拟不同任务的处理时间
duration := time.Duration(rand.Intn(5)+1) * time.Second
time.Sleep(duration)
log.Printf("Worker %d done: %s (took %v)", id, task, duration)
msg.Ack(false)
}
}
func main() {
url := "amqp://guest:guest@localhost:5672/"
if len(os.Args) > 1 && os.Args[1] == "producer" {
// 生产者:发送任务
conn, _ := amqp.Dial(url)
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
ch.QueueDeclare("task-queue", true, false, false, false, nil)
for i := 1; i <= 20; i++ {
task := fmt.Sprintf("Task #%d", i)
ch.PublishWithContext(context.Background(), "", "task-queue", false, false,
amqp.Publishing{
Body: []byte(task),
DeliveryMode: amqp.Persistent,
})
log.Printf("Sent: %s", task)
time.Sleep(500 * time.Millisecond)
}
} else {
// 消费者:作为 Worker 运行
workerID := rand.Intn(100)
startWorker(workerID, url)
}
}
发布/订阅模式(Pub/Sub)
使用 Fanout 交换机将消息广播给所有订阅者:
package main
import (
"context"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// Publisher 发布者
func publishNews(url string) {
conn, _ := amqp.Dial(url)
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
// 声明 Fanout 交换机
ch.ExchangeDeclare("news", "fanout", true, false, false, false, nil)
headlines := []string{
"Breaking: Go 1.23 released with major improvements",
"Tech: Kubernetes 2.0 announces new architecture",
"Sports: World Cup 2026 venues confirmed",
}
for _, headline := range headlines {
ch.PublishWithContext(context.Background(), "news", "", false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(headline),
})
log.Printf("📰 Published: %s", headline)
time.Sleep(time.Second)
}
}
// Subscriber 订阅者
func subscribeNews(url, name string) {
conn, _ := amqp.Dial(url)
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
ch.ExchangeDeclare("news", "fanout", true, false, false, false, nil)
// 每个订阅者创建自己的独占队列
q, _ := ch.QueueDeclare("", false, true, true, false, nil)
// 绑定到 news 交换机
ch.QueueBind(q.Name, "", "news", false, nil)
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil)
log.Printf("[%s] Subscribed to news...", name)
for msg := range msgs {
log.Printf("[%s] 📰 Received: %s", name, msg.Body)
}
}
func main() {
url := "amqp://guest:guest@localhost:5672/"
// 启动多个订阅者(在不同终端运行)
go subscribeNews(url, "Email Service")
go subscribeNews(url, "Push Notification")
go subscribeNews(url, "SMS Service")
time.Sleep(2 * time.Second)
// 发布新闻
publishNews(url)
select {} // 保持运行
}
Topic 路由模式
使用通配符实现灵活的消息路由:
package main
import (
"context"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
url := "amqp://guest:guest@localhost:5672/"
conn, _ := amqp.Dial(url)
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
// Topic 交换机
ch.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
// 创建不同日志级别的队列
bindings := []struct {
Queue string
RoutingKey string
Desc string
}{
{"all-logs", "#", "接收所有日志"}, // # 匹配任意
{"error-logs", "*.error", "只接收错误日志"}, // *.error 匹配 app.error, db.error
{"app-logs", "app.#", "接收应用相关的所有日志"}, // app.# 匹配 app.error, app.info 等
}
for _, b := range bindings {
q, _ := ch.QueueDeclare(b.Queue, true, false, false, false, nil)
ch.QueueBind(q.Name, b.RoutingKey, "logs", false, nil)
log.Printf("📋 Queue %q bound to %q (%s)", b.Queue, b.RoutingKey, b.Desc)
}
// 发送不同级别的日志
logs := []struct {
RoutingKey string
Message string
}{
{"app.info", "Application started"},
{"app.error", "Database connection failed"},
{"db.warning", "Slow query detected"},
{"db.error", "Query timeout"},
{"auth.info", "User logged in"},
{"auth.error", "Invalid token"},
}
for _, l := range logs {
ch.PublishWithContext(context.Background(), "logs", l.RoutingKey, false, false,
amqp.Publishing{Body: []byte(l.Message)})
log.Printf("📨 [%s] %s", l.RoutingKey, l.Message)
}
time.Sleep(time.Second)
// 检查各队列的消息数
for _, name := range []string{"all-logs", "error-logs", "app-logs"} {
q, _ := ch.QueueInspect(name)
log.Printf("📊 Queue %q: %d messages", name, q.Messages)
}
// all-logs: 6 messages (所有)
// error-logs: 2 messages (*.error)
// app-logs: 3 messages (app.#)
}
错误处理与重试策略
在生产环境中,消息消费失败是家常便饭。网络抖动、数据库超时、第三方 API 限流……你需要一套健壮的错误处理和重试机制。
指数退避重试
package retry
import (
"context"
"fmt"
"log"
"math"
"time"
)
type Config struct {
MaxAttempts int
InitialDelay time.Duration
MaxDelay time.Duration
Multiplier float64
}
var DefaultConfig = Config{
MaxAttempts: 5,
InitialDelay: time.Second,
MaxDelay: 30 * time.Second,
Multiplier: 2.0,
}
func WithExponentialBackoff(ctx context.Context, cfg Config, fn func() error) error {
var lastErr error
delay := cfg.InitialDelay
for attempt := 1; attempt <= cfg.MaxAttempts; attempt++ {
lastErr = fn()
if lastErr == nil {
if attempt > 1 {
log.Printf("✅ Succeeded on attempt %d", attempt)
}
return nil
}
log.Printf("⚠️ Attempt %d/%d failed: %v", attempt, cfg.MaxAttempts, lastErr)
if attempt == cfg.MaxAttempts {
break
}
// 指数退避:delay * multiplier^(attempt-1)
nextDelay := time.Duration(float64(delay) * math.Pow(cfg.Multiplier, float64(attempt-1)))
if nextDelay > cfg.MaxDelay {
nextDelay = cfg.MaxDelay
}
log.Printf("⏳ Retrying in %v...", nextDelay)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(nextDelay):
}
}
return fmt.Errorf("all %d attempts failed, last error: %w", cfg.MaxAttempts, lastErr)
}
死信队列(Dead Letter Queue)
当消息多次重试都失败后,不应该无限循环重试,而应该发送到死信队列,等待人工处理。
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func setupDeadLetterQueue(ch *amqp.Channel) error {
// 1. 创建死信交换机和队列
ch.ExchangeDeclare("dlx", "direct", true, false, false, false, nil)
dlq, err := ch.QueueDeclare("dead-letter-queue", true, false, false, false, nil)
if err != nil {
return err
}
ch.QueueBind(dlq.Name, "dead", "dlx", false, nil)
// 2. 创建主队列,配置死信路由
_, err = ch.QueueDeclare("order-queue", true, false, false, false, amqp.Table{
"x-dead-letter-exchange": "dlx", // 死信交换机
"x-dead-letter-routing-key": "dead", // 死信路由键
"x-message-ttl": 60000, // 消息过期时间 60秒
"x-max-length": 10000, // 队列最大长度
})
if err != nil {
return err
}
ch.QueueBind("order-queue", "order", "orders", false, nil)
log.Println("✅ Dead letter queue setup complete")
return nil
}
// DLQMessage 死信消息(包含原始消息和错误信息)
type DLQMessage struct {
OriginalBody string `json:"original_body"`
Error string `json:"error"`
RetryCount int `json:"retry_count"`
FailedAt time.Time `json:"failed_at"`
Queue string `json:"queue"`
RoutingKey string `json:"routing_key"`
}
func processWithRetry(ch *amqp.Channel) {
msgs, _ := ch.Consume("order-queue", "", false, false, false, false, nil)
for msg := range msgs {
// 获取重试次数(从消息头中)
retryCount := 0
if count, ok := msg.Headers["x-retry-count"]; ok {
retryCount = int(count.(int32))
}
err := processOrder(msg.Body)
if err != nil {
if retryCount < 3 {
// 重试:重新发布消息,增加重试计数
log.Printf("⚠️ Retry %d/3 for message: %v", retryCount+1, err)
ch.PublishWithContext(context.Background(),
"", "order-queue", false, false,
amqp.Publishing{
Body: msg.Body,
Headers: amqp.Table{
"x-retry-count": int32(retryCount + 1),
},
DeliveryMode: amqp.Persistent,
})
msg.Ack(false)
} else {
// 超过最大重试次数:拒绝消息,自动进入死信队列
log.Printf("❌ Max retries exceeded, sending to DLQ: %v", err)
msg.Nack(false, false) // requeue=false,消息进入 DLQ
}
continue
}
msg.Ack(false)
}
}
// DLQConsumer 死信队列消费者(人工处理或告警)
func consumeDLQ(ch *amqp.Channel) {
msgs, _ := ch.Consume("dead-letter-queue", "", false, false, false, false, nil)
for msg := range msgs {
log.Printf("🚨 Dead letter received: %s", msg.Body)
// 发送告警通知
// sendAlert(msg.Body)
// 保存到数据库供人工处理
// saveFailedMessage(msg.Body)
msg.Ack(false)
}
}
func processOrder(body []byte) error {
// 模拟处理,有一定概率失败
var order map[string]interface{}
json.Unmarshal(body, &order)
// 模拟错误
if order["amount"] == nil {
return fmt.Errorf("missing required field: amount")
}
log.Printf("✅ Order processed: %v", order["order_id"])
return nil
}
func main() {
conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
setupDeadLetterQueue(ch)
// 启动主消费者和死信消费者
go processWithRetry(ch)
go consumeDLQ(ch)
select {}
}
消息可靠性:确保不丢不重
在生产环境中,消息不能丢,也不能重复处理。这需要从三个层面保证。
1. 发送端确认(Publisher Confirms)
func publishWithConfirm(ch *amqp.Channel, queue string, body []byte) error {
// 开启确认模式
if err := ch.Confirm(false); err != nil {
return err
}
// 注册确认回调
confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
err := ch.PublishWithContext(context.Background(), "", queue, false, false,
amqp.Publishing{
Body: body,
DeliveryMode: amqp.Persistent,
})
if err != nil {
return err
}
// 等待 Broker 确认
select {
case confirmed := <-confirms:
if confirmed.Ack {
log.Printf("✅ Message confirmed by broker")
} else {
return fmt.Errorf("message nacked by broker")
}
case <-time.After(5 * time.Second):
return fmt.Errorf("confirmation timeout")
}
return nil
}
2. 消费端手动确认
// 手动确认:处理完成后才确认
msgs, _ := ch.Consume("queue", "", false, false, false, false, nil)
for msg := range msgs {
if err := process(msg.Body); err != nil {
msg.Nack(false, true) // 处理失败,重新入队
} else {
msg.Ack(false) // 处理成功,确认
}
}
3. 幂等性处理
即使消息被重复投递,处理结果也应该一致:
package main
import (
"context"
"database/sql"
"log"
_ "github.com/lib/pq"
)
type IdempotentProcessor struct {
db *sql.DB
}
func NewIdempotentProcessor(db *sql.DB) *IdempotentProcessor {
// 创建幂等性记录表
db.Exec(`
CREATE TABLE IF NOT EXISTS processed_messages (
message_id VARCHAR(255) PRIMARY KEY,
processed_at TIMESTAMP DEFAULT NOW()
)
`)
return &IdempotentProcessor{db: db}
}
func (p *IdempotentProcessor) Process(ctx context.Context, messageID string, fn func() error) error {
// 检查是否已处理
var exists bool
err := p.db.QueryRowContext(ctx,
"SELECT EXISTS(SELECT 1 FROM processed_messages WHERE message_id = $1)",
messageID).Scan(&exists)
if err != nil {
return err
}
if exists {
log.Printf("⏭️ Message %s already processed, skipping", messageID)
return nil
}
// 执行处理
if err := fn(); err != nil {
return err
}
// 记录已处理
_, err = p.db.ExecContext(ctx,
"INSERT INTO processed_messages (message_id) VALUES ($1)",
messageID)
if err != nil {
log.Printf("⚠️ Failed to record message processing: %v", err)
}
return nil
}
Kafka vs RabbitMQ:如何选择?
这是一个经常被问到的问题。让我给你一个实用的决策框架:
选择 Kafka 的场景:
- 需要消息持久化和重放能力(事件溯源、数据分析)
- 超高吞吐量需求(每秒百万级消息)
- 消息需要被多个系统独立消费
- 日志收集和事件流处理
- 需要保留消息历史(按时间或容量)
选择 RabbitMQ 的场景:
- 需要灵活的消息路由(Topic、Direct、Fanout)
- 低延迟需求(微秒级)
- 任务队列和工作分发
- 需要消息优先级
- 需要延迟消息(延迟队列)
- 复杂的消息确认和死信处理
两者都用的场景:
很多公司同时使用两者。Kafka 作为事件总线处理数据流,RabbitMQ 作为任务队列处理业务逻辑。例如:
用户下单 → Kafka (事件流) → 数据分析平台
→ 推荐系统
→ 审计日志
用户下单 → RabbitMQ (任务队列) → 库存扣减 Worker
→ 支付处理 Worker
→ 邮件通知 Worker
总结
今天我们深入探讨了 Go 语言中的消息队列实战:
- 消息队列概念:异步处理、流量削峰、系统解耦
- Kafka 生产者:批量发送、消息压缩、分区策略
- Kafka 消费者:消费者组、偏移量管理、手动提交
- Kafka 事件溯源:通过重放事件重建系统状态
- RabbitMQ 生产者:AMQP 协议、交换机声明、持久化消息
- RabbitMQ 消费者:手动确认、预取控制、公平调度
- 消息模式:工作队列、发布/订阅、Topic 路由
- 错误处理:指数退避重试、死信队列
- 消息可靠性:发送确认、消费确认、幂等性处理
消息队列是分布式系统的血管,选对工具、用好模式,你的系统才能在流量洪峰面前从容不迫。
现在就去试试吧!启动一个 Docker 容器,写一个 Producer 和 Consumer,感受消息在系统间流动的美妙。
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。