Go Worker 队列入门:用 channel 和 context 处理后台任务

本文讲解 Go 中使用 channel、goroutine、WaitGroup 和 context 实现一个简单 Worker 队列,适合入门后台任务处理。

后台任务不应该都塞在 HTTP 请求里

很多 Web 服务会遇到一些不适合在请求里同步完成的事情:发送邮件、生成报表、处理图片、同步第三方数据、写审计日志。简单项目里,你可能先在 handler 里直接调用这些逻辑,但请求会变慢,也更容易受外部系统影响。

Go 的 goroutine 和 channel 很适合实现入门级后台 worker。你可以把任务放进 channel,由固定数量 worker 消费。再配合 context,就能在服务关闭时停止接收新任务,并等待已有任务结束。

这篇文章实现一个内存里的 Worker 队列。它不替代真正的消息队列,比如 Redis、RabbitMQ、Kafka;但适合理解并发任务处理的基本结构。

定义任务和队列

type Job struct {
	ID    int64
	Email string
	Body  string
}

type Queue struct {
	jobs chan Job
	wg   sync.WaitGroup
}

func NewQueue(size int) *Queue {
	return &Queue{
		jobs: make(chan Job, size),
	}
}

提交任务:

func (q *Queue) Submit(ctx context.Context, job Job) error {
	select {
	case q.jobs <- job:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	default:
		return fmt.Errorf("queue is full")
	}
}

这里用了 default,表示队列满时立刻返回错误,而不是阻塞请求。是否阻塞要看业务需求。对 HTTP 请求来说,队列满了返回 503 可能比一直卡住更好。

启动 worker

func (q *Queue) Start(ctx context.Context, workerCount int, handle func(context.Context, Job) error) {
	for i := 0; i < workerCount; i++ {
		q.wg.Add(1)
		go func(workerID int) {
			defer q.wg.Done()
			for {
				select {
				case job := <-q.jobs:
					if err := handle(ctx, job); err != nil {
						log.Printf("worker=%d job=%d error=%v", workerID, job.ID, err)
					}
				case <-ctx.Done():
					return
				}
			}
		}(i + 1)
	}
}

处理函数:

func SendEmail(ctx context.Context, job Job) error {
	select {
	case <-time.After(200 * time.Millisecond):
		log.Printf("send email to %s", job.Email)
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

启动:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

queue := NewQueue(100)
queue.Start(ctx, 4, SendEmail)

停止队列

func (q *Queue) Stop() {
	close(q.jobs)
	q.wg.Wait()
}

上面的 Start 需要配合关闭 channel 处理:

case job, ok := <-q.jobs:
	if !ok {
		return
	}
	if err := handle(ctx, job); err != nil {
		log.Printf("worker=%d job=%d error=%v", workerID, job.ID, err)
	}

完整 worker 循环应该同时处理 context 取消和 channel 关闭。服务关闭时,你可以先停止接收 HTTP 请求,再关闭队列,等待 worker 结束。

入门队列的边界

内存队列有明显限制:进程崩溃任务会丢,无法跨多实例共享,没有重试持久化,没有延迟任务,没有可视化管理。它适合轻量异步处理和学习,不适合关键业务任务。

如果任务不能丢,比如支付后发货、订单状态同步,应该使用可靠消息队列或数据库任务表,并设计重试、幂等和监控。

增加简单重试

入门版可以在 worker 内部做有限重试:

func handleWithRetry(ctx context.Context, job Job, handle func(context.Context, Job) error) error {
	var lastErr error
	for attempt := 1; attempt <= 3; attempt++ {
		if err := handle(ctx, job); err != nil {
			lastErr = err
			select {
			case <-time.After(time.Duration(attempt) * 200 * time.Millisecond):
			case <-ctx.Done():
				return ctx.Err()
			}
			continue
		}
		return nil
	}
	return fmt.Errorf("job %d failed after retries: %w", job.ID, lastErr)
}

然后 worker 调用 handleWithRetry。这适合偶发网络抖动,不适合所有错误。比如参数格式错误、用户不存在、权限不足,重试多少次都不会成功。真实系统应该区分可重试错误和不可重试错误。

还要考虑幂等性。如果发送邮件接口不支持幂等,重试可能导致用户收到多封邮件。后台任务不是简单加 goroutine 就结束,任务语义同样重要。

给队列加观测信息

后台任务如果没有日志和指标,出问题很难查。至少记录任务开始、失败和最终成功:

func loggedHandle(ctx context.Context, job Job, handle func(context.Context, Job) error) error {
	start := time.Now()
	log.Printf("job started id=%d email=%s", job.ID, job.Email)

	err := handle(ctx, job)
	if err != nil {
		log.Printf("job failed id=%d duration=%s err=%v", job.ID, time.Since(start), err)
		return err
	}

	log.Printf("job finished id=%d duration=%s", job.ID, time.Since(start))
	return nil
}

还可以记录队列满的次数、处理成功数、失败数。即使不用完整监控系统,简单计数也能回答几个关键问题:任务是不是堆积了?失败是不是突然变多?处理耗时是不是变长?

如果任务重要,日志里要有任务 ID 或业务 ID,方便从用户反馈一路追到后台执行记录。异步系统最怕请求已经返回成功,但后台到底有没有执行没人知道。

小结

Go 可以用 channel、goroutine、WaitGroup 和 context 实现简单 Worker 队列。核心结构是:有缓冲 channel 存任务,固定数量 worker 消费,提交时处理队列满,关闭时停止接收并等待 worker 退出。

这种队列适合入门和轻量任务,但不要把它当成可靠消息系统。理解它的结构和边界后,再学习 Redis 队列、Kafka 或云队列,会更容易判断取舍。

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页