channel 是 Go 很有代表性的特性,但很多初学者会把它写成“能跑就行”的并发代码:谁关闭 channel 不清楚,错误怎么返回不清楚,取消后 goroutine 会不会退出也不清楚。短期看只是偶尔卡住,长期看就是 goroutine 泄漏和线上不稳定。
本文用一个批量处理 URL 的例子,讲一个简单流水线:生产任务,多个 worker 处理,最后汇总结果。重点不是追求最高性能,而是把 channel 关闭、context 取消和错误处理讲清楚。
定义任务和结果
先定义结构:
type Job struct {
ID int
URL string
}
type Result struct {
JobID int
Code int
Err error
}
任务包含 URL,结果包含状态码或错误。不要只传字符串。真实项目里,多一点结构字段会让日志和排查更清楚。
生产者负责关闭任务 channel
生产任务:
func produce(ctx context.Context, urls []string) <-chan Job {
jobs := make(chan Job)
go func() {
defer close(jobs)
for i, u := range urls {
select {
case jobs <- Job{ID: i + 1, URL: u}:
case <-ctx.Done():
return
}
}
}()
return jobs
}
谁发送,谁关闭。produce 是任务 channel 的唯一发送方,所以由它关闭 jobs。接收方不要关闭别人发送的 channel。这个规则很简单,却能避免很多 panic。
select 里监听 ctx.Done(),表示如果上层取消,生产者会停止发送并退出。
worker 处理任务
worker 从 jobs 读取,向 results 写入:
func worker(ctx context.Context, client *http.Client, jobs <-chan Job, results chan<- Result) {
for job := range jobs {
code, err := fetch(ctx, client, job.URL)
select {
case results <- Result{JobID: job.ID, Code: code, Err: err}:
case <-ctx.Done():
return
}
}
}
注意 channel 方向:jobs <-chan Job 表示只读,results chan<- Result 表示只写。方向不是必须写,但写上后函数意图更清楚,编译器也能帮你挡住误用。
fetch 也要使用 context:
func fetch(ctx context.Context, client *http.Client, rawURL string) (int, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, rawURL, nil)
if err != nil {
return 0, err
}
resp, err := client.Do(req)
if err != nil {
return 0, err
}
defer resp.Body.Close()
io.Copy(io.Discard, resp.Body)
return resp.StatusCode, nil
}
这里把响应体读完并关闭,是为了连接复用。即使不关心 body,也不要直接丢掉。
汇总并关闭结果 channel
启动多个 worker:
func run(ctx context.Context, urls []string, workerCount int) []Result {
client := &http.Client{Timeout: 5 * time.Second}
jobs := produce(ctx, urls)
results := make(chan Result)
var wg sync.WaitGroup
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
worker(ctx, client, jobs, results)
}()
}
go func() {
wg.Wait()
close(results)
}()
var out []Result
for result := range results {
out = append(out, result)
}
return out
}
结果 channel 由谁关闭?不是 worker 单独关闭,因为有多个 worker,任何一个提前关闭都会让其他 worker 发送时 panic。正确做法是等待所有 worker 结束后,由一个单独 goroutine 关闭 results。
这个模式很常见:多个发送者共享一个输出 channel 时,用 WaitGroup 等所有发送者结束,再关闭输出。
取消后不要卡住
如果上层只想要前 10 个成功结果,可以在拿够后取消 context:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
results := run(ctx, urls, 5)
_ = results
在更复杂的版本里,你可能边读结果边决定取消。关键是所有发送和接收都要能响应 ctx.Done(),否则某个 goroutine 可能永远卡在发送上。
一个危险写法是:
results <- result
如果没人接收,这行会一直阻塞。更稳的写法是前面 worker 中的 select。channel 流水线里,取消路径和正常路径一样重要。
错误处理策略
遇到单个 URL 失败,要不要取消全部任务?这取决于业务。如果是批量探测网站,某个失败可以记录结果继续;如果是多步骤导入,其中一个失败就必须停止,那就应该取消 context。
可以把取消权放在汇总层:
for result := range results {
if result.Err != nil {
cancel()
return nil, result.Err
}
out = append(out, result)
}
这时 produce 和 worker 都会因为 context 取消而退出。错误策略不要藏在 worker 里,否则后面很难理解为什么任务突然停了。
buffer 要谨慎
给 channel 加 buffer 可以改善吞吐,但不要把 buffer 当成修复死锁的工具:
results := make(chan Result, workerCount)
小 buffer 可以减少 worker 和汇总 goroutine 的等待。但如果程序逻辑依赖“buffer 足够大所以不会卡住”,那只是把问题推迟。正确的流水线应该在无 buffer 或小 buffer 下也能正常退出。
测试是否能退出
并发流水线最好写一个取消测试,确认取消后函数能返回:
func TestRunCanBeCanceled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
done := make(chan struct{})
go func() {
_ = run(ctx, []string{"https://example.com"}, 2)
close(done)
}()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("pipeline did not stop after cancel")
}
}
这个测试不追求覆盖所有网络行为,它只验证生命周期。很多 channel bug 不是结果算错,而是程序结束不了。能用测试守住退出路径,并发代码会更可信。
小结
Go channel 流水线的基本规则是:发送方关闭 channel,多个发送方要用 WaitGroup 汇合后再关闭,所有可能阻塞的发送和接收都要考虑 context 取消。worker 不应该擅自关闭共享输出 channel,错误策略最好集中在汇总层。
channel 不是为了写炫酷并发,而是为了表达数据流。初学者先把生命周期写清楚,再考虑性能优化。能正常结束的并发程序,才有资格谈快。
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。