引言
API网关是后端服务的入口,也是保护系统的第一道防线。在高并发场景下,合理的限流和熔断策略能够防止系统过载,保证核心服务的可用性。本文将深入讲解网关层面的限流与熔断实战方案。
限流策略设计
多维度限流
限流维度:
┌─────────────────────────────────────────┐
│ 1. IP限流:防止恶意爬虫、DDoS攻击 │
│ └─ 单IP:100次/分钟 │
│ │
│ 2. 用户限流:防止单个用户滥用 │
│ └─ 普通用户:1000次/小时 │
│ └─ VIP用户:5000次/小时 │
│ │
│ 3. 接口限流:保护特定服务 │
│ └─ 登录接口:10次/分钟 │
│ └─ 查询接口:100次/秒 │
│ └─ 写入接口:50次/秒 │
│ │
│ 4. 全局限流:系统整体保护 │
│ └─ 总QPS:10000 │
└─────────────────────────────────────────┘
限流算法对比
| 算法 | 原理 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 固定窗口 | 固定时间窗口计数 | 实现简单 | 边界突发 | 精度要求不高 |
| 滑动窗口 | 滑动时间窗口 | 平滑 | 实现复杂 | 需要精确控制 |
| 令牌桶 | 固定速率生成令牌 | 允许突发 | 内存占用 | 允许一定突发 |
| 漏桶 | 固定速率处理 | 严格限速 | 无法突发 | 严格限速场景 |
Redis分布式限流
令牌桶算法实现
package ratelimit
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
// TokenBucketLimiter is a token-bucket rate limiter that talks to Redis
// through rdb. Keys are built as "<keyPrefix>:<key>".
type TokenBucketLimiter struct {
	rdb       *redis.Client
	rate      int // token refill rate (tokens per second)
	capacity  int // bucket capacity
	keyPrefix string
}
// NewTokenBucketLimiter returns a limiter that refills rate tokens per second
// up to capacity and namespaces its Redis keys under keyPrefix.
func NewTokenBucketLimiter(rdb *redis.Client, rate, capacity int, keyPrefix string) *TokenBucketLimiter {
	limiter := new(TokenBucketLimiter)
	limiter.rdb = rdb
	limiter.rate = rate
	limiter.capacity = capacity
	limiter.keyPrefix = keyPrefix
	return limiter
}
// 允许请求(消耗tokens个令牌)
func (l *TokenBucketLimiter) Allow(ctx context.Context, key string, tokens int) (bool, error) {
bucketKey := fmt.Sprintf("%s:%s", l.keyPrefix, key)
// Lua脚本保证原子性
script := redis.NewScript(`
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local requested = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local bucket = redis.call('HMGET', key, 'tokens', 'last_time')
local tokens = tonumber(bucket[1])
local last_time = tonumber(bucket[2])
-- 初始化
if tokens == nil then
tokens = capacity
last_time = now
end
-- 计算新增令牌
local delta_time = now - last_time
local new_tokens = delta_time * rate
tokens = math.min(capacity, tokens + new_tokens)
-- 尝试消耗令牌
if tokens >= requested then
tokens = tokens - requested
redis.call('HMSET', key, 'tokens', tokens, 'last_time', now)
redis.call('EXPIRE', key, 60) -- 60秒过期
return 1 -- 允许
else
redis.call('HMSET', key, 'tokens', tokens, 'last_time', now)
redis.call('EXPIRE', key, 60)
return 0 -- 拒绝
end
`)
now := time.Now().Unix()
result, err := script.Run(ctx, l.rdb, []string{bucketKey},
l.capacity, l.rate, tokens, now).Int()
if err != nil {
return false, err
}
return result == 1, nil
}
// Usage example: multi-dimensional rate limiting.
//
// MultiDimensionLimiter groups one token-bucket limiter per dimension:
// client IP, user, API path, and a single global bucket.
type MultiDimensionLimiter struct {
	ipLimiter     *TokenBucketLimiter
	userLimiter   *TokenBucketLimiter
	apiLimiter    *TokenBucketLimiter
	globalLimiter *TokenBucketLimiter
}
// NewMultiDimensionLimiter wires up the four per-dimension limiters on top of
// the given Redis client, each with its own fixed rate, capacity and key prefix.
func NewMultiDimensionLimiter(rdb *redis.Client) *MultiDimensionLimiter {
	perIP := NewTokenBucketLimiter(rdb, 100, 200, "rl:ip")             // 100 tokens/s, capacity 200
	perUser := NewTokenBucketLimiter(rdb, 50, 100, "rl:user")          // 50 tokens/s, capacity 100
	perAPI := NewTokenBucketLimiter(rdb, 200, 500, "rl:api")           // 200 tokens/s, capacity 500
	global := NewTokenBucketLimiter(rdb, 10000, 20000, "rl:global")    // 10000 tokens/s, capacity 20000

	return &MultiDimensionLimiter{
		ipLimiter:     perIP,
		userLimiter:   perUser,
		apiLimiter:    perAPI,
		globalLimiter: global,
	}
}
// Check runs the request through the IP, user, API and global limiters, in
// that order, taking one token from each. The user limiter is skipped when
// userID is empty.
//
// It returns (true, "") when every limiter admits the request. Otherwise it
// returns false and a reason: "<dimension> rate limit exceeded" when a bucket
// is empty, or "<dimension> rate limiter unavailable: <err>" when the limiter
// itself failed. A limiter failure still rejects the request, but it is no
// longer reported as the client exceeding its limit.
//
// Tokens already taken from earlier dimensions are not returned when a later
// dimension rejects the request.
func (l *MultiDimensionLimiter) Check(ctx context.Context, ip, userID, apiPath string) (bool, string) {
	checks := []struct {
		dimension string
		limiter   *TokenBucketLimiter
		key       string
		skip      bool
	}{
		{dimension: "IP", limiter: l.ipLimiter, key: ip},
		{dimension: "User", limiter: l.userLimiter, key: userID, skip: userID == ""},
		{dimension: "API", limiter: l.apiLimiter, key: apiPath},
		{dimension: "Global", limiter: l.globalLimiter, key: "global"},
	}

	for _, c := range checks {
		if c.skip {
			continue
		}
		allowed, err := c.limiter.Allow(ctx, c.key, 1)
		if err != nil {
			return false, fmt.Sprintf("%s rate limiter unavailable: %v", c.dimension, err)
		}
		if !allowed {
			return false, c.dimension + " rate limit exceeded"
		}
	}
	return true, ""
}
滑动窗口实现
// SlidingWindowLimiter admits at most limit requests per key within any
// trailing interval of length window, using a Redis sorted set per key.
type SlidingWindowLimiter struct {
	rdb       *redis.Client
	window    time.Duration
	limit     int
	keyPrefix string
}
// NewSlidingWindowLimiter returns a limiter that allows limit requests per
// window and namespaces its Redis keys under keyPrefix.
func NewSlidingWindowLimiter(rdb *redis.Client, window time.Duration, limit int, keyPrefix string) *SlidingWindowLimiter {
	limiter := new(SlidingWindowLimiter)
	limiter.rdb = rdb
	limiter.window = window
	limiter.limit = limit
	limiter.keyPrefix = keyPrefix
	return limiter
}
func (l *SlidingWindowLimiter) Allow(ctx context.Context, key string) (bool, error) {
windowKey := fmt.Sprintf("%s:%s", l.keyPrefix, key)
now := time.Now().UnixMilli()
windowStart := now - l.window.Milliseconds()
// Lua脚本
script := redis.NewScript(`
local key = KEYS[1]
local window_start = tonumber(ARGV[1])
local now = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
-- 移除窗口外的请求
redis.call('ZREMRANGEBYSCORE', key, 0, window_start)
-- 统计当前窗口内的请求数
local count = redis.call('ZCARD', key)
if count < limit then
-- 添加当前请求
redis.call('ZADD', key, now, now .. math.random())
redis.call('PEXPIRE', key, 60000)
return 1
else
return 0
end
`)
result, err := script.Run(ctx, l.rdb, []string{windowKey},
windowStart, now, l.limit).Int()
if err != nil {
return false, err
}
return result == 1, nil
}
Kong网关限流配置
Rate Limiting插件
# kong-rate-limit.yaml
# Rate-limiting plugin: 100 requests/minute and 1000 requests/hour per client
# IP, with counters stored in Redis.
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: rate-limit-plugin
config:
  minute: 100
  hour: 1000
  limit_by: ip
  policy: redis
  redis_host: redis-master
  redis_port: 6379
  fault_tolerant: true
  hide_client_headers: false
plugin: rate-limiting
---
# Apply to specific routes
apiVersion: configuration.konghq.com/v1
kind: KongIngress
metadata:
  name: api-ingress
route:
  protocols:
    - https
  methods:
    - GET
    - POST
# NOTE(review): KongIngress normally carries only upstream/proxy/route
# settings. Plugins are usually attached with the annotation
# `konghq.com/plugins: rate-limit-plugin` on the Ingress or Service —
# confirm against the Kong Ingress Controller version in use.
plugins:
  - rate-limit-plugin
Request Termination(熔断降级)
# kong-request-termination.yaml
# Responds with a fixed 503 instead of proxying to the upstream.
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: circuit-breaker
config:
  status_code: 503
  message: "Service temporarily unavailable, please try again later"
  content_type: application/json
plugin: request-termination
---
# Intended use: switch this on when the backend error rate exceeds a threshold.
# NOTE: request-termination rejects every matching request while it is enabled
# and does not measure error rates itself, so an operator or external
# automation has to toggle it.
apiVersion: configuration.konghq.com/v1
kind: KongClusterPlugin
metadata:
  name: global-circuit-breaker
config:
  status_code: 503
  message: "System under maintenance"
plugin: request-termination
# Enable/disable dynamically through Kong Manager or the Admin API.
熔断器实现
状态机设计
熔断器状态机:
┌──────────┐ 失败率>阈值 ┌──────────┐
│ CLOSED │ ───────────────▶ │ OPEN │
│ (正常) │ │ (熔断) │
└──────────┘ └──────────┘
▲ │
│ │ 超时时间到
│ ▼
│ ┌──────────┐
└──────────────────────── │HALF-OPEN │
成功 │ (半开) │
└──────────┘
│
│ 失败
└─────▶ OPEN
Go熔断器实现
package circuitbreaker
import (
"context"
"errors"
"sync"
"time"
)
// State is the circuit breaker's position in its state machine.
type State int

// Breaker states. The numeric values follow declaration order:
// closed = 0, half-open = 1, open = 2.
const (
	StateClosed State = iota
	StateHalfOpen
	StateOpen
)
// CircuitBreaker tracks call outcomes and moves between closed, open and
// half-open states. mu guards the mutable fields below it.
type CircuitBreaker struct {
	name             string
	failureThreshold int           // failures (without an intervening success) that open a closed breaker
	successThreshold int           // successes needed in half-open state to close the breaker
	timeout          time.Duration // how long the breaker stays open before allowing a trial call
	mu               sync.RWMutex
	state            State
	failureCount     int
	successCount     int
	lastStateTime    time.Time // time of the most recent state transition
}
// NewCircuitBreaker returns a breaker in the closed state, with its
// state-change timestamp set to the moment of construction.
func NewCircuitBreaker(name string, failureThreshold, successThreshold int, timeout time.Duration) *CircuitBreaker {
	cb := &CircuitBreaker{name: name, state: StateClosed}
	cb.failureThreshold = failureThreshold
	cb.successThreshold = successThreshold
	cb.timeout = timeout
	cb.lastStateTime = time.Now()
	return cb
}
// ErrCircuitOpen is returned by Execute when the breaker rejects a call
// without running it. Callers can detect it with errors.Is.
var ErrCircuitOpen = errors.New("circuit breaker is open")

// Execute runs fn if the breaker currently admits calls and records the
// outcome (nil error = success, non-nil = failure).
//
// It returns ErrCircuitOpen without invoking fn when the breaker rejects the
// call; otherwise it returns whatever fn returned. ctx is currently not
// consulted by this method.
func (cb *CircuitBreaker) Execute(ctx context.Context, fn func() error) error {
	if !cb.allowRequest() {
		return ErrCircuitOpen
	}
	err := fn()
	cb.recordResult(err)
	return err
}
// allowRequest reports whether a call may proceed right now.
//
// Closed and half-open breakers admit every call. An open breaker rejects
// calls until timeout has elapsed since the last state change; the first call
// after that moves the breaker to half-open and is admitted.
//
// The whole check-and-transition runs under the write lock. Releasing a read
// lock to take the write lock would let another goroutine change the state in
// between, and the transition would then be applied to stale information.
func (cb *CircuitBreaker) allowRequest() bool {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	switch cb.state {
	case StateClosed, StateHalfOpen:
		return true
	case StateOpen:
		// Has the open period run out?
		if time.Since(cb.lastStateTime) > cb.timeout {
			cb.transitionTo(StateHalfOpen)
			return true
		}
		return false
	default:
		return false
	}
}
// recordResult feeds one call outcome into the state machine while holding
// the write lock: a nil err counts as a success, anything else as a failure.
func (cb *CircuitBreaker) recordResult(err error) {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	if err == nil {
		cb.onSuccess()
		return
	}
	cb.onFailure()
}
// onFailure applies one failed call to the state machine. It does no locking
// of its own.
//
// In half-open state any failure re-opens the breaker. In closed state the
// failure is counted and the breaker opens once the count reaches
// failureThreshold. In open state nothing changes.
func (cb *CircuitBreaker) onFailure() {
	if cb.state == StateHalfOpen {
		// A failed trial call trips the breaker again.
		cb.transitionTo(StateOpen)
		return
	}
	if cb.state != StateClosed {
		return
	}

	cb.failureCount++
	if cb.failureCount < cb.failureThreshold {
		return
	}
	cb.transitionTo(StateOpen)
}
// onSuccess applies one successful call to the state machine. It does no
// locking of its own.
//
// In closed state it clears the failure count. In half-open state it counts
// the success and closes the breaker once the count reaches successThreshold.
// In open state nothing changes.
func (cb *CircuitBreaker) onSuccess() {
	if cb.state == StateClosed {
		cb.failureCount = 0
		return
	}
	if cb.state != StateHalfOpen {
		return
	}

	cb.successCount++
	if cb.successCount < cb.successThreshold {
		return
	}
	cb.transitionTo(StateClosed)
}
// transitionTo switches the breaker to state, stamps the transition time and
// clears both the failure and success counters. It does no locking of its own.
func (cb *CircuitBreaker) transitionTo(state State) {
	cb.failureCount, cb.successCount = 0, 0
	cb.lastStateTime = time.Now()
	cb.state = state
}
// GetState returns the breaker's current state, read under the read lock.
func (cb *CircuitBreaker) GetState() State {
	cb.mu.RLock()
	current := cb.state
	cb.mu.RUnlock()
	return current
}
Nginx熔断配置
# nginx.conf
upstream backend {
    # Shared-memory zone; the NGINX Plus health_check directive requires the
    # upstream group to live in shared memory.
    zone backend 64k;

    server backend1.example.com:8080 max_fails=3 fail_timeout=30s;
    server backend2.example.com:8080 max_fails=3 fail_timeout=30s;
    server backend3.example.com:8080 backup;  # fallback server
}

server {
    listen 80;

    location /api/ {
        proxy_pass http://backend;

        # Timeouts
        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 10s;

        # Error handling: replace upstream 5xx responses with the degraded page
        proxy_intercept_errors on;
        error_page 500 502 503 504 /50x.html;
    }

    # Degraded response. Declared at server level: a nested location must fall
    # under its parent's prefix, and /50x.html does not fall under /api/.
    location = /50x.html {
        internal;
        # `return` with a body takes its Content-Type from default_type;
        # add_header is not applied to a 503 unless `always` is given.
        default_type application/json;
        return 503 '{"error": "Service temporarily unavailable", "retry_after": 30}';
    }

    # Active health checks (NGINX Plus only). health_check is a single
    # directive with parameters and must sit in a location that proxies to the
    # upstream group being checked.
    location /health {
        internal;
        proxy_pass http://backend;
        health_check interval=5s fails=3 passes=2 uri=/health port=8080;
    }
}
服务降级策略
降级方案设计
降级策略:
1. 返回缓存数据
└─ 场景:读接口、非实时数据
└─ 实现:Redis缓存 + 过期时间延长
2. 返回默认值
└─ 场景:推荐、广告等非核心功能
└─ 实现:配置默认响应
3. 简化功能
└─ 场景:复杂查询、聚合接口
└─ 实现:关闭部分功能,返回基础数据
4. 异步处理
└─ 场景:写入操作
└─ 实现:写入MQ,异步处理
降级实现示例
package degradation
import (
	"context"
	"encoding/json"
	"errors"
	"time"
)
// DegradationStrategy is a fallback that can supply a result when the primary
// call has failed.
type DegradationStrategy interface {
	// Execute runs the fallback. A non-nil error tells the caller to move on
	// to the next strategy.
	Execute(ctx context.Context) (interface{}, error)
}
// CacheDegradation is a fallback that serves the value stored in cache under key.
type CacheDegradation struct {
	cache  Cache
	key    string
	maxAge time.Duration // entries older than this trigger a stale-data warning but are still served
}
// Execute looks up d.key in the cache and returns the stored value.
//
// A cache error is returned as-is. An entry older than maxAge is still
// returned; the only consequence is a logged warning.
func (d *CacheDegradation) Execute(ctx context.Context) (interface{}, error) {
	entry, err := d.cache.Get(ctx, d.key)
	if err != nil {
		return nil, err
	}

	// Staleness is tolerated on purpose: degraded data beats no data.
	if age := time.Since(entry.UpdatedAt); age > d.maxAge {
		log.Warn("Using stale cache data")
	}
	return entry.Value, nil
}
// DefaultValueDegradation is a fallback that always serves a fixed value.
type DefaultValueDegradation struct {
	value interface{}
}
// Execute returns the configured default value; it never fails.
func (d *DefaultValueDegradation) Execute(ctx context.Context) (interface{}, error) {
	fallback := d.value
	return fallback, nil
}
// ServiceWithDegradation wraps a primary call with an ordered list of
// fallback strategies to try when the primary call fails.
type ServiceWithDegradation struct {
	primary    func(ctx context.Context) (interface{}, error)
	strategies []DegradationStrategy
}
// Call invokes the primary function and, if it fails, walks the fallback
// strategies in order, returning the first result obtained without error.
//
// The primary failure is logged. If every strategy also fails (or none is
// configured) a generic error is returned.
func (s *ServiceWithDegradation) Call(ctx context.Context) (interface{}, error) {
	// Happy path: the primary service answers.
	primaryResult, primaryErr := s.primary(ctx)
	if primaryErr == nil {
		return primaryResult, nil
	}
	log.Errorf("Primary service failed: %v, trying degradation", primaryErr)

	// Degraded path: first fallback that succeeds wins.
	for i := range s.strategies {
		if fallback, fallbackErr := s.strategies[i].Execute(ctx); fallbackErr == nil {
			return fallback, nil
		}
	}
	return nil, errors.New("all degradation strategies failed")
}
// Usage example.
//
// getUserProfile fetches a user's profile from the user service, falling back
// first to the cached copy under "user:<userID>" and then to a placeholder
// profile flagged with IsDegraded.
//
// It returns an error when the primary call and all fallbacks fail, or when
// the value that came back is not a *UserProfile (for example a cache entry
// stored in another representation), instead of panicking on the type
// assertion.
func getUserProfile(ctx context.Context, userID string) (*UserProfile, error) {
	service := &ServiceWithDegradation{
		primary: func(ctx context.Context) (interface{}, error) {
			return callUserService(ctx, userID)
		},
		strategies: []DegradationStrategy{
			// Strategy 1: cached copy.
			&CacheDegradation{
				cache:  redisCache,
				key:    "user:" + userID,
				maxAge: 24 * time.Hour, // cache entries older than 24h are reported as stale
			},
			// Strategy 2: placeholder profile.
			&DefaultValueDegradation{
				value: &UserProfile{
					ID:         userID,
					Name:       "Unknown",
					Avatar:     "/default-avatar.png",
					IsDegraded: true,
				},
			},
		},
	}

	result, err := service.Call(ctx)
	if err != nil {
		return nil, err
	}

	// Call returns interface{}; a fallback may hand back something other than
	// *UserProfile, so check instead of asserting blindly.
	profile, ok := result.(*UserProfile)
	if !ok {
		return nil, errors.New("user profile lookup returned an unexpected result type")
	}
	return profile, nil
}
监控与告警
Prometheus指标
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
)
// Metric collectors for the protection layer: rate limiting, circuit breaking
// and degradation.
var (
	// Rate-limiting metrics.
	RateLimitTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "rate_limit_total",
			Help: "Total number of rate limit checks",
		},
		[]string{"type", "result"}, // type: ip/user/api, result: allowed/rejected
	)
	// Circuit-breaker metrics.
	CircuitBreakerState = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "circuit_breaker_state",
			Help: "Current state of circuit breaker (0=closed, 1=half-open, 2=open)",
		},
		[]string{"name"},
	)
	CircuitBreakerRequests = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "circuit_breaker_requests_total",
			Help: "Total number of requests through circuit breaker",
		},
		[]string{"name", "result"},
	)
	// Degradation metrics.
	DegradationTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "degradation_total",
			Help: "Total number of degradation triggers",
		},
		[]string{"service", "strategy"},
	)
)
// init registers every collector declared in this package with the default
// Prometheus registry, in declaration order.
func init() {
	prometheus.MustRegister(
		RateLimitTotal,
		CircuitBreakerState,
		CircuitBreakerRequests,
		DegradationTotal,
	)
}
告警规则
# prometheus-alerts.yaml
groups:
  - name: protection
    rules:
      # Rate limiting: more than 30% of checks rejected, per limiter type
      - alert: HighRateLimitRejection
        expr: |
          sum(rate(rate_limit_total{result="rejected"}[5m])) by (type)
          /
          sum(rate(rate_limit_total[5m])) by (type)
          > 0.3
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High rate limit rejection rate"
          description: "{{ $labels.type }} rejection rate is {{ $value | humanizePercentage }}"

      # Circuit breaker: state gauge reports 2 (open)
      - alert: CircuitBreakerOpen
        expr: circuit_breaker_state == 2
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Circuit breaker is open"
          description: "Circuit breaker {{ $labels.name }} has been open for more than 1 minute"

      # Degradation: more than 10 fallback triggers per second, per service
      - alert: HighDegradationRate
        expr: |
          sum(rate(degradation_total[5m])) by (service) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High degradation rate"
          description: "Service {{ $labels.service }} is being degraded frequently"
总结
限流与熔断最佳实践
| 场景 | 限流策略 | 熔断策略 | 降级策略 |
|---|---|---|---|
| 登录接口 | IP+用户双维度,严格限制 | 快速失败 | 返回错误提示 |
| 查询接口 | 宽松限流,允许突发 | 超时熔断 | 返回缓存 |
| 写入接口 | 严格限流 | 熔断后异步 | 写入MQ异步处理 |
| 核心接口 | 多维度保护 | 半开探测 | 简化功能 |
| 非核心接口 | 可关闭 | 直接熔断 | 返回默认值 |
关键原则
- 多层防护:网关限流 + 服务熔断 + 应用降级
- 精细化控制:按IP、用户、接口多维度限流
- 快速失败:熔断器打开后立即拒绝请求
- 优雅降级:保证核心功能可用
- 监控告警:实时监控限流、熔断、降级情况
- 动态调整:根据流量动态调整限流阈值
延伸阅读
- Kong Rate Limiting Plugin
- Nginx Rate Limiting
- Redis Rate Limiting Patterns
- Martin Fowler - Circuit Breaker
- Resilience4j Circuit Breaker
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。