跳转到正文
zeno's blog
返回

Go 并发(一):Channel 内部机制与使用模式

专题: Go 并发

Table of contents

Open Table of contents

TL;DR

Channel 是 Go CSP 并发模型的核心原语,底层由 hchan 结构体实现(环形缓冲区 + 互斥锁 + 两条等待队列)。发送/接收的关键优化是直接传输——当对端已在等待时,数据绕过缓冲区直接拷贝到对方栈上。select 通过随机轮询序(防饥饿)+ 地址排序加锁(防死锁)实现多路复用。理解这些内部机制对写出正确、高效的并发代码至关重要。


1. Channel 解决什么问题

CSP 模型

Go 的并发哲学来自 Tony Hoare 1978 年的 CSP(Communicating Sequential Processes)论文:

Don’t communicate by sharing memory; share memory by communicating.

Channel 是这个哲学的具体实现——它把数据所有权随消息一起转移,从根本上避免了共享状态的竞态问题。

为什么不只用 Mutex

Mutex 和 Channel 解决不同层面的问题:

维度MutexChannel
语义保护共享状态的访问在 goroutine 之间传递数据和信号
思维模型”谁持有锁谁访问""谁收到消息谁处理”
数据所有权共享,需要约定转移,发送后不再持有
组合性多锁容易死锁Pipeline/fan-out/fan-in 天然组合
适用场景缓存、计数器、状态机内部goroutine 协调、pipeline、事件通知
性能轻量(纳秒级)较重(涉及调度,~60ns/op)
错误模式忘记解锁、死锁goroutine 泄漏、向已关闭 channel 发送

经验法则:如果你在保护一个共享变量,用 mutex;如果你在协调 goroutine 的执行流或传递数据,用 channel。

来源:Go wiki - MutexOrChannel


2. hchan 结构体内部

Channel 的运行时表示是 hchan,定义在 runtime/chan.go

结构体定义

type hchan struct {
    qcount   uint           // 缓冲区中当前元素数量
    dataqsiz uint           // 缓冲区容量(make 时指定的 size)
    buf      unsafe.Pointer // 指向环形缓冲区数组
    elemsize uint16         // 单个元素大小
    closed   uint32         // 是否已关闭(0=open, 非0=closed)
    timer    *timer         // 关联的 timer(Go 1.23+ timer channel 用)
    elemtype *_type         // 元素类型信息(GC 用)
    sendx    uint           // 环形缓冲区的发送索引(下一个写入位置)
    recvx    uint           // 环形缓冲区的接收索引(下一个读取位置)
    recvq    waitq          // 等待接收的 goroutine 队列
    sendq    waitq          // 等待发送的 goroutine 队列
    lock     mutex          // 保护 hchan 所有字段的互斥锁
}

type waitq struct {
    first *sudog
    last  *sudog
}

内存布局

make(chan T, 5) 的内存布局(非指针类型 T):

┌──────────────────────────────────────────────────────┐
│                  单次 mallocgc 分配                    │
│                                                      │
│  ┌─────────┐  ┌──────┬──────┬──────┬──────┬──────┐  │
│  │  hchan  │  │ T[0] │ T[1] │ T[2] │ T[3] │ T[4] │  │
│  │         │  │      │      │      │      │      │  │
│  │ buf ────┼──►      │      │      │      │      │  │
│  │ sendx=2 │  │      │ ←recvx               ←sendx│  │
│  │ recvx=1 │  │      │      │      │      │      │  │
│  └─────────┘  └──────┴──────┴──────┴──────┴──────┘  │
└──────────────────────────────────────────────────────┘

makechan 的三种分配策略

switch {
case mem == 0:
    // 无缓冲 或 元素大小为 0(如 chan struct{})
    // 只分配 hchan 本身
    c = (*hchan)(mallocgc(hchanSize, nil, true))

case !elem.Pointers():
    // 元素不含指针(如 chan int, chan [16]byte)
    // hchan + buf 一次分配,连续内存,对 GC 友好
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)

default:
    // 元素含指针(如 chan *T, chan []T)
    // hchan 和 buf 分开分配,buf 需要 GC 扫描
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
}

为什么分开? 不含指针的缓冲区对 GC 来说是透明的(不需要扫描内部引用),可以和 hchan 放在一起避免额外的内存分配和间接寻址。含指针的缓冲区必须让 GC 知道里面有指针需要追踪,所以单独分配并传入类型信息。

为什么 hchan 用自己的 mutex 而不是 sync.Mutex

hchan 的 lockruntime.mutex——一个更底层的运行时内部锁,比 sync.Mutex 更轻量。原因:

  1. sync.Mutex 本身依赖运行时的 semaphore,而 channel 是运行时更基础的原语,不能产生循环依赖
  2. runtime.mutex 直接操作 OS 级别的 futex(Linux)或 semaphore(其他平台),没有 goroutine 级调度开销
  3. Channel 操作中持锁时间极短(只做索引移动和指针操作),不需要 sync.Mutex 的公平性和饥饿防护机制

3. 发送与接收算法——完整流程

发送流程(chansend)

ch <- value

    ┌─────────────────────┐
    │   ch == nil ?        │──── YES ──→ 永远阻塞(gopark)
    └─────────┬───────────┘
              │ NO
    ┌─────────▼───────────┐
    │     lock(&c.lock)    │
    └─────────┬───────────┘

    ┌─────────▼───────────┐
    │   c.closed != 0 ?    │──── YES ──→ panic("send on closed channel")
    └─────────┬───────────┘
              │ NO
    ┌─────────▼───────────┐
    │ recvq 有等待者?      │──── YES ──→ 【直接发送】绕过 buf,
    │ sg = c.recvq.dequeue │            拷贝到接收者栈,goready(sg.g)
    └─────────┬───────────┘
              │ NO
    ┌─────────▼───────────┐
    │ buf 有空位?          │──── YES ──→ 拷贝到 buf[sendx],sendx++,qcount++
    │ qcount < dataqsiz   │
    └─────────┬───────────┘
              │ NO(buf 满)
    ┌─────────▼───────────┐
    │ 非阻塞模式?(select) │──── YES ──→ return false
    └─────────┬───────────┘
              │ NO
    ┌─────────▼───────────┐
    │ 创建 sudog,加入 sendq │
    │ gopark(挂起当前 G)   │
    │ ...等待被唤醒...       │
    └─────────────────────┘

直接发送优化(关键路径)

当接收者已经在 recvq 中等待时,数据直接从发送者的栈拷贝到接收者的栈,完全绕过环形缓冲区。这是无缓冲 channel 的核心路径,也适用于缓冲 channel 缓冲区为空但有等待接收者的情况。

// send() 的关键逻辑
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if sg.elem.get() != nil {
        sendDirect(c.elemtype, sg, ep) // 直接拷贝到接收者栈
        sg.elem.set(nil)
    }
    gp := sg.g
    unlockf()
    goready(gp, skip+1) // 唤醒接收者 goroutine
}

sendDirect 使用 memmove 将数据从发送者的内存位置直接拷贝到接收者 sudog 中记录的目标地址(即接收者栈上的变量)。这避免了先写入 buf 再读出的两次拷贝。

接收流程(chanrecv)

与发送对称,但有一个重要的不对称点——从已关闭且有缓冲数据的 channel 接收

<-ch / v, ok := <-ch

    ┌─────────────────────┐
    │   ch == nil ?        │──── YES ──→ 永远阻塞
    └─────────┬───────────┘
              │ NO
    ┌─────────▼───────────┐
    │     lock(&c.lock)    │
    └─────────┬───────────┘

    ┌─────────▼───────────────────────┐
    │ c.closed != 0 && c.qcount == 0 ? │─── YES ──→ 返回零值, ok=false
    └─────────┬───────────────────────┘
              │ NO
    ┌─────────▼───────────┐
    │ sendq 有等待者?      │──── YES ──→ 【直接接收】
    │                     │            unbuffered: 直接从发送者栈拷贝
    │                     │            buffered(满): 取 buf[recvx],
    │                     │              然后把发送者数据放入 buf[recvx]
    └─────────┬───────────┘
              │ NO
    ┌─────────▼───────────┐
    │ buf 有数据?          │──── YES ──→ 取 buf[recvx],recvx++,qcount--
    │ qcount > 0          │
    └─────────┬───────────┘
              │ NO
    ┌─────────▼───────────┐
    │ 创建 sudog,加入 recvq │
    │ gopark(挂起当前 G)   │
    └─────────────────────┘

buffered channel 满时的 recv 特殊处理:当缓冲满且有发送者等待时,recv() 做两件事:(1) 从 buf[recvx] 取出数据给接收者;(2) 把等待发送者的数据放入 buf[recvx](实际是同一个槽位,因为满时 sendx == recvx)。这保持了 FIFO 语义。

gopark 与 goready

注意:gopark/goready 是调度器层面的操作,详细的 G 状态机和调度流程见 go-gmp-调度模型.md,此处不再展开。

sudog 结构体

sudog(“suspended goroutine”)是 goroutine 与同步对象之间的多对多连接节点——一个 goroutine 可以同时等待多个 channel(select),一个 channel 上也可以排队多个 goroutine。

type sudog struct {
    g    *g                  // 所属 goroutine
    next *sudog              // 链表:channel waitq 中的下一个
    prev *sudog              // 链表:channel waitq 中的上一个
    elem maybeTraceablePtr   // 数据元素指针(可能指向栈)

    acquiretime int64        // 性能分析用
    releasetime int64

    isSelect bool            // 是否参与 select(需要 CAS 竞争唤醒)
    success  bool            // 通信是否成功(false = channel 被关闭)

    waitlink *sudog          // g.waiting 链表(select 时串联所有等待的 sudog)
    c        maybeTraceableChan // 关联的 channel
}

sudog 从一个全局 pool 中分配(acquireSudog/releaseSudog),避免频繁的堆分配。


4. Select 实现(selectgo)

select 语句编译后调用 runtime.selectgo(),实现在 runtime/select.go

编译器优化

编译器在生成代码前会做简化:

select 形式编译结果
select {}直接编译为 block()(永远阻塞,常用于 main goroutine 保活)
select { case <-ch: ... }直接编译为 chanrecv(单 case 无 default)
select { case ch <- v: ... }直接编译为 chansend
单 case + default编译为非阻塞的 chanrecv/chansend(block=false)
多 case调用 selectgo()

selectgo 的四个阶段

selectgo(cases []scase, pollorder []uint16, lockorder []uint16)

阶段 0: 初始化
  ├── pollorder: Fisher-Yates 随机洗牌(决定检查顺序)
  └── lockorder: 按 channel 地址排序(决定加锁顺序)

阶段 1: 加锁所有 channel → 检查是否有就绪 case
  ├── 按 pollorder 遍历所有 case
  │   ├── RECV case: sendq 非空?buf 有数据?channel 已关闭?
  │   └── SEND case: channel 已关闭(panic)?recvq 非空?buf 有空位?
  ├── 找到就绪 case → 执行操作 → 解锁 → 返回
  └── 无就绪 case + 有 default(block=false)→ 解锁 → 返回 -1

阶段 2: 无就绪 case,将当前 G 挂到所有 channel 的等待队列
  ├── 为每个 case 创建一个 sudog(isSelect=true)
  ├── 按 lockorder 遍历,将 sudog 加入对应 channel 的 sendq/recvq
  └── gopark(挂起当前 goroutine)

阶段 3: 被唤醒后,从所有未命中的 channel 等待队列中移除 sudog
  ├── 遍历所有 case,找到触发唤醒的那个
  ├── 从其他 channel 的 waitq 中 dequeue 对应的 sudog
  ├── 释放所有 sudog
  └── 返回命中的 case 索引

为什么 pollorder 要随机?

公平性。如果按固定顺序检查,排在前面的 case 会被优先选中,导致后面的 case 饥饿。随机洗牌保证每个就绪的 case 被选中的概率相等。

为什么 lockorder 按地址排序?

防止死锁。如果两个 goroutine 的 select 涉及相同的 channel 但加锁顺序不同,会形成循环等待。按地址统一排序保证全局一致的加锁顺序,消除死锁可能。使用 heap sort 保证 O(n log n) 时间和常量栈空间。


5. Channel 操作边界情况

完整行为矩阵

操作nil channel已关闭 channel正常 channel
ch <- v(发送)永远阻塞panic: send on closed channel正常发送或阻塞
<-ch(接收)永远阻塞返回零值, ok=false(buf 有数据时先读完)正常接收或阻塞
close(ch)panic: close of nil channelpanic: close of closed channel正常关闭
len(ch)0buf 中剩余元素数buf 中当前元素数
cap(ch)0buf 容量buf 容量

nil channel 的妙用

向 nil channel 发送/接收会永远阻塞。在 select 中,这等价于禁用某个 case

func merge(ch1, ch2 <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for ch1 != nil || ch2 != nil {
            select {
            case v, ok := <-ch1:
                if !ok {
                    ch1 = nil // 已关闭,禁用此 case
                    continue
                }
                out <- v
            case v, ok := <-ch2:
                if !ok {
                    ch2 = nil
                    continue
                }
                out <- v
            }
        }
    }()
    return out
}

close 的行为细节

closechan 做三件事:

  1. 设置 c.closed = 1
  2. 唤醒 recvq 中所有等待者(它们收到零值,success=false
  3. 唤醒 sendq 中所有等待者(它们会 panic)

唤醒动作在释放锁之后批量执行(先收集到 gList,再逐个 goready),避免持锁时触发调度。


6. 有缓冲 vs 无缓冲——设计决策

核心区别

特性无缓冲 make(chan T)有缓冲 make(chan T, N)
语义同步握手(rendezvous)异步队列(bounded queue)
发送阻塞直到有接收者直到 buf 满
数据传输直接栈到栈拷贝经过环形缓冲区
时序保证发送完成 ⟹ 接收者已拿到数据发送完成 ⟹ 数据在 buf 中

make(chan T, 1) 不等于 make(chan T)

这是常见误解。关键区别:

// 无缓冲:发送者阻塞直到接收者就绪
ch := make(chan int)
go func() { ch <- 42 }() // 阻塞,直到下面的接收执行
v := <-ch                  // 此时发送者解除阻塞

// 缓冲 1:发送者可以立即完成(如果 buf 为空)
ch := make(chan int, 1)
ch <- 42  // 不阻塞!数据进入 buf
// ... 可能过很久 ...
v := <-ch // 从 buf 取出

信号通知场景中尤其重要:无缓冲 channel 保证”通知方知道对方已收到”,缓冲 1 只保证”通知已发出”。

缓冲大小的设计决策

容量语义典型场景
0同步点严格的生产者-消费者握手
1信号/最新值done channel、信号量、latest-value
N有界队列削峰填谷、批处理、rate limiting

选择 N 的经验:N 应该来自对上下游吞吐量差异的分析,而不是拍脑袋。如果你不确定 N 该是多少,先用 0(无缓冲),在性能分析确认瓶颈后再加缓冲。


7. 常见 Channel 模式

7.1 Done Channel / 信号通知

// 用 chan struct{} 作为信号,零内存开销
func worker(done chan struct{}) {
    defer close(done) // 完成时通知
    // ... 执行工作 ...
}

func main() {
    done := make(chan struct{})
    go worker(done)
    <-done // 阻塞直到 worker 完成
}

7.2 Fan-Out:多个 worker 读同一个 channel

func fanOut(in <-chan int, workerCount int) {
    var wg sync.WaitGroup
    for range workerCount {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for v := range in {
                process(v) // 多个 goroutine 竞争消费
            }
        }()
    }
    wg.Wait()
}

7.3 Fan-In:多个 channel 合并为一个

func fanIn(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    for _, ch := range channels {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for v := range ch {
                out <- v
            }
        }()
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

7.4 Pipeline:阶段式处理链

func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // generator → square → 消费
    for v := range square(generator(1, 2, 3, 4)) {
        fmt.Println(v) // 1, 4, 9, 16
    }
}

7.5 Timeout(select + time.After)

func doWithTimeout(ch <-chan result, timeout time.Duration) (result, error) {
    select {
    case r := <-ch:
        return r, nil
    case <-time.After(timeout):
        return result{}, fmt.Errorf("timeout after %v", timeout)
    }
}

注意time.After 在循环中使用的泄漏问题已在 Go 1.23+ 修复(见 Pitfalls 第 7 条)。

7.6 Or-Done:包装 channel 以支持取消

func orDone(ctx context.Context, ch <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return
            case v, ok := <-ch:
                if !ok {
                    return
                }
                select {
                case out <- v:
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    return out
}

7.7 Tee:一分为二

func tee(ctx context.Context, in <-chan int) (<-chan int, <-chan int) {
    out1, out2 := make(chan int), make(chan int)
    go func() {
        defer close(out1)
        defer close(out2)
        for v := range orDone(ctx, in) {
            // 使用局部变量和 select 确保两个 out 都收到
            o1, o2 := out1, out2
            for range 2 {
                select {
                case o1 <- v:
                    o1 = nil // 已发送,禁用此 case
                case o2 <- v:
                    o2 = nil
                }
            }
        }
    }()
    return out1, out2
}

7.8 Rate Limiting(time.Ticker)

func rateLimited(ctx context.Context, in <-chan request, rate time.Duration) <-chan request {
    out := make(chan request)
    go func() {
        defer close(out)
        ticker := time.NewTicker(rate)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                select {
                case r, ok := <-in:
                    if !ok {
                        return
                    }
                    out <- r
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    return out
}

8. 性能特征

Channel vs Mutex 性能对比

操作典型延迟说明
atomic.AddInt64~0.3 ns/op无锁,硬件级原子操作
sync.Mutex Lock/Unlock~0.8-20 ns/op无竞争时极快,竞争时退化
Channel send+recv~50-100 ns/op涉及锁、内存拷贝、调度

[需验证] 具体数字受 CPU 架构、竞争程度、GOMAXPROCS 影响,以上为典型无竞争/低竞争场景。

Channel 操作的成本分解

一次 channel 发送包含:

  1. 获取 hchan.lock(runtime mutex,比 sync.Mutex 轻但仍有成本)
  2. 数据拷贝typedmemmove,值语义,大结构体很贵)
  3. 可能的 goroutine 调度(goready 唤醒接收者)

大结构体的性能陷阱

Channel 是值拷贝语义——每次发送都会完整拷贝元素。对于大结构体:

// 差:每次发送拷贝整个 1KB 结构体
ch := make(chan BigStruct)  // sizeof(BigStruct) = 1024

// 好:只拷贝 8 字节指针
ch := make(chan *BigStruct) // sizeof(*BigStruct) = 8

使用指针 channel 时注意:发送后不应再修改原始数据,否则失去了 channel 传递所有权的语义保证。

高竞争下的行为

当大量 goroutine 竞争同一个 channel 时,hchan.lock 成为瓶颈。缓解策略:


9. Pitfalls

9.1 Goroutine 泄漏:无缓冲 channel 无接收者

func leak() {
    ch := make(chan int) // 无缓冲
    go func() {
        result := expensiveWork()
        ch <- result // 永远阻塞!如果没人接收,这个 goroutine 永远不会释放
    }()
    // ... 提前返回,没有 <-ch
}

修复:使用 make(chan int, 1) 让发送不阻塞,或确保用 context 提供取消路径。

Go 1.26 引入了实验性的 goroutine leak profile(GOEXPERIMENT=goroutineleakprofile),利用 GC 可达性分析检测此类泄漏。预计 Go 1.27 默认启用。

9.2 从接收端关闭 channel

// 危险!多个发送者 + 接收端关闭 → 发送者 panic
close(ch) // 应该由发送端关闭

原则:只有发送者关闭 channel。如果有多个发送者,用 sync.WaitGroup 等待所有发送者完成后再关闭。

9.3 range over channel 没有 close

go func() {
    for i := range 10 {
        ch <- i
    }
    // 忘记 close(ch)!
}()
for v := range ch { // 永远阻塞在第 11 次接收
    fmt.Println(v)
}

9.4 用 len(ch) 做同步判断(TOCTOU 竞态)

// 错误!len 和后续操作之间其他 goroutine 可能已经改变了 channel 状态
if len(ch) > 0 {
    v := <-ch // 可能仍然阻塞
}

正确做法:用 select + default 做非阻塞检查。

9.5 忘记 channel 是值拷贝

type Data struct {
    Values []int
}

d := Data{Values: []int{1, 2, 3}}
ch <- d
d.Values[0] = 999 // 修改了底层数组!接收者看到的也是 999
                    // 因为 slice header 被拷贝了,但底层数组是共享的

Channel 拷贝的是值本身。对于包含引用类型(slice、map、pointer)的结构体,浅拷贝不等于深拷贝。

9.6 select 多个 case 就绪时是非确定性的

select {
case <-ch1: // 如果 ch1 和 ch2 同时就绪,
case <-ch2: // 不保证先选哪个(随机)
}

不是按代码顺序、不是 round-robin、不是按先就绪选——是随机的(pollorder 洗牌)。如果需要优先级,必须嵌套 select:

select {
case <-highPriority:
    // 高优先级处理
default:
    select {
    case <-highPriority:
    case <-lowPriority:
    }
}

9.7 time.After 在循环中的泄漏

// Go 1.23 之前:每次迭代创建新 timer,旧 timer 直到过期才被 GC
for {
    select {
    case <-ch:
        // ...
    case <-time.After(5 * time.Second): // 泄漏!
        // ...
    }
}

Go 1.23+ 修复:未引用的 timer 可以被 GC 回收,不再泄漏。但 Go 1.23 之前的版本应使用 time.NewTimer + Reset

timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
for {
    select {
    case <-ch:
        if !timer.Stop() {
            <-timer.C
        }
        timer.Reset(5 * time.Second)
    case <-timer.C:
        // timeout
        timer.Reset(5 * time.Second)
    }
}

Go 1.23+ 的 timer channel 变为无缓冲(同步),Reset/Stop 之后保证不会收到旧值。asynctimerchan GODEBUG 设置将在 Go 1.27 移除。

9.8 向已关闭的 channel 发送导致 panic

这是运行时 panic,不可 recover(实际上可以 recover,但不应该作为正常控制流)。

// 常见于多 goroutine 场景:一个 goroutine 关闭了 channel,
// 其他 goroutine 还在往里发
ch := make(chan int)
close(ch)
ch <- 1 // panic: send on closed channel

防护模式:用 context.Context 做取消信号,而不是关闭 channel 来通知发送者停止。

9.9 对 channel 的 double close

close(ch)
close(ch) // panic: close of closed channel

如果不确定是否已关闭,使用 sync.Once

var once sync.Once
safeClose := func() { once.Do(func() { close(ch) }) }

10. 生产清单


版本变更速查

版本Channel 相关变更
Go 1.23timer channel 改为无缓冲(同步),未引用的 timer 可被 GC;引入 asynctimerchan GODEBUG
Go 1.24运行时 mutex 实现优化(间接影响 channel 性能)
Go 1.25sync.WaitGroup.Go 方法;testing/synctest 包(虚拟时间测试并发代码);GOMAXPROCS 容器感知
Go 1.26实验性 goroutine leak profile;asynctimerchan 预告将在 Go 1.27 移除

延伸阅读


分享这篇文章:

上一篇
Go 并发(二):sync 包与并发原语
下一篇
asio(六):十大陷阱与生产实践