后台任务不应该都塞在 HTTP 请求里
很多 Web 服务会遇到一些不适合在请求里同步完成的事情:发送邮件、生成报表、处理图片、同步第三方数据、写审计日志。简单项目里,你可能先在 handler 里直接调用这些逻辑,但请求会变慢,也更容易受外部系统影响。
Go 的 goroutine 和 channel 很适合实现入门级后台 worker。你可以把任务放进 channel,由固定数量 worker 消费。再配合 context,就能在服务关闭时停止接收新任务,并等待已有任务结束。
这篇文章实现一个内存里的 Worker 队列。它不替代真正的消息队列,比如 Redis、RabbitMQ、Kafka;但适合理解并发任务处理的基本结构。
定义任务和队列
type Job struct {
ID int64
Email string
Body string
}
type Queue struct {
jobs chan Job
wg sync.WaitGroup
}
func NewQueue(size int) *Queue {
return &Queue{
jobs: make(chan Job, size),
}
}
提交任务:
func (q *Queue) Submit(ctx context.Context, job Job) error {
select {
case q.jobs <- job:
return nil
case <-ctx.Done():
return ctx.Err()
default:
return fmt.Errorf("queue is full")
}
}
这里用了 default,表示队列满时立刻返回错误,而不是阻塞请求。是否阻塞要看业务需求。对 HTTP 请求来说,队列满了返回 503 可能比一直卡住更好。
启动 worker
func (q *Queue) Start(ctx context.Context, workerCount int, handle func(context.Context, Job) error) {
for i := 0; i < workerCount; i++ {
q.wg.Add(1)
go func(workerID int) {
defer q.wg.Done()
for {
select {
case job := <-q.jobs:
if err := handle(ctx, job); err != nil {
log.Printf("worker=%d job=%d error=%v", workerID, job.ID, err)
}
case <-ctx.Done():
return
}
}
}(i + 1)
}
}
处理函数:
func SendEmail(ctx context.Context, job Job) error {
select {
case <-time.After(200 * time.Millisecond):
log.Printf("send email to %s", job.Email)
return nil
case <-ctx.Done():
return ctx.Err()
}
}
启动:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := NewQueue(100)
queue.Start(ctx, 4, SendEmail)
停止队列
func (q *Queue) Stop() {
close(q.jobs)
q.wg.Wait()
}
上面的 Start 需要配合关闭 channel 处理:
case job, ok := <-q.jobs:
if !ok {
return
}
if err := handle(ctx, job); err != nil {
log.Printf("worker=%d job=%d error=%v", workerID, job.ID, err)
}
完整 worker 循环应该同时处理 context 取消和 channel 关闭。服务关闭时,你可以先停止接收 HTTP 请求,再关闭队列,等待 worker 结束。
入门队列的边界
内存队列有明显限制:进程崩溃任务会丢,无法跨多实例共享,没有重试持久化,没有延迟任务,没有可视化管理。它适合轻量异步处理和学习,不适合关键业务任务。
如果任务不能丢,比如支付后发货、订单状态同步,应该使用可靠消息队列或数据库任务表,并设计重试、幂等和监控。
增加简单重试
入门版可以在 worker 内部做有限重试:
func handleWithRetry(ctx context.Context, job Job, handle func(context.Context, Job) error) error {
var lastErr error
for attempt := 1; attempt <= 3; attempt++ {
if err := handle(ctx, job); err != nil {
lastErr = err
select {
case <-time.After(time.Duration(attempt) * 200 * time.Millisecond):
case <-ctx.Done():
return ctx.Err()
}
continue
}
return nil
}
return fmt.Errorf("job %d failed after retries: %w", job.ID, lastErr)
}
然后 worker 调用 handleWithRetry。这适合偶发网络抖动,不适合所有错误。比如参数格式错误、用户不存在、权限不足,重试多少次都不会成功。真实系统应该区分可重试错误和不可重试错误。
还要考虑幂等性。如果发送邮件接口不支持幂等,重试可能导致用户收到多封邮件。后台任务不是简单加 goroutine 就结束,任务语义同样重要。
给队列加观测信息
后台任务如果没有日志和指标,出问题很难查。至少记录任务开始、失败和最终成功:
func loggedHandle(ctx context.Context, job Job, handle func(context.Context, Job) error) error {
start := time.Now()
log.Printf("job started id=%d email=%s", job.ID, job.Email)
err := handle(ctx, job)
if err != nil {
log.Printf("job failed id=%d duration=%s err=%v", job.ID, time.Since(start), err)
return err
}
log.Printf("job finished id=%d duration=%s", job.ID, time.Since(start))
return nil
}
还可以记录队列满的次数、处理成功数、失败数。即使不用完整监控系统,简单计数也能回答几个关键问题:任务是不是堆积了?失败是不是突然变多?处理耗时是不是变长?
如果任务重要,日志里要有任务 ID 或业务 ID,方便从用户反馈一路追到后台执行记录。异步系统最怕请求已经返回成功,但后台到底有没有执行没人知道。
小结
Go 可以用 channel、goroutine、WaitGroup 和 context 实现简单 Worker 队列。核心结构是:有缓冲 channel 存任务,固定数量 worker 消费,提交时处理队列满,关闭时停止接收并等待 worker 退出。
这种队列适合入门和轻量任务,但不要把它当成可靠消息系统。理解它的结构和边界后,再学习 Redis 队列、Kafka 或云队列,会更容易判断取舍。
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。