Go 的并发为什么容易上手也容易误用
Go 最吸引人的特性之一是并发。启动一个 goroutine 只需要在函数调用前加 go,channel 可以在 goroutine 之间传递数据,select 可以等待多个通信事件,context 可以传递取消信号和超时。相比手动管理线程,这套模型轻很多。
但轻不代表没有成本。goroutine 如果没人退出,会泄漏;channel 如果没人接收,发送方会阻塞;多个 goroutine 同时写共享变量,会产生数据竞争;没有超时的外部调用,会拖住整个请求。入门阶段必须同时学习“怎么启动并发”和“怎么收回来”。
这篇文章从最小 goroutine 讲起,再介绍 channel、WaitGroup、select 和 context。示例会围绕并发抓取页面和处理任务展开。重点不是炫技,而是建立安全直觉:每个 goroutine 都应该有退出路径,每个阻塞等待都应该考虑取消和超时。
启动一个 goroutine
最简单的 goroutine:
package main
import (
"fmt"
"time"
)
func say(message string) {
fmt.Println(message)
}
func main() {
go say("hello from goroutine")
time.Sleep(100 * time.Millisecond)
}
go say(...) 会启动一个新的 goroutine 执行函数。主 goroutine 不会等待它完成,所以示例里用了 time.Sleep。这只是演示,不是推荐做法。真实代码里不要靠睡眠等待并发完成。
更好的方式是使用 sync.WaitGroup:
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("work done")
}()
wg.Wait()
Add(1) 表示有一个任务要等,goroutine 结束时调用 Done(),主流程用 Wait() 等全部完成。defer wg.Done() 常放在 goroutine 开头,避免中途 return 时忘记通知。
channel 用来传递结果
goroutine 之间可以通过 channel 通信:
ch := make(chan string)
go func() {
ch <- "hello"
}()
message := <-ch
fmt.Println(message)
未缓冲 channel 的发送和接收会互相等待。发送方执行 ch <- "hello" 时,如果没有接收者,会阻塞;接收方执行 <-ch 时,如果没有发送者,也会阻塞。
可以创建带缓冲的 channel:
ch := make(chan string, 2)
ch <- "a"
ch <- "b"
fmt.Println(<-ch)
fmt.Println(<-ch)
缓冲 channel 在容量未满时发送不会阻塞。它适合削峰或任务队列,但不要把缓冲当成解决并发问题的万能办法。容量设置不合理,只会把阻塞推迟。
关闭 channel:
close(ch)
关闭表示不会再发送新值。接收方可以用 range 读取直到关闭:
for value := range ch {
fmt.Println(value)
}
通常由发送方关闭 channel。接收方不要随便关闭,因为它不知道是否还有其他发送者。
并发抓取多个地址
写一个并发请求示例:
type FetchResult struct {
URL string
Status int
Err error
}
func fetch(url string) FetchResult {
resp, err := http.Get(url)
if err != nil {
return FetchResult{URL: url, Err: err}
}
defer resp.Body.Close()
return FetchResult{URL: url, Status: resp.StatusCode}
}
并发执行:
func fetchAll(urls []string) []FetchResult {
results := make(chan FetchResult)
for _, url := range urls {
url := url
go func() {
results <- fetch(url)
}()
}
collected := make([]FetchResult, 0, len(urls))
for range urls {
collected = append(collected, <-results)
}
return collected
}
注意 url := url。在循环里启动 goroutine 时,要避免闭包捕获循环变量导致所有 goroutine 看到同一个变量。较新 Go 版本对 range 变量做了改进,但写成局部变量仍然能让意图更清楚,也兼容旧代码。
这个版本有一个问题:http.Get 没有超时。如果某个请求一直卡住,整个函数也会一直等。
使用 context 控制超时
context.Context 用来传递取消信号、超时和请求范围数据。HTTP 请求可以绑定 context:
func fetchWithContext(ctx context.Context, url string) FetchResult {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return FetchResult{URL: url, Err: err}
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return FetchResult{URL: url, Err: err}
}
defer resp.Body.Close()
return FetchResult{URL: url, Status: resp.StatusCode}
}
调用时设置总超时:
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
result := fetchWithContext(ctx, "https://example.com")
fmt.Println(result)
defer cancel() 很重要。即使超时没有发生,也应该释放 context 相关资源。
把它放进并发版本:
func fetchAllWithContext(ctx context.Context, urls []string) []FetchResult {
results := make(chan FetchResult, len(urls))
for _, url := range urls {
url := url
go func() {
results <- fetchWithContext(ctx, url)
}()
}
collected := make([]FetchResult, 0, len(urls))
for range urls {
collected = append(collected, <-results)
}
return collected
}
这里 channel 使用了缓冲,容量等于 URL 数量。即使调用方开始接收稍晚,goroutine 发送结果也不会因为没有接收者而卡住。
select 等待多个事件
select 可以同时等待多个 channel:
select {
case result := <-results:
fmt.Println(result)
case <-ctx.Done():
fmt.Println("cancelled:", ctx.Err())
}
如果要在收集结果时响应取消:
func collect(ctx context.Context, results <-chan FetchResult, count int) []FetchResult {
collected := make([]FetchResult, 0, count)
for len(collected) < count {
select {
case result := <-results:
collected = append(collected, result)
case <-ctx.Done():
collected = append(collected, FetchResult{Err: ctx.Err()})
return collected
}
}
return collected
}
ctx.Done() 返回一个 channel,当 context 被取消或超时时关闭。ctx.Err() 可以拿到原因,比如 context deadline exceeded。
在服务端代码里,HTTP 请求的 r.Context() 会在客户端断开、请求超时或服务端取消时结束。你应该把它传给数据库、HTTP 客户端和业务函数,而不是在深层代码里随便创建 context.Background()。
worker 池:限制并发数量
并发不是越多越好。如果有 10000 个 URL,一次启动 10000 个 goroutine 可能压垮目标服务或本机资源。可以用 worker 池限制并发:
func fetchWithWorkers(ctx context.Context, urls []string, workerCount int) []FetchResult {
jobs := make(chan string)
results := make(chan FetchResult, len(urls))
var wg sync.WaitGroup
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for url := range jobs {
results <- fetchWithContext(ctx, url)
}
}()
}
go func() {
defer close(jobs)
for _, url := range urls {
select {
case jobs <- url:
case <-ctx.Done():
return
}
}
}()
go func() {
wg.Wait()
close(results)
}()
var collected []FetchResult
for result := range results {
collected = append(collected, result)
}
return collected
}
这个例子有三个 channel 相关动作:主 goroutine 把 URL 发进 jobs;固定数量 worker 从 jobs 读取并发送结果;所有 worker 结束后关闭 results,收集方 range 到 channel 关闭。
真实项目里,worker 池要考虑更多细节,比如错误是否立刻取消全部任务,结果是否需要保持输入顺序,单个任务是否有独立超时。但入门阶段先理解这个骨架就够了。
共享变量需要同步
不要这样统计成功数量:
success := 0
for _, url := range urls {
go func(url string) {
result := fetch(url)
if result.Err == nil {
success++
}
}(url)
}
多个 goroutine 同时修改 success,会产生数据竞争。可以用 sync.Mutex:
var mu sync.Mutex
success := 0
mu.Lock()
success++
mu.Unlock()
但更 Go 风格的方式是让 goroutine 发送结果,单个收集者统计:
success := 0
for _, result := range results {
if result.Err == nil {
success++
}
}
“通过通信共享内存,而不是通过共享内存通信”是 Go 并发常被提到的一句话。它不是绝对规则,但提醒你优先考虑数据流,而不是让很多 goroutine 同时抢同一个变量。
小结
Go 并发的入门语法很简单:go 启动 goroutine,channel 传递数据,WaitGroup 等待完成,select 同时监听多个事件,context 控制取消和超时。真正需要练的是退出路径和资源边界。
写并发代码时,先问几个问题:这个 goroutine 什么时候结束?发送到 channel 时有没有接收者?如果外部请求超时,内部任务能否停止?是否有多个 goroutine 同时写同一个变量?并发数量是否需要限制?
只要这些问题能回答清楚,Go 的并发会非常实用。它不是为了把代码写得神秘,而是让你用较低成本处理网络请求、后台任务和流水线处理这类真实工程问题。
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。