限速听起来像网关和高并发系统才需要的能力,其实入门项目也经常遇到。比如你要批量同步 5000 个客户资料到第三方 CRM,对方文档写着“每秒最多 5 次请求”。如果你开 50 个 goroutine 一口气打过去,很快就会收到 429,严重时还会被封禁。
Go 标准库没有内置完整的令牌桶限速器,但 time.Ticker 足够写出一个朴素、可理解的小限速器。学习它的过程,也能帮你理解时间驱动、取消和并发控制。
最简单的节拍
每 200ms 执行一次任务,大约就是每秒 5 次:
func syncItems(ctx context.Context, items []Item) error {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for _, item := range items {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := syncOne(ctx, item); err != nil {
return err
}
}
}
return nil
}
ticker.Stop() 很重要。Ticker 内部持有资源,不停掉会造成泄漏。虽然小程序马上退出时看不明显,但服务里长期创建 ticker 就会出问题。
第一次是否要等待
上面的代码会在第一个任务前先等 200ms。很多场景希望第一个请求立刻发,后续再按节奏。可以把调用放在等待前,或者先创建一个已经可用的令牌。
func waitTick(ctx context.Context, ticker *time.Ticker) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
return nil
}
}
然后在循环里决定什么时候等。简单批处理里,等一下通常无所谓;用户点击按钮后立刻看到响应的场景,第一次延迟就会影响体验。
失败时是否继续
批处理有两种策略:遇到错误立即停止,或者记录错误继续处理下一条。同步第三方数据时,我通常会把失败记录下来,最后统一返回摘要。
type SyncError struct {
ID string
Err error
}
func syncAll(ctx context.Context, items []Item) []SyncError {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
var errs []SyncError
for _, item := range items {
if err := waitTick(ctx, ticker); err != nil {
errs = append(errs, SyncError{ID: item.ID, Err: err})
break
}
if err := syncOne(ctx, item); err != nil {
errs = append(errs, SyncError{ID: item.ID, Err: err})
}
}
return errs
}
这种写法不会因为一条坏数据让整批停掉,但也不会吞掉错误。最终可以把失败 ID 写入日志、数据库或导出给人工处理。
限速和并发不是一回事
限速控制单位时间内的请求数量,并发控制同时进行的任务数量。只用 ticker 不代表没有并发问题。如果 syncOne 很慢,串行处理会耗时很久;如果你开多个 worker,每个 worker 都一个 ticker,又可能总速率超标。
一种简单方式是所有 worker 共享同一个令牌 channel:
func tokenSource(ctx context.Context, interval time.Duration) <-chan struct{} {
ch := make(chan struct{})
go func() {
defer close(ch)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
select {
case ch <- struct{}{}:
case <-ctx.Done():
return
}
}
}
}()
return ch
}
worker 使用同一个 tokens:
func worker(ctx context.Context, jobs <-chan Item, tokens <-chan struct{}) {
for item := range jobs {
select {
case <-ctx.Done():
return
case <-tokens:
}
_ = syncOne(ctx, item)
}
}
这样不管有多少 worker,总体发起速度都被同一个 token 源控制。
允许短暂突发
有些接口允许“平均每秒 5 次,短时间最多 10 次”。这时可以用带缓冲的 token channel。启动时先放几个令牌,就能允许短暂突发。
func burstTokens(ctx context.Context, interval time.Duration, burst int) <-chan struct{} {
ch := make(chan struct{}, burst)
for i := 0; i < burst; i++ {
ch <- struct{}{}
}
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
close(ch)
return
case <-ticker.C:
select {
case ch <- struct{}{}:
default:
}
}
}
}()
return ch
}
default 表示桶满时丢掉新令牌。这样令牌最多积累到 burst,不会因为系统空闲一小时后突然允许几万次请求。
处理 429
第三方接口返回 429 时,最好尊重 Retry-After:
func retryAfter(resp *http.Response) time.Duration {
v := resp.Header.Get("Retry-After")
if v == "" {
return time.Second
}
if n, err := strconv.Atoi(v); err == nil {
return time.Duration(n) * time.Second
}
if t, err := http.ParseTime(v); err == nil {
return time.Until(t)
}
return time.Second
}
限速器是主动控制,429 是对方告诉你已经超了。两者要一起看。遇到 429 后可以暂停一段时间、降低速率或把任务放回队列,而不是立刻重试。
测试限速逻辑
时间相关代码不好测,因为真实等待会让测试变慢。简单函数可以把 interval 设置很小:
func TestTokenSourceStops(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
tokens := tokenSource(ctx, time.Millisecond)
<-tokens
cancel()
for range tokens {
}
}
更复杂的限速器可以把时钟抽象出来,但入门阶段不必过度设计。先让取消、关闭和基本节奏正确。
小结
time.Ticker 能实现朴素限速:定时产生令牌,任务拿到令牌再执行。它适合批处理、第三方 API 同步、邮件发送和低复杂度后台任务。记得停止 ticker,支持 context 取消,并明确失败策略。
限速不等于并发控制。多个 worker 要共享同一个令牌源,否则总速率会失控。生产项目如果需要精确令牌桶、分布式限速或动态调速,可以再引入成熟库或网关能力。入门阶段先把最小模型写清楚,后面升级才有依据。
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。