Channel:Go 并发的灵魂

深入理解 Go 语言的 Channel 通道,掌握 CSP 并发模型的核心思想

Channel:Go 并发的灵魂

上一篇我们学了 goroutine,但只学会了启动它们。有一个问题还没解决:goroutine 之间怎么交流?

在传统的并发编程中,线程之间通常通过共享内存来通信——比如共享变量加锁。这种方式容易出错,你忘了加锁、忘了释放锁、不小心造成死锁……各种坑。

Go 语言提出了一种不同的哲学:不要通过共享内存来通信,而要通过通信来共享内存。

这句话来自 Go 的设计文档,也是 Go 并发编程的核心思想。而实现这种"通过通信来共享内存"的工具,就是 Channel(通道)

Channel 就像一根管子,一个 goroutine 往里面塞数据,另一个 goroutine 从里面取数据。它不仅是数据传输的通道,更是 goroutine 之间同步的机制。

让我们开始探索这个神奇的工具吧!

什么是 Channel?

如果 goroutine 是 Go 并发世界里的"工人",那 channel 就是工人之间传递零件的"传送带"。

  • 一个工人(goroutine)把东西放到传送带(channel)上
  • 另一个工人(goroutine)从传送带上拿走东西
  • 传送带本身也有一定的容量(缓冲区)

Channel 是类型安全的——一个 chan int 只能传输整数,一个 chan string 只能传输字符串。

创建 Channel

Channel 用 make 函数创建:

// 创建一个无缓冲的 int 通道
ch := make(chan int)

// 创建一个缓冲区大小为 10 的 string 通道
ch2 := make(chan string, 10)

注意两种创建方式的区别:

  • make(chan T):无缓冲通道
  • make(chan T, size):有缓冲通道

这两种通道有着截然不同的行为,后面会详细讲。

发送和接收

Channel 的基本操作有两个:

ch <- value  // 发送:把 value 发送到通道 ch
value := <-ch  // 接收:从通道 ch 接收一个值

箭头 <- 的方向表示了数据的流向:

  • ch <- 箭头指向通道,表示发送到通道
  • <-ch 箭头从通道出来,表示从通道接收

第一个完整的例子

package main

import "fmt"

func main() {
	// 创建一个无缓冲通道
	ch := make(chan string)

	// 启动一个 goroutine 发送数据
	go func() {
		ch <- "Hello from goroutine!"
	}()

	// 主 goroutine 接收数据
	msg := <-ch
	fmt.Println(msg)  // Hello from goroutine!
}

这个程序的工作流程:

  1. 创建了一个无缓冲通道 ch
  2. 启动一个 goroutine 往通道里发送 “Hello from goroutine!”
  3. 主 goroutine 从通道接收数据

无缓冲通道有一个重要的特性:发送和接收必须同时准备好。也就是说,发送方会一直阻塞,直到有接收方来接收;接收方会一直阻塞,直到有发送方来发送。

这就像一个电话——打电话的人必须等到接电话的人拿起话筒,通话才能开始。

无缓冲通道 vs 有缓冲通道

无缓冲通道(Unbuffered Channel)

无缓冲通道没有存储空间,发送和接收必须同步发生

ch := make(chan int)  // 无缓冲

// 发送方会阻塞,直到有人接收
go func() {
	fmt.Println("发送前")
	ch <- 42  // 阻塞在这里,等待接收方
	fmt.Println("发送后")
}()

// 接收方会阻塞,直到有人发送
value := <-ch
fmt.Println("接收到:", value)

输出:

发送前
接收到: 42
发送后

注意"发送后"在"接收到"之后才打印——这说明发送操作确实被阻塞了,直到接收方才继续。

有缓冲通道(Buffered Channel)

有缓冲通道内部有一个队列,可以存储一定数量的值。发送操作只在缓冲区满的时候才阻塞,接收操作只在缓冲区空的时候才阻塞。

ch := make(chan int, 3)  // 缓冲区大小为 3

ch <- 1  // 不阻塞,放入缓冲区
ch <- 2  // 不阻塞,放入缓冲区
ch <- 3  // 不阻塞,放入缓冲区
// ch <- 4  // ❌ 会阻塞!缓冲区已满

fmt.Println(<-ch)  // 1
fmt.Println(<-ch)  // 2
fmt.Println(<-ch)  // 3

有缓冲通道就像一个邮箱——你可以把信扔进邮箱就走(不阻塞),邮递员什么时候来取信是另一回事。

⚠️ 注意:有缓冲通道虽然可以减少阻塞,但如果使用不当,可能隐藏同步问题。大多数情况下,无缓冲通道更安全,因为它的同步语义更明确。

Channel 的方向

你可以限制 channel 只能发送或只能接收,增加类型安全性:

// 只能发送的通道
var sendOnly chan<- int

// 只能接收的通道
var recvOnly <-chan int

// 既可以发送也可以接收
var bidirectional chan int

这在函数参数中特别有用:

func producer(out chan<- int) {
	// 只能发送,不能接收
	for i := 0; i < 5; i++ {
		out <- i
	}
	close(out)
}

func consumer(in <-chan int) {
	// 只能接收,不能发送
	for v := range in {
		fmt.Println(v)
	}
}

func main() {
	ch := make(chan int)
	go producer(ch)
	consumer(ch)
}

这种设计让你的代码意图更清晰——producer 只负责生产数据,consumer 只负责消费数据。

关闭 Channel

close() 函数关闭通道:

ch := make(chan int)
close(ch)

关闭通道表示"不会再有数据发送了"。关闭后的通道有以下行为:

  • 接收:立即返回零值和 false
  • 发送:会 panic!
  • 再次关闭:会 panic!

检查通道是否关闭

ch := make(chan int, 2)
ch <- 1
ch <- 2
close(ch)

// 方式一:comma ok 模式
value, ok := <-ch
if ok {
	fmt.Println("收到:", value)
} else {
	fmt.Println("通道已关闭")
}

// 方式二:for range 遍历
ch2 := make(chan int, 3)
ch2 <- 1
ch2 <- 2
ch2 <- 3
close(ch2)

for v := range ch2 {
	fmt.Println(v)  // 1, 2, 3
}
// 循环在通道关闭且所有数据都被读取后自动结束

⚠️ 重要原则

  1. 只有发送方才应该关闭通道
  2. 不要关闭一个已经被关闭的通道
  3. 不要往已关闭的通道发送数据
  4. 有缓冲通道关闭后,仍然可以接收缓冲区中剩余的数据

Select 多路复用

select 是 Go 并发编程中最重要的控制结构。它让你可以同时监听多个 channel 的操作,哪个先就绪就处理哪个:

package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go func() {
		time.Sleep(1 * time.Second)
		ch1 <- "来自通道 1 的消息"
	}()

	go func() {
		time.Sleep(2 * time.Second)
		ch2 <- "来自通道 2 的消息"
	}()

	// 使用 select 同时监听两个通道
	for i := 0; i < 2; i++ {
		select {
		case msg1 := <-ch1:
			fmt.Println(msg1)
		case msg2 := <-ch2:
			fmt.Println(msg2)
		}
	}
}

select 的工作方式:

  1. 同时检查所有的 case
  2. 如果有多个 case 就绪,随机选择一个执行
  3. 如果没有任何 case 就绪,就阻塞(除非有 default 分支)

超时控制

select 最常见的用法之一是实现超时:

ch := make(chan string)

go func() {
	time.Sleep(5 * time.Second)
	ch <- "结果"
}()

select {
case result := <-ch:
	fmt.Println("收到结果:", result)
case <-time.After(3 * time.Second):
	fmt.Println("超时了!")
}
// 输出:超时了!

time.After 返回一个通道,在指定时间后发送当前时间。这让你可以轻松实现超时逻辑。

默认分支

如果 select 有 default 分支,当没有 case 就绪时,会立即执行 default 而不会阻塞:

ch := make(chan int)

select {
case v := <-ch:
	fmt.Println("收到:", v)
default:
	fmt.Println("没有数据,做其他事情")
}

空 select

一个没有任何 case 的 select 会永远阻塞:

select {}  // 永远阻塞

这看起来没用,但在某些场景下可以用来让 main goroutine 永远等待(比如服务器)。

实战:并发 Web 爬虫

让我们用 goroutine 和 channel 实现一个简单的并发 Web 爬虫:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// CrawlResult 爬取结果
type CrawlResult struct {
	URL     string
	Status  string
	Elapsed time.Duration
}

// crawl 模拟爬取一个 URL
func crawl(url string) CrawlResult {
	// 模拟网络延迟
	elapsed := time.Duration(rand.Intn(500)+100) * time.Millisecond
	time.Sleep(elapsed)

	return CrawlResult{
		URL:     url,
		Status:  "OK",
		Elapsed: elapsed,
	}
}

func main() {
	urls := []string{
		"https://example.com/1",
		"https://example.com/2",
		"https://example.com/3",
		"https://example.com/4",
		"https://example.com/5",
		"https://example.com/6",
		"https://example.com/7",
		"https://example.com/8",
	}

	start := time.Now()

	// 创建结果通道
	results := make(chan CrawlResult, len(urls))

	// 启动 worker 池
	workerCount := 3
	var wg sync.WaitGroup

	// 创建 URL 通道
	urlChan := make(chan string, len(urls))
	for _, url := range urls {
		urlChan <- url
	}
	close(urlChan)

	// 启动 worker
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for url := range urlChan {
				fmt.Printf("Worker %d: 爬取 %s\n", id, url)
				result := crawl(url)
				results <- result
			}
		}(i + 1)
	}

	// 等待所有 worker 完成后关闭结果通道
	go func() {
		wg.Wait()
		close(results)
	}()

	// 收集结果
	var totalElapsed time.Duration
	for result := range results {
		fmt.Printf("✅ %s: %s (耗时: %v)\n", result.URL, result.Status, result.Elapsed)
		totalElapsed += result.Elapsed
	}

	fmt.Printf("\n总耗时: %v\n", time.Since(start))
	fmt.Printf("串行耗时: %v\n", totalElapsed)
	fmt.Printf("加速比: %.2fx\n", float64(totalElapsed)/float64(time.Since(start)))
}

这个程序使用了一个经典的生产者-消费者模式:

  1. 把所有 URL 放入一个通道
  2. 启动多个 worker goroutine 从通道中取 URL 并爬取
  3. 每个 worker 把结果发送到结果通道
  4. 主 goroutine 从结果通道收集结果

Channel 的常见模式

1. Fan-In(扇入)

把多个输入通道合并成一个输出通道:

func fanIn(channels ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup

	for _, ch := range channels {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(ch)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

2. Fan-Out(扇出)

把一个输入通道的数据分发给多个处理者:

func fanOut(in <-chan int, workerCount int) []<-chan int {
	workers := make([]<-chan int, workerCount)

	for i := 0; i < workerCount; i++ {
		workers[i] = func() <-chan int {
			out := make(chan int)
			go func() {
				defer close(out)
				for v := range in {
					out <- v * 2  // 做一些处理
				}
			}()
			return out
		}()
	}

	return workers
}

3. Pipeline(管道)

把多个处理步骤串成一条流水线:

func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

func filter(in <-chan int, threshold int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if v > threshold {
				out <- v
			}
		}
	}()
	return out
}

func main() {
	// 生成 → 平方 → 过滤
	pipeline := filter(square(generate(1, 2, 3, 4, 5, 6)), 10)

	for v := range pipeline {
		fmt.Println(v)  // 16, 25, 36
	}
}

Channel 的死锁检测

Go 运行时会自动检测 channel 死锁。如果所有 goroutine 都在等待,程序会 panic:

func main() {
	ch := make(chan int)
	// 没有人发送,也没有其他 goroutine
	<-ch  // fatal error: all goroutines are asleep - deadlock!
}

这个检测在开发阶段非常有用——它能帮你尽早发现并发逻辑中的 bug。

小结

今天我们深入学习了 Go 语言的 Channel:

  1. Channel 是什么:goroutine 之间通信的管道,类型安全
  2. 创建make(chan T) 无缓冲,make(chan T, n) 有缓冲
  3. 发送和接收ch <- v 发送,v := <-ch 接收
  4. 无缓冲 vs 有缓冲:前者要求同步,后者有缓冲队列
  5. 关闭close(ch),只有发送方应该关闭通道
  6. Select:多路复用,可以同时监听多个通道
  7. 常见模式:Fan-In、Fan-Out、Pipeline
  8. 死锁检测:Go 运行时自动检测

Channel 是 Go 并发编程的灵魂。它不只是数据传输的工具,更是一种编程范式——通过消息传递来协调并发,避免了共享内存和锁的复杂性。

记住这句话:“Do not communicate by sharing memory; instead, share memory by communicating.”

练习时间

  1. 生产者-消费者:实现一个完整的生产者-消费者模型,多个生产者、多个消费者
  2. 超时重试:用 select 实现一个带超时和重试机制的函数
  3. 聊天室:用 channel 实现一个简单的聊天室(多个用户同时发言)
  4. 并发安全计数器:用 channel 实现一个不需要锁的并发安全计数器
  5. 工作池:实现一个通用的 worker pool,支持可配置的 worker 数量和任务队列

下一篇预告

下一篇文章,我们将学习 sync 包——Go 提供的同步工具集。虽然 channel 很优雅,但有些场景用传统的同步原语(互斥锁、读写锁、等待组等)更合适。我们会学习:

  • sync.Mutexsync.RWMutex
  • sync.WaitGroup
  • sync.Once
  • sync.Pool
  • 什么时候用锁,什么时候用 channel

我们下篇见!👋


参考资料:

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页