Go channel 流水线入门:生产、处理、取消和关闭怎么配合

用批量处理任务讲 Go channel 流水线的基础写法,包括生产者、工作者、结果汇总、context 取消和 channel 关闭规则。

channel 是 Go 很有代表性的特性,但很多初学者会把它写成“能跑就行”的并发代码:谁关闭 channel 不清楚,错误怎么返回不清楚,取消后 goroutine 会不会退出也不清楚。短期看只是偶尔卡住,长期看就是 goroutine 泄漏和线上不稳定。

本文用一个批量处理 URL 的例子,讲一个简单流水线:生产任务,多个 worker 处理,最后汇总结果。重点不是追求最高性能,而是把 channel 关闭、context 取消和错误处理讲清楚。

定义任务和结果

先定义结构:

type Job struct {
	ID  int
	URL string
}

type Result struct {
	JobID int
	Code  int
	Err   error
}

任务包含 URL,结果包含状态码或错误。不要只传字符串。真实项目里,多一点结构字段会让日志和排查更清楚。

生产者负责关闭任务 channel

生产任务:

func produce(ctx context.Context, urls []string) <-chan Job {
	jobs := make(chan Job)
	go func() {
		defer close(jobs)
		for i, u := range urls {
			select {
			case jobs <- Job{ID: i + 1, URL: u}:
			case <-ctx.Done():
				return
			}
		}
	}()
	return jobs
}

谁发送,谁关闭。produce 是任务 channel 的唯一发送方,所以由它关闭 jobs。接收方不要关闭别人发送的 channel。这个规则很简单,却能避免很多 panic。

select 里监听 ctx.Done(),表示如果上层取消,生产者会停止发送并退出。

worker 处理任务

worker 从 jobs 读取,向 results 写入:

func worker(ctx context.Context, client *http.Client, jobs <-chan Job, results chan<- Result) {
	for job := range jobs {
		code, err := fetch(ctx, client, job.URL)
		select {
		case results <- Result{JobID: job.ID, Code: code, Err: err}:
		case <-ctx.Done():
			return
		}
	}
}

注意 channel 方向:jobs <-chan Job 表示只读,results chan<- Result 表示只写。方向不是必须写,但写上后函数意图更清楚,编译器也能帮你挡住误用。

fetch 也要使用 context:

func fetch(ctx context.Context, client *http.Client, rawURL string) (int, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, rawURL, nil)
	if err != nil {
		return 0, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	io.Copy(io.Discard, resp.Body)
	return resp.StatusCode, nil
}

这里把响应体读完并关闭,是为了连接复用。即使不关心 body,也不要直接丢掉。

汇总并关闭结果 channel

启动多个 worker:

func run(ctx context.Context, urls []string, workerCount int) []Result {
	client := &http.Client{Timeout: 5 * time.Second}
	jobs := produce(ctx, urls)
	results := make(chan Result)

	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker(ctx, client, jobs, results)
		}()
	}

	go func() {
		wg.Wait()
		close(results)
	}()

	var out []Result
	for result := range results {
		out = append(out, result)
	}
	return out
}

结果 channel 由谁关闭?不是 worker 单独关闭,因为有多个 worker,任何一个提前关闭都会让其他 worker 发送时 panic。正确做法是等待所有 worker 结束后,由一个单独 goroutine 关闭 results。

这个模式很常见:多个发送者共享一个输出 channel 时,用 WaitGroup 等所有发送者结束,再关闭输出。

取消后不要卡住

如果上层只想要前 10 个成功结果,可以在拿够后取消 context:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

results := run(ctx, urls, 5)
_ = results

在更复杂的版本里,你可能边读结果边决定取消。关键是所有发送和接收都要能响应 ctx.Done(),否则某个 goroutine 可能永远卡在发送上。

一个危险写法是:

results <- result

如果没人接收,这行会一直阻塞。更稳的写法是前面 worker 中的 select。channel 流水线里,取消路径和正常路径一样重要。

错误处理策略

遇到单个 URL 失败,要不要取消全部任务?这取决于业务。如果是批量探测网站,某个失败可以记录结果继续;如果是多步骤导入,其中一个失败就必须停止,那就应该取消 context。

可以把取消权放在汇总层:

for result := range results {
	if result.Err != nil {
		cancel()
		return nil, result.Err
	}
	out = append(out, result)
}

这时 produceworker 都会因为 context 取消而退出。错误策略不要藏在 worker 里,否则后面很难理解为什么任务突然停了。

buffer 要谨慎

给 channel 加 buffer 可以改善吞吐,但不要把 buffer 当成修复死锁的工具:

results := make(chan Result, workerCount)

小 buffer 可以减少 worker 和汇总 goroutine 的等待。但如果程序逻辑依赖“buffer 足够大所以不会卡住”,那只是把问题推迟。正确的流水线应该在无 buffer 或小 buffer 下也能正常退出。

测试是否能退出

并发流水线最好写一个取消测试,确认取消后函数能返回:

func TestRunCanBeCanceled(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	done := make(chan struct{})
	go func() {
		_ = run(ctx, []string{"https://example.com"}, 2)
		close(done)
	}()

	select {
	case <-done:
	case <-time.After(time.Second):
		t.Fatal("pipeline did not stop after cancel")
	}
}

这个测试不追求覆盖所有网络行为,它只验证生命周期。很多 channel bug 不是结果算错,而是程序结束不了。能用测试守住退出路径,并发代码会更可信。

小结

Go channel 流水线的基本规则是:发送方关闭 channel,多个发送方要用 WaitGroup 汇合后再关闭,所有可能阻塞的发送和接收都要考虑 context 取消。worker 不应该擅自关闭共享输出 channel,错误策略最好集中在汇总层。

channel 不是为了写炫酷并发,而是为了表达数据流。初学者先把生命周期写清楚,再考虑性能优化。能正常结束的并发程序,才有资格谈快。

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页