引言
消息队列是现代分布式系统的核心组件,但简单的生产-消费模式往往无法满足复杂的业务需求。死信队列、延迟消息、重试机制等高级特性,让消息队列能够应对更复杂的场景,如订单超时、任务调度、失败重试等。
死信队列(Dead Letter Queue)
什么是死信队列
死信队列用于存储无法正常处理的消息,常见场景:
- 消息被拒绝(nack/reject)且不重新入队
- 消息过期(TTL超时)
- 队列达到最大长度
正常消息流:
Producer → Exchange → Queue → Consumer
死信消息流(消息被拒绝且不重新入队、过期或超出队列长度时):
Queue → DLX Exchange → DLQ → 人工处理/重试
RabbitMQ死信队列配置
# Declare the dead-letter exchange (dlx.exchange) and dead-letter queue,
# bound together with routing key "dead".
rabbitmqadmin declare exchange name=dlx.exchange type=direct durable=true
rabbitmqadmin declare queue name=dead.letter.queue durable=true
rabbitmqadmin declare binding source=dlx.exchange destination=dead.letter.queue routing_key=dead
# Declare the business queue with dead-lettering enabled: dead-lettered messages
# are re-published to dlx.exchange with routing key "dead".
# x-message-ttl is in milliseconds: 60000 = messages expire (and are dead-lettered) after 60 s.
rabbitmqadmin declare queue name=order.queue durable=true arguments='{
"x-dead-letter-exchange": "dlx.exchange",
"x-dead-letter-routing-key": "dead",
"x-message-ttl": 60000
}'
Go代码示例
package main
import (
"context"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// declareDeadLetterQueue sets up the dead-letter topology: a durable direct
// exchange "dlx.exchange", a durable queue "dead.letter.queue", and a binding
// between them on routing key "dead". It returns the first broker error.
func declareDeadLetterQueue(ch *amqp.Channel) error {
	const (
		exchange   = "dlx.exchange"
		queue      = "dead.letter.queue"
		routingKey = "dead"
	)

	// Args: name, kind, durable, autoDelete, internal, noWait, args.
	if err := ch.ExchangeDeclare(exchange, "direct", true, false, false, false, nil); err != nil {
		return err
	}

	// Args: name, durable, autoDelete, exclusive, noWait, args.
	if _, err := ch.QueueDeclare(queue, true, false, false, false, nil); err != nil {
		return err
	}

	// Args: queue, key, exchange, noWait, args.
	return ch.QueueBind(queue, routingKey, exchange, false, nil)
}
// declareBusinessQueue declares the durable "order.queue". Its dead-letter
// arguments make the broker re-publish dead-lettered messages to
// "dlx.exchange" with routing key "dead".
//
// NOTE(review): the rabbitmqadmin example for the same queue also sets
// x-message-ttl=60000. RabbitMQ refuses to re-declare an existing queue with
// different arguments, so keep both declarations in sync.
func declareBusinessQueue(ch *amqp.Channel) error {
	deadLetterArgs := amqp.Table{
		"x-dead-letter-exchange":    "dlx.exchange",
		"x-dead-letter-routing-key": "dead",
	}
	// Args: name, durable, autoDelete, exclusive, noWait, args.
	if _, err := ch.QueueDeclare("order.queue", true, false, false, false, deadLetterArgs); err != nil {
		return err
	}
	return nil
}
// consumeWithReject consumes "order.queue" with manual acknowledgements.
//
// Messages for which shouldReject reports true are rejected with
// requeue=false. That makes the broker dead-letter them via the queue's
// x-dead-letter-exchange. All other messages are acked.
//
// It returns the Consume error or the first Ack/Reject error. It returns nil
// once the delivery channel is closed.
func consumeWithReject(ch *amqp.Channel) error {
	msgs, err := ch.Consume(
		"order.queue", // queue
		"",            // consumer tag (empty: a unique tag is generated)
		false,         // auto-ack: must be false so deliveries can be rejected
		false,         // exclusive
		false,         // no-local
		false,         // no-wait
		nil,           // args
	)
	if err != nil {
		return err
	}

	for msg := range msgs {
		log.Printf("Received message: %s", msg.Body)

		if !shouldReject(msg.Body) {
			if err := msg.Ack(false); err != nil {
				return err
			}
			continue
		}

		// requeue=false: the broker routes the message to the dead-letter exchange
		// instead of putting it back on order.queue.
		if err := msg.Reject(false); err != nil {
			return err
		}
		log.Printf("Message rejected to DLQ: %s", msg.Body)
	}
	return nil
}
Kafka死信队列实现
Kafka原生不支持死信队列,需要通过应用层实现:
package main
import (
"context"
"encoding/json"
"log"
"time"
"github.com/segmentio/kafka-go"
)
// DeadLetterMessage is the JSON envelope written to the dead-letter topic.
// It carries the original payload together with the failure that caused it
// to be dead-lettered.
type DeadLetterMessage struct {
	OriginalTopic   string          `json:"original_topic"`   // topic the message was consumed from
	OriginalMessage json.RawMessage `json:"original_message"` // original payload embedded verbatim; json.Marshal fails if it is not valid JSON
	Error           string          `json:"error"`            // error text from the failed processing attempt
	RetryCount      int             `json:"retry_count"`      // retries already attempted by the DLQ processor
	Timestamp       time.Time       `json:"timestamp"`        // when the envelope was created
}
// consumeWithDLQ consumes "order.events" (group "order-consumer") and runs
// processMessage on each record. Records that fail are wrapped in a
// DeadLetterMessage and published to "order.events.dlq".
//
// Offsets are committed explicitly, and only after a record has either been
// processed or been written to the DLQ. A crash or a failed DLQ write
// therefore leads to redelivery instead of silent loss (at-least-once).
//
// It returns ctx.Err() when ctx is cancelled. Otherwise it returns the first
// error that prevents a record from being dead-lettered or committed.
func consumeWithDLQ(ctx context.Context) error {
	businessReader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "order.events",
		GroupID: "order-consumer",
	})
	defer businessReader.Close()

	dlqWriter := &kafka.Writer{
		Addr:     kafka.TCP("localhost:9092"),
		Topic:    "order.events.dlq",
		Balancer: &kafka.LeastBytes{},
	}
	defer dlqWriter.Close()

	for {
		// FetchMessage, unlike ReadMessage, does not commit the offset.
		// The offset is committed below once the record has been handled.
		msg, err := businessReader.FetchMessage(ctx)
		if err != nil {
			// Without this check a cancelled ctx makes the loop spin forever.
			if ctx.Err() != nil {
				return ctx.Err()
			}
			log.Printf("Error reading message: %v", err)
			continue
		}

		if procErr := processMessage(msg.Value); procErr != nil {
			if err := writeDeadLetter(ctx, dlqWriter, msg, procErr); err != nil {
				// Leave the offset uncommitted so the record is redelivered.
				return err
			}
			log.Printf("Message sent to DLQ: %s", procErr)
		}

		if err := businessReader.CommitMessages(ctx, msg); err != nil {
			return err
		}
	}
}

// writeDeadLetter wraps msg and procErr in a DeadLetterMessage and publishes
// it through w, keeping the original key.
//
// msg.Value is embedded as a json.RawMessage, so it must be valid JSON.
// Otherwise json.Marshal fails and its error is returned rather than an empty
// record being published.
func writeDeadLetter(ctx context.Context, w *kafka.Writer, msg kafka.Message, procErr error) error {
	dlqBytes, err := json.Marshal(DeadLetterMessage{
		OriginalTopic:   "order.events",
		OriginalMessage: msg.Value,
		Error:           procErr.Error(),
		RetryCount:      0,
		Timestamp:       time.Now(),
	})
	if err != nil {
		return err
	}
	return w.WriteMessages(ctx, kafka.Message{
		Key:   msg.Key,
		Value: dlqBytes,
	})
}
// consumeDLQ drains "order.events.dlq" (group "dlq-processor").
//
// Strategy 1: an envelope whose RetryCount is below 3 is re-run through
// processMessage. If that fails again, RetryCount is incremented, Error is
// replaced with the latest failure, and the envelope is published back to
// the DLQ topic.
//
// Strategy 2: an envelope that has used up its retries is logged and handed
// to sendAlert for manual handling.
//
// Offsets are committed only after a record has been handled, so a crash or
// a failed re-publish leads to redelivery rather than loss. It returns
// ctx.Err() on cancellation, or the first error that prevents a record from
// being handled.
//
// NOTE(review): re-published envelopes are consumed again immediately. The
// retries therefore run back-to-back with no delay between them.
func consumeDLQ(ctx context.Context) error {
	const maxRetries = 3

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "order.events.dlq",
		GroupID: "dlq-processor",
	})
	defer reader.Close()

	// Used to put envelopes whose retry failed back onto the DLQ topic.
	dlqWriter := &kafka.Writer{
		Addr:     kafka.TCP("localhost:9092"),
		Topic:    "order.events.dlq",
		Balancer: &kafka.LeastBytes{},
	}
	defer dlqWriter.Close()

	for {
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return ctx.Err()
			}
			continue
		}

		var dlqMsg DeadLetterMessage
		if err := json.Unmarshal(msg.Value, &dlqMsg); err != nil {
			// A malformed envelope can never be retried. Skip it rather than
			// blocking the partition.
			log.Printf("Skipping malformed DLQ message at offset %d: %v", msg.Offset, err)
		} else if dlqMsg.RetryCount < maxRetries {
			if procErr := processMessage(dlqMsg.OriginalMessage); procErr == nil {
				log.Printf("Retry successful after %d attempts", dlqMsg.RetryCount+1)
			} else {
				dlqMsg.RetryCount++
				dlqMsg.Error = procErr.Error()
				dlqBytes, err := json.Marshal(dlqMsg)
				if err != nil {
					return err
				}
				if err := dlqWriter.WriteMessages(ctx, kafka.Message{Key: msg.Key, Value: dlqBytes}); err != nil {
					// Not committed: the envelope will be redelivered.
					return err
				}
			}
		} else {
			log.Printf("Message failed after max retries, requires manual intervention: %s",
				string(dlqMsg.OriginalMessage))
			sendAlert(dlqMsg)
		}

		if err := reader.CommitMessages(ctx, msg); err != nil {
			return err
		}
	}
}
延迟消息
应用场景
- 订单超时未支付自动取消(30分钟)
- 延时任务调度(定时发送通知)
- 重试延迟(指数退避)
- 预约服务(提前N小时提醒)
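RabbitMQ延迟消息(TTL + 死信)
思路:消息先进入一个没有消费者、设置了TTL的队列,TTL到期后被转发到死信交换机,再由真正的消费者处理。每种延迟时长需要一个独立的队列:如果改用消息级TTL(expiration属性)把不同延迟混在同一个队列中,RabbitMQ只在消息到达队列头部时才检查过期,短延迟的消息会被排在前面的长延迟消息阻塞。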
package main
import (
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// delayQueueName returns the name of the TTL queue used for a delay of
// delayMs milliseconds. For example, 1800000 gives "delay.queue.30m0s".
//
// delayMs must be scaled by time.Millisecond. time.Duration(delayMs) alone is
// interpreted as nanoseconds, which would name a 30-minute queue "1.8ms".
func delayQueueName(delayMs int) string {
	return "delay.queue." + (time.Duration(delayMs) * time.Millisecond).String()
}

// declareDelayQueue declares the durable holding queue for one delay value.
//
// Messages stay in it for delayMs (x-message-ttl, in milliseconds) and are
// then dead-lettered to the "delay.dlx" exchange with routing key
// "delay.process". That exchange, and whatever queue is bound to it, are not
// declared here.
func declareDelayQueue(ch *amqp.Channel, delayMs int) error {
	_, err := ch.QueueDeclare(
		delayQueueName(delayMs),
		true,  // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		amqp.Table{
			"x-dead-letter-exchange":    "delay.dlx",
			"x-dead-letter-routing-key": "delay.process",
			"x-message-ttl":             delayMs, // message lifetime in milliseconds
		},
	)
	return err
}

// sendDelayedMessage publishes body to the TTL queue for delayMs through the
// default exchange, where the routing key is the queue name.
//
// The queue must already exist, declared by declareDelayQueue with the same
// delayMs. mandatory=false means a publish to a missing queue is silently
// dropped. Messages are persistent, so a broker restart during the delay does
// not discard them.
func sendDelayedMessage(ch *amqp.Channel, body string, delayMs int) error {
	return ch.Publish(
		"",                      // default exchange
		delayQueueName(delayMs), // routing key = queue name
		false,                   // mandatory
		false,                   // immediate
		amqp.Publishing{
			ContentType:  "text/plain",
			DeliveryMode: amqp.Persistent,
			Body:         []byte(body),
		},
	)
}
// 使用示例:订单超时取消
func scheduleOrderTimeout(ch *amqp.Channel, orderID string) error {
// 30分钟后自动取消
return sendDelayedMessage(ch, orderID, 30*60*1000)
}
RabbitMQ延迟插件(推荐)
# Enable the rabbitmq_delayed_message_exchange plugin, which provides the
# "x-delayed-message" exchange type used by declareDelayedExchange.
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
// declareDelayedExchange sets up the plugin-based delay path.
//
// It declares a durable "delayed.exchange" of type "x-delayed-message",
// provided by the delayed-message plugin. The exchange routes with direct
// semantics (x-delayed-type). It also declares the durable "delayed.queue"
// and binds it to the exchange with routing key "delay".
func declareDelayedExchange(ch *amqp.Channel) error {
	exchangeArgs := amqp.Table{"x-delayed-type": "direct"}

	// Args: name, kind, durable, autoDelete, internal, noWait, args.
	if err := ch.ExchangeDeclare("delayed.exchange", "x-delayed-message", true, false, false, false, exchangeArgs); err != nil {
		return err
	}

	// Args: name, durable, autoDelete, exclusive, noWait, args.
	if _, err := ch.QueueDeclare("delayed.queue", true, false, false, false, nil); err != nil {
		return err
	}

	return ch.QueueBind("delayed.queue", "delay", "delayed.exchange", false, nil)
}
// sendWithDelay publishes body to "delayed.exchange" with routing key
// "delay". The x-delay header (milliseconds) tells the delayed-message plugin
// how long to hold the message before routing it.
//
// Messages are marked persistent. Once the delay elapses they are routed to
// a durable queue, and without persistence they would be lost if the broker
// restarted before they were consumed.
func sendWithDelay(ch *amqp.Channel, body string, delayMs int64) error {
	headers := amqp.Table{
		"x-delay": delayMs, // delay in milliseconds
	}
	return ch.Publish(
		"delayed.exchange", // exchange
		"delay",            // routing key
		false,              // mandatory
		false,              // immediate
		amqp.Publishing{
			Headers:      headers,
			ContentType:  "text/plain",
			DeliveryMode: amqp.Persistent,
			Body:         []byte(body),
		},
	)
}
RocketMQ延迟消息
RocketMQ 4.x 原生支持18个固定延迟级别(级别从1开始编号;5.x 起另外支持任意时长的定时消息):
// RocketMQ 4.x delay levels, numbered from 1:
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// so level 9 = 5 minutes (level 14 would be 10 minutes).
Message msg = new Message("OrderTopic", "order", "OrderID123",
        "Order content".getBytes(java.nio.charset.StandardCharsets.UTF_8)); // explicit charset, not the platform default
msg.setDelayTimeLevel(9); // delay 5 minutes (level 9)
producer.send(msg);
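Kafka延迟消息(基于最小堆的内存调度器)
Kafka本身不支持延迟消息。下面的调度器把待发送消息按执行时间放入最小堆,到期后再写入目标Topic。注意:待发送的消息只保存在进程内存中,进程重启会丢失,生产环境需要配合持久化存储。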
package main
import (
"container/heap"
"context"
"sync"
"time"
)
// DelayedMessage is one pending entry in DelayedMessageScheduler's heap: a
// Kafka record (Topic/Key/Value) to publish once ExecuteAt has been reached.
type DelayedMessage struct {
	ExecuteAt time.Time // earliest time the record may be published
	Topic     string    // destination topic
	Key       []byte    // record key
	Value     []byte    // record payload
	index     int       // position in DelayedMessageHeap; kept up to date by Swap/Push, set to -1 by Pop
}
// DelayedMessageHeap is a min-heap of *DelayedMessage keyed on ExecuteAt,
// meant to be driven through container/heap. Each element's index field
// mirrors its current slot in the slice.
type DelayedMessageHeap []*DelayedMessage

// Len reports how many messages are queued.
func (h DelayedMessageHeap) Len() int {
	return len(h)
}

// Less keeps the message with the earliest ExecuteAt at the root.
func (h DelayedMessageHeap) Less(i, j int) bool {
	return h[j].ExecuteAt.After(h[i].ExecuteAt)
}

// Swap exchanges two slots and updates both elements' recorded positions.
func (h DelayedMessageHeap) Swap(i, j int) {
	a, b := h[i], h[j]
	h[i], h[j] = b, a
	b.index, a.index = i, j
}

// Push appends x, which must be a *DelayedMessage, at the end of the slice.
// It is called by heap.Push, which then restores heap order.
func (h *DelayedMessageHeap) Push(x interface{}) {
	m := x.(*DelayedMessage)
	m.index = h.Len()
	*h = append(*h, m)
}

// Pop removes and returns the last element. heap.Pop first swaps the minimum
// into that slot.
func (h *DelayedMessageHeap) Pop() interface{} {
	last := len(*h) - 1
	m := (*h)[last]
	(*h)[last] = nil // release the reference held by the backing array
	*h = (*h)[:last]
	m.index = -1
	return m
}
// DelayedMessageScheduler keeps delayed messages in a min-heap and publishes
// each one through producer once it is due (see Schedule and Start).
type DelayedMessageScheduler struct {
	heap     DelayedMessageHeap // pending messages ordered by ExecuteAt; guarded by mu
	mu       sync.Mutex         // protects heap
	cond     *sync.Cond         // bound to mu; Schedule signals it to wake the dispatcher
	producer *kafka.Writer      // writer used to publish due messages
}
func NewDelayedMessageScheduler(producer *kafka.Writer) *DelayedMessageScheduler {
s := &DelayedMessageScheduler{
producer: producer,
}
s.cond = sync.NewCond(&s.mu)
heap.Init(&s.heap)
return s
}
// Schedule queues (topic, key, value) to be published delay from now. It
// then signals the dispatcher, because the new entry may be the earliest one.
// ctx is currently unused.
func (s *DelayedMessageScheduler) Schedule(ctx context.Context, delay time.Duration, topic string, key, value []byte) {
	entry := &DelayedMessage{Topic: topic, Key: key, Value: value}

	s.mu.Lock()
	defer s.mu.Unlock()

	entry.ExecuteAt = time.Now().Add(delay)
	heap.Push(&s.heap, entry)
	s.cond.Signal()
}
// Start launches the dispatcher goroutine and returns immediately. The
// dispatcher publishes each scheduled message to its Topic through
// s.producer once ExecuteAt has passed. It exits when ctx is cancelled.
//
// All blocking happens in s.cond.Wait, so any of three events can wake the
// dispatcher:
//   - Schedule's Signal, because a new message may now be the earliest;
//   - a timer firing at the current earliest ExecuteAt;
//   - cancellation of ctx.
//
// Waiting on a bare timer instead would miss newly scheduled earlier
// messages, and would never notice cancellation while the heap is empty.
//
// If WriteMessages fails, the message is pushed back with a 1s delay instead
// of being dropped. Messages still queued when ctx is cancelled are not sent.
func (s *DelayedMessageScheduler) Start(ctx context.Context) {
	// cond.Wait cannot select on ctx.Done(), so broadcast on cancellation.
	go func() {
		<-ctx.Done()
		s.mu.Lock()
		s.cond.Broadcast()
		s.mu.Unlock()
	}()

	go func() {
		s.mu.Lock()
		defer s.mu.Unlock()

		for ctx.Err() == nil {
			if s.heap.Len() == 0 {
				s.cond.Wait()
				continue
			}

			if wait := time.Until(s.heap[0].ExecuteAt); wait > 0 {
				// The callback needs s.mu, so it cannot fire before Wait has
				// released the lock. No wake-up can be lost.
				timer := time.AfterFunc(wait, func() {
					s.mu.Lock()
					s.cond.Broadcast()
					s.mu.Unlock()
				})
				s.cond.Wait()
				timer.Stop()
				continue // re-evaluate: the head may have changed or ctx may be done
			}

			msg := heap.Pop(&s.heap).(*DelayedMessage)

			// Publish without holding the lock so Schedule is not blocked by network I/O.
			s.mu.Unlock()
			err := s.producer.WriteMessages(ctx, kafka.Message{
				Topic: msg.Topic,
				Key:   msg.Key,
				Value: msg.Value,
			})
			s.mu.Lock()

			if err != nil {
				// Keep the message instead of silently dropping it; retry shortly.
				msg.ExecuteAt = time.Now().Add(time.Second)
				heap.Push(&s.heap, msg)
			}
		}
	}()
}
消息重试机制
重试策略设计
重试策略:
1. 立即重试(适合瞬时故障)
└─ 重试间隔:0ms
└─ 适用场景:网络抖动、临时资源不足
2. 固定间隔重试
└─ 重试间隔:固定时间(如5秒)
└─ 适用场景:依赖服务重启
3. 指数退避重试(推荐)
└─ 重试间隔:1s → 2s → 4s → 8s → 16s...
└─ 适用场景:避免雪崩效应
4. 带抖动的指数退避(最佳实践)
└─ 重试间隔:base * 2^n + random_jitter
└─ 适用场景:避免多个消费者同时重试
指数退避重试实现
package main
import (
"context"
"math"
"math/rand"
"time"
)
// RetryConfig controls the exponential-backoff schedule used by
// calculateBackoff and RetryableMessageHandler.
type RetryConfig struct {
	MaxRetries     int           // maximum retries after the first attempt (negative is treated as 0)
	InitialDelay   time.Duration // delay before the first retry
	MaxDelay       time.Duration // upper bound on any single delay, jitter included
	Multiplier     float64       // growth factor applied per attempt
	JitterFraction float64       // relative jitter amplitude, expected in [0, 1]
}

// DefaultRetryConfig retries up to 5 times. The delay starts at 1s and
// doubles up to a 30s cap, with ±10% jitter.
var DefaultRetryConfig = RetryConfig{
	MaxRetries:     5,
	InitialDelay:   1 * time.Second,
	MaxDelay:       30 * time.Second,
	Multiplier:     2.0,
	JitterFraction: 0.1,
}

// calculateBackoff returns the wait before retry number attempt (0-based).
//
// The base value is InitialDelay * Multiplier^attempt, capped at MaxDelay.
// It is then randomized by ±JitterFraction. The final result is clamped to
// [0, MaxDelay], so jitter never exceeds the configured cap and never yields
// a negative duration.
func calculateBackoff(attempt int, config RetryConfig) time.Duration {
	maxDelay := float64(config.MaxDelay)

	delay := float64(config.InitialDelay) * math.Pow(config.Multiplier, float64(attempt))
	// Cap before jittering; this also tames +Inf from math.Pow for large attempts.
	if delay > maxDelay {
		delay = maxDelay
	}

	jitter := delay * config.JitterFraction
	delay += (rand.Float64()*2 - 1) * jitter

	// Jitter is symmetric, so clamp again to keep the cap a true maximum.
	if delay > maxDelay {
		delay = maxDelay
	}
	if delay < 0 || math.IsNaN(delay) {
		delay = 0
	}
	return time.Duration(delay)
}

// RetryableMessageHandler wraps handler with retries and backoff driven by config.
type RetryableMessageHandler struct {
	config  RetryConfig
	handler func(ctx context.Context, msg []byte) error
}

// Handle calls h.handler with msg. On error it retries up to
// config.MaxRetries times, sleeping calculateBackoff(attempt) between
// attempts. At least one attempt is always made.
//
// It returns nil on the first success. It returns ctx.Err() if ctx is
// cancelled while waiting. Otherwise it returns the last handler error once
// all attempts have failed.
func (h *RetryableMessageHandler) Handle(ctx context.Context, msg []byte) error {
	retries := h.config.MaxRetries
	if retries < 0 {
		// Without this guard the loop would not run and Handle would report
		// success without ever calling the handler.
		retries = 0
	}

	var lastErr error
	for attempt := 0; attempt <= retries; attempt++ {
		err := h.handler(ctx, msg)
		if err == nil {
			return nil
		}
		lastErr = err

		if attempt == retries {
			break // no wait after the final attempt
		}

		backoff := calculateBackoff(attempt, h.config)
		log.Printf("Attempt %d failed: %v, retrying in %v", attempt+1, err, backoff)

		timer := time.NewTimer(backoff)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		}
	}
	return lastErr
}
// main demonstrates RetryableMessageHandler. processOrder is retried under
// DefaultRetryConfig, and the payload goes to the dead-letter queue if every
// attempt fails.
func main() {
	h := &RetryableMessageHandler{config: DefaultRetryConfig}
	h.handler = func(ctx context.Context, msg []byte) error {
		return processOrder(msg) // business logic
	}

	if err := h.Handle(context.Background(), orderData); err != nil {
		// All attempts failed: hand the message off to the dead-letter queue.
		sendToDLQ(orderData, err)
	}
}
Kafka重试Topic设计
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)
// Retry topic naming convention: {original-topic}.retry.{attempt}
// e.g. order.events.retry.1, order.events.retry.2, order.events.retry.3

// RetryMessage is the JSON envelope carried on the retry topics.
type RetryMessage struct {
	OriginalTopic   string          `json:"original_topic"`   // topic the message first came from
	OriginalMessage json.RawMessage `json:"original_message"` // original payload, embedded verbatim
	Attempt         int             `json:"attempt"`          // retry level the envelope is being sent to
	LastError       string          `json:"last_error"`       // error text from the most recent failed attempt
	NextRetryAt     time.Time       `json:"next_retry_at"`    // set to time.Now() + that level's Delay when forwarded
}
// retryTopics lists the retry levels in order. A message that fails at level
// N is forwarded to level N+1 (retryTopics[N]). That level's Delay is used to
// compute the envelope's NextRetryAt.
var retryTopics = []struct {
	Topic string
	Delay time.Duration
}{
	{Topic: "order.events.retry.1", Delay: time.Second},
	{Topic: "order.events.retry.2", Delay: 5 * time.Second},
	{Topic: "order.events.retry.3", Delay: 30 * time.Second},
	{Topic: "order.events.retry.4", Delay: 2 * time.Minute},
	{Topic: "order.events.retry.5", Delay: 10 * time.Minute},
}
// consumeRetryTopics starts one consumer goroutine per entry in retryTopics
// and returns immediately. Each level uses its own consumer group
// ("order-consumer-retry-N"). Each goroutine runs until ctx is cancelled or
// a fatal error occurs; a fatal error is logged.
func consumeRetryTopics(ctx context.Context) {
	for i, retry := range retryTopics {
		go func(attempt int, topic string) {
			if err := consumeRetryTopic(ctx, attempt, topic); err != nil && ctx.Err() == nil {
				log.Printf("Retry consumer for %s stopped: %v", topic, err)
			}
		}(i+1, retry.Topic)
	}
}

// consumeRetryTopic processes retry level attempt (1-based), read from topic.
//
// For each envelope it first waits until NextRetryAt. It then re-runs
// processMessage. On failure it forwards the envelope to the next level, or
// calls sendToDLQ after the last level. Offsets are committed only after an
// envelope has been handled, so a crash or a failed forward causes
// redelivery.
//
// NOTE(review): assumes the package-level producer is a *kafka.Writer without
// a fixed Topic, because each forwarded message carries its own Topic.
func consumeRetryTopic(ctx context.Context, attempt int, topic string) error {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   topic,
		GroupID: fmt.Sprintf("order-consumer-retry-%d", attempt),
	})
	defer reader.Close()

	for {
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return ctx.Err()
			}
			continue
		}

		var retryMsg RetryMessage
		if err := json.Unmarshal(msg.Value, &retryMsg); err != nil {
			// A malformed envelope can never succeed; skip it instead of blocking the partition.
			log.Printf("Skipping malformed retry message on %s at offset %d: %v", topic, msg.Offset, err)
		} else {
			// Honour the level's delay. Without this wait every level would
			// re-process immediately, defeating the backoff schedule.
			if err := sleepUntil(ctx, retryMsg.NextRetryAt); err != nil {
				return err
			}

			if procErr := processMessage(retryMsg.OriginalMessage); procErr == nil {
				log.Printf("Retry successful at attempt %d", attempt)
			} else {
				retryMsg.LastError = procErr.Error()
				if attempt < len(retryTopics) {
					// Forward to the next retry level.
					next := retryTopics[attempt]
					retryMsg.Attempt = attempt + 1
					retryMsg.NextRetryAt = time.Now().Add(next.Delay)
					retryBytes, err := json.Marshal(retryMsg)
					if err != nil {
						return err
					}
					if err := producer.WriteMessages(ctx, kafka.Message{
						Topic: next.Topic,
						Key:   msg.Key,
						Value: retryBytes,
					}); err != nil {
						return err // not committed: redelivered after restart
					}
				} else {
					// Retries exhausted: hand off to the dead-letter queue.
					sendToDLQ(retryMsg)
				}
			}
		}

		if err := reader.CommitMessages(ctx, msg); err != nil {
			return err
		}
	}
}

// sleepUntil blocks until t has passed, returning nil. If ctx is cancelled
// first, it returns ctx.Err().
func sleepUntil(ctx context.Context, t time.Time) error {
	d := time.Until(t)
	if d <= 0 {
		return nil
	}
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-timer.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
幂等性保证
消息去重策略
package main
import (
"context"
"database/sql"
"time"
)
// 策略1:数据库唯一约束
func createMessageTable(db *sql.DB) error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS processed_messages (
message_id VARCHAR(255) PRIMARY KEY,
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
payload JSONB
)
`)
return err
}
func processWithIdempotency(ctx context.Context, db *sql.DB, msgID string, payload []byte) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// 插入消息ID(如果已存在会失败)
_, err = tx.ExecContext(ctx,
"INSERT INTO processed_messages (message_id, payload) VALUES ($1, $2)",
msgID, payload,
)
if err != nil {
if isDuplicateKeyError(err) {
log.Printf("Message %s already processed, skipping", msgID)
return nil // 已处理,幂等返回
}
return err
}
// 执行业务逻辑
err = doBusinessLogic(ctx, tx, payload)
if err != nil {
return err
}
return tx.Commit()
}
// Strategy 2: a Redis dedup key set with SETNX.

// processWithRedisLock deduplicates by msgID using the Redis key
// "processed:<msgID>", created with SETNX and a 24h TTL.
//
// If the key already exists, the message is treated as already processed and
// skipped. Otherwise doBusinessLogic runs. If that fails, the key is deleted
// so a redelivery can try again.
//
// NOTE(review): the key is written before processing starts. A crash between
// SETNX and completion leaves the key in place, and redeliveries are then
// skipped for up to 24h.
func processWithRedisLock(ctx context.Context, rdb *redis.Client, msgID string, payload []byte) error {
	key := "processed:" + msgID
	stamp := time.Now().Format(time.RFC3339)

	claimed, err := rdb.SetNX(ctx, key, stamp, 24*time.Hour).Result()
	if err != nil {
		return err
	}
	if !claimed {
		log.Printf("Message %s already processed", msgID)
		return nil
	}

	if err := doBusinessLogic(ctx, nil, payload); err != nil {
		// Release the key with a fresh context. If the failure came from ctx
		// being cancelled or timing out, a Del on ctx would fail too, leaving
		// the key to block every redelivery for 24h.
		releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if delErr := rdb.Del(releaseCtx, key).Err(); delErr != nil {
			log.Printf("Failed to release dedup key %s: %v", key, delErr)
		}
		return err
	}
	return nil
}
// Strategy 3: a business state machine.

// processWithStateMachine handles orderID only while its status is PENDING,
// which makes repeated deliveries no-ops.
//
// The row is locked with SELECT ... FOR UPDATE so concurrent consumers
// serialize on it. The status moves PENDING -> PROCESSING -> COMPLETED within
// one transaction. Any error aborts the transaction via the deferred
// Rollback.
func processWithStateMachine(ctx context.Context, db *sql.DB, orderID string) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		// Discarding this error would leave tx nil and make the deferred Rollback panic.
		return err
	}
	defer tx.Rollback() // no-op once Commit has succeeded

	var status string
	if err := tx.QueryRowContext(ctx,
		"SELECT status FROM orders WHERE id = $1 FOR UPDATE",
		orderID,
	).Scan(&status); err != nil {
		return err
	}

	// Only PENDING orders may be processed; anything else was already handled.
	if status != "PENDING" {
		log.Printf("Order %s already in status %s, skipping", orderID, status)
		return nil
	}

	if _, err := tx.ExecContext(ctx,
		"UPDATE orders SET status = 'PROCESSING' WHERE id = $1",
		orderID,
	); err != nil {
		return err
	}

	// Business logic goes here...

	if _, err := tx.ExecContext(ctx,
		"UPDATE orders SET status = 'COMPLETED' WHERE id = $1",
		orderID,
	); err != nil {
		// Previously ignored: Commit could then persist a half-finished transition.
		return err
	}
	return tx.Commit()
}
总结
高级特性使用场景
| 特性 | 使用场景 | 注意事项 |
|---|---|---|
| 死信队列 | 消息处理失败、消息过期、队列满 | 需要人工介入或自动重试机制 |
| 延迟消息 | 订单超时、定时任务、重试延迟 | 注意延迟精度和可靠性 |
| 重试机制 | 临时故障、网络抖动 | 使用指数退避,避免雪崩 |
| 幂等性 | 消息重复消费 | 必须保证,使用唯一ID或状态机 |
最佳实践
死信队列:
- 始终配置死信队列,避免消息丢失
- 监控死信队列长度,及时告警
- 提供死信消息的查询和重放能力
延迟消息:
- 选择合适的实现方案(插件 vs TTL+DLX)
- 注意延迟精度要求
- 考虑延迟消息的堆积问题
重试机制:
- 使用指数退避+抖动
- 设置最大重试次数
- 区分可重试和不可重试错误
幂等性:
- 每条消息必须有唯一ID
- 使用数据库约束或分布式锁
- 业务层面实现状态机
延伸阅读
- RabbitMQ Dead Letter Exchanges
- Kafka Exactly-Once Semantics
- RocketMQ Delay Message
- Enterprise Integration Patterns
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。