跳转到正文
zeno's blog
返回

Go 并发(五):并发模式与最佳实践

专题: Go 并发

Table of contents

Open Table of contents

TL;DR

Go 的并发编程远不止 goroutine + channel。生产级代码需要掌握 context 传播与取消树、errgroup 有界并发、worker pool 模式、graceful shutdown 全流程、race detector 的 CI 集成,以及至少十种常见反模式的识别与规避。本文是”如何在生产环境写出正确并发 Go 代码”的完整指南。


1. Context 包——并发控制的脊梁

1.1 为什么 Context 是并发控制的核心

Go 没有线程中断机制(不像 Java 的 Thread.interrupt())。goroutine 一旦启动就是”自治”的,外部无法强制停止它。context 包提供了协作式取消的标准协议:父 goroutine 通过 context 通知子 goroutine “该停了”,子 goroutine 自行检查并退出。

核心设计原则:context 是请求级别的生命周期管理工具,不是全局状态容器。

1.2 四种构造函数

// 根 context——所有 context 树的起点
ctx := context.Background() // 用于 main、init、顶层请求入口
ctx := context.TODO()       // 占位符,表示"还没想好传什么 context"

// 派生 context
ctx, cancel := context.WithCancel(parent)           // 手动取消
ctx, cancel := context.WithDeadline(parent, deadline) // 到达 deadline 自动取消
ctx, cancel := context.WithTimeout(parent, 5*time.Second) // timeout 后自动取消
ctx := context.WithValue(parent, key, val)           // 附加 key-value(慎用)

铁律:拿到 cancel 就必须调用。 不调用 = 资源泄漏(timer、goroutine 都不会被回收)。推荐 defer cancel() 紧跟在创建之后。

1.3 取消传播:Context 树

Background
├── WithCancel (A)
│   ├── WithTimeout (B)  ← B 超时或 A 取消,B 都会取消
│   └── WithValue (C)    ← A 取消时 C 也取消(Value 本身不影响取消)
└── WithDeadline (D)
    └── WithCancel (E)   ← D 到期或 E 主动取消,E 都会取消

关键规则:

1.4 内部实现:cancelCtx / timerCtx / valueCtx

类型核心字段取消机制
cancelCtxdone chan struct{}(lazy 创建), children map[canceler]struct{}, mu sync.Mutex关闭 done channel → 所有监听者收到通知;遍历 children 递归取消
timerCtx内嵌 cancelCtx + timer *time.Timer + deadline time.Timetimer 到期时调用内部 cancel;手动 cancel 时先停 timer 再走 cancelCtx 流程
valueCtxkey, val any + 父 context 引用不参与取消。Value() 沿链向上查找,O(n) 复杂度

propagateCancel 函数在创建子 cancelCtx 时,沿父链向上找到最近的 *cancelCtx,把子注册到父的 children map 中。这就是取消能级联传播的原因。

来源:Go 标准库源码 context.go

1.5 Go 1.20+ 新增 API

WithCancelCause / Cause(Go 1.20)

解决的问题:ctx.Err() 只返回 context.Canceledcontext.DeadlineExceeded,无法知道谁取消的、为什么取消

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())

	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel(errors.New("upstream service returned 503"))
	}()

	<-ctx.Done()
	fmt.Println("ctx.Err():", ctx.Err())                // context canceled
	fmt.Println("cause:", context.Cause(ctx))            // upstream service returned 503
}

生产价值: 在微服务链路中,通过 Cause 可以精确追踪取消的根因,而不是在日志里看到一堆 context canceled

WithoutCancel(Go 1.21)

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	parent, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// 审计日志需要在请求取消后继续执行
	auditCtx := context.WithoutCancel(parent)

	// parent 超时后 auditCtx 不受影响
	time.Sleep(100 * time.Millisecond)
	fmt.Println("parent err:", parent.Err())     // context deadline exceeded
	fmt.Println("audit err:", auditCtx.Err())    // <nil>
	// auditCtx 仍然能读取 parent 的 Value
}

典型场景:

注意: WithoutCancel 返回的 context 的 Done() 是 nil,Deadline() 返回零值。子 context 仍然引用父 context(Value 查找),可能导致父 context 无法被 GC。

AfterFunc(Go 1.21)

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	var wg sync.WaitGroup
	wg.Add(1)

	// context 取消时自动执行清理,无需手动 select ctx.Done()
	stop := context.AfterFunc(ctx, func() {
		defer wg.Done()
		fmt.Println("cleanup: releasing connection pool")
	})
	_ = stop // 调用 stop() 可以取消注册

	// 模拟工作
	time.Sleep(300 * time.Millisecond)
	wg.Wait()
	// Output: cleanup: releasing connection pool
}

替代了这种 boilerplate 模式:

// 旧写法:为每个清理任务启动一个 goroutine
go func() {
    <-ctx.Done()
    cleanup()
}()

// 新写法:更声明式,且可以通过 stop() 取消注册
stop := context.AfterFunc(ctx, cleanup)

1.6 Context Values 的争议

context.WithValue 是 context 包中最具争议的 API。

合理用途(request-scoped metadata):

反模式(不该用 Value 的场景):

为什么有争议:

  1. 类型不安全:key 和 value 都是 any,编译期无法检查
  2. 隐式依赖:函数签名看不出它依赖了哪些 context value
  3. O(n) 查找:沿 context 链线性搜索,深层嵌套时性能差
  4. 命名冲突:不同包用相同类型的 key 会互相覆盖

最佳实践:用自定义未导出类型作 key,避免冲突:

package middleware

// 未导出类型,外部包不可能构造出相同的 key
type contextKey struct{}

var userIDKey = contextKey{}

func WithUserID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, userIDKey, id)
}

func UserID(ctx context.Context) (string, bool) {
	id, ok := ctx.Value(userIDKey).(string)
	return id, ok
}

2. errgroup——取代 WaitGroup + 手工错误收集

2.1 为什么 WaitGroup 不够

sync.WaitGroup 只管”等所有 goroutine 结束”,不管:

2.2 errgroup 完整 API

import "golang.org/x/sync/errgroup"

// 创建带 context 的 group(推荐)
g, ctx := errgroup.WithContext(parentCtx)

// 设置并发限制(信号量行为)
g.SetLimit(10) // 最多 10 个 goroutine 同时运行

// 启动 goroutine(阻塞版:达到限制时阻塞等待)
g.Go(func() error {
    return doWork(ctx)
})

// 非阻塞启动:达到限制时立即返回 false
started := g.TryGo(func() error {
    return doWork(ctx)
})

// 等待所有 goroutine 完成,返回第一个非 nil error
err := g.Wait()
方法行为
Go(f)启动 goroutine 执行 f。如果设了 limit,会阻塞到有空位
TryGo(f)非阻塞版 Go。有空位则启动并返回 true,否则返回 false
SetLimit(n)设置最大并发数。n < 0 表示无限制,n = 0 禁止新增。必须在任何 Go 调用之前设置
Wait()阻塞到所有 goroutine 结束,返回第一个 error
WithContext(ctx)创建 group + 派生 context。第一个 error 会取消该 context

2.3 errgroup vs WaitGroup 对比

维度sync.WaitGrouperrgroup.Group
错误处理无,需手动收集自动捕获第一个 error
取消传播WithContext 版本自动取消
并发限制SetLimit 内置支持
panic 处理panic 直接崩掉整个程序同样不处理(需要额外包装)
标准库golang.org/x/sync(准标准库)

2.4 生产示例:有界并发 HTTP 批量请求

package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	urls := []string{
		"https://httpbin.org/delay/1",
		"https://httpbin.org/delay/2",
		"https://httpbin.org/status/200",
		"https://httpbin.org/status/500",
		"https://httpbin.org/delay/1",
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	results := make([]string, len(urls))
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(3) // 最多 3 个并发请求

	var mu sync.Mutex // 保护 results 写入(或使用索引避免竞争)

	for i, url := range urls {
		i, url := i, url // Go 1.22+ 不需要这行,但为了兼容性保留
		g.Go(func() error {
			req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
			if err != nil {
				return fmt.Errorf("creating request for %s: %w", url, err)
			}

			resp, err := http.DefaultClient.Do(req)
			if err != nil {
				return fmt.Errorf("fetching %s: %w", url, err)
			}
			defer resp.Body.Close()

			if resp.StatusCode >= 500 {
				return fmt.Errorf("server error from %s: %d", url, resp.StatusCode)
			}

			body, err := io.ReadAll(resp.Body)
			if err != nil {
				return fmt.Errorf("reading body from %s: %w", url, err)
			}

			mu.Lock()
			results[i] = string(body[:min(len(body), 100)]) // 截断展示
			mu.Unlock()
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Printf("batch fetch failed: %v\n", err)
		// 注意:第一个 error 出现后 ctx 被取消,其余请求会快速退出
	}

	for i, r := range results {
		if r != "" {
			fmt.Printf("[%d] %s\n", i, r)
		}
	}
}

要点:


3. Worker Pool 模式

3.1 什么时候需要 Worker Pool

场景推荐方案
有界并发,任务数量已知errgroup.SetLimit 足够
长期运行的后台处理(消息队列消费者)Worker pool
需要精细控制 worker 生命周期(热更新、动态扩缩)Worker pool
任务处理需要昂贵的初始化(DB 连接、模型加载)Worker pool(每个 worker 持有自己的资源)

3.2 固定大小 Worker Pool

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Task 代表一个工作单元
type Task struct {
	ID      int
	Payload string
}

// Result 代表处理结果
type Result struct {
	TaskID int
	Output string
	Err    error
}

func workerPool(ctx context.Context, numWorkers int, tasks <-chan Task) <-chan Result {
	results := make(chan Result, numWorkers) // 带缓冲,避免 worker 阻塞
	var wg sync.WaitGroup

	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case task, ok := <-tasks:
					if !ok {
						return // channel 关闭,所有任务处理完毕
					}
					result := process(ctx, workerID, task)
					select {
					case results <- result:
					case <-ctx.Done():
						return
					}
				}
			}
		}(i)
	}

	// 关闭 results channel
	go func() {
		wg.Wait()
		close(results)
	}()

	return results
}

func process(_ context.Context, workerID int, task Task) Result {
	// 模拟处理
	time.Sleep(50 * time.Millisecond)
	return Result{
		TaskID: task.ID,
		Output: fmt.Sprintf("worker-%d processed: %s", workerID, task.Payload),
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	tasks := make(chan Task, 100)

	// 投递任务
	go func() {
		defer close(tasks)
		for i := 0; i < 20; i++ {
			select {
			case tasks <- Task{ID: i, Payload: fmt.Sprintf("item-%d", i)}:
			case <-ctx.Done():
				return
			}
		}
	}()

	// 启动 5 个 worker
	results := workerPool(ctx, 5, tasks)

	for r := range results {
		if r.Err != nil {
			fmt.Printf("task %d failed: %v\n", r.TaskID, r.Err)
		} else {
			fmt.Printf("task %d: %s\n", r.TaskID, r.Output)
		}
	}
}

3.3 信号量模式(Semaphore)

两种实现:

方式一:buffered channel(简单场景)

sem := make(chan struct{}, maxConcurrency)

for _, task := range tasks {
	sem <- struct{}{} // 获取信号量(满了就阻塞)
	go func(t Task) {
		defer func() { <-sem }() // 释放信号量
		process(t)
	}(task)
}
// 等待所有完成需要额外的 WaitGroup

方式二:golang.org/x/sync/semaphore(需要 context 集成或加权控制)

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

func main() {
	ctx := context.Background()
	sem := semaphore.NewWeighted(3) // 最多 3 个并发

	g, ctx := errgroup.WithContext(ctx)

	for i := 0; i < 10; i++ {
		i := i
		g.Go(func() error {
			// Acquire 支持 context,超时/取消时自动返回 error
			if err := sem.Acquire(ctx, 1); err != nil {
				return err
			}
			defer sem.Release(1)

			fmt.Printf("processing %d\n", i)
			time.Sleep(100 * time.Millisecond)
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}

semaphore.Weighted vs buffered channel:

维度buffered channelsemaphore.Weighted
context 集成需要 select原生支持
加权获取不支持(每次只能获取 1)支持(一次获取 N 个单位)
非阻塞尝试用 select + defaultTryAcquire()
依赖标准库golang.org/x/sync

3.4 生产示例:限速 API 调用器

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/errgroup"
	"golang.org/x/time/rate"
)

// RateLimitedCaller 限速 + 并发控制的 API 调用器
type RateLimitedCaller struct {
	limiter       *rate.Limiter
	maxConcurrent int
}

func NewRateLimitedCaller(rps float64, burst int, maxConcurrent int) *RateLimitedCaller {
	return &RateLimitedCaller{
		limiter:       rate.NewLimiter(rate.Limit(rps), burst),
		maxConcurrent: maxConcurrent,
	}
}

func (c *RateLimitedCaller) BatchCall(ctx context.Context, items []string) ([]string, error) {
	results := make([]string, len(items))
	var mu sync.Mutex

	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(c.maxConcurrent)

	for i, item := range items {
		i, item := i, item
		g.Go(func() error {
			// 等待限速器许可
			if err := c.limiter.Wait(ctx); err != nil {
				return fmt.Errorf("rate limit wait for %q: %w", item, err)
			}

			// 模拟 API 调用
			result := fmt.Sprintf("result for %s at %s", item, time.Now().Format("15:04:05.000"))

			mu.Lock()
			results[i] = result
			mu.Unlock()
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}
	return results, nil
}

func main() {
	caller := NewRateLimitedCaller(
		5,   // 每秒 5 个请求
		2,   // burst 允许突发 2 个
		3,   // 最多 3 个并发
	)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	items := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
	results, err := caller.BatchCall(ctx, items)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for i, r := range results {
		fmt.Printf("[%d] %s\n", i, r)
	}
}

4. 结构化并发(Structured Concurrency)

4.1 问题:Go 缺乏结构化并发

在结构化并发模型中(如 Java 的 Virtual Threads + StructuredTaskScope、Kotlin coroutines),子任务的生命周期被绑定到父任务——父任务不会在子任务还在跑的时候结束。

Go 的 goroutine 是 fire-and-forget 的:

func handler(w http.ResponseWriter, r *http.Request) {
    go sendEmail(user) // 启动后就忘了——谁来等它?谁来处理它的 error?
    w.WriteHeader(http.StatusOK)
}
// handler 返回后,sendEmail 可能还在跑,也可能 panic 了没人知道

这导致两个根本性问题:

  1. Goroutine 泄漏:没有机制强制等待所有子 goroutine 结束
  2. Error 丢失:子 goroutine 的 panic/error 无法传播到父 goroutine

4.2 errgroup 如何近似结构化并发

errgroup.WithContext 提供了最接近结构化并发的标准方案:

g, ctx := errgroup.WithContext(parentCtx)

g.Go(func() error { return task1(ctx) })
g.Go(func() error { return task2(ctx) })
g.Go(func() error { return task3(ctx) })

// 阻塞到所有 goroutine 完成——父等待所有子
// 第一个 error 取消 ctx——快速失败
if err := g.Wait(); err != nil {
    // 所有子 goroutine 都已退出(前提是它们尊重 ctx)
}

局限: errgroup 不处理 panic。子 goroutine panic 会直接崩掉整个进程。

4.3 sourcegraph/conc:更安全的并发原语

sourcegraph/conc(当前版本 v0.3.0,pre-1.0)在 errgroup 基础上补齐了 panic 处理和泛型支持。

import (
	"github.com/sourcegraph/conc/pool"
	"github.com/sourcegraph/conc/iter"
)

// pool.Pool —— 带 panic 恢复的并发池
p := pool.New().WithMaxGoroutines(5).WithContext(ctx)
p.Go(func(ctx context.Context) error {
	return riskyOperation(ctx)
})
// Wait 时如果子 goroutine panic 了,panic 会被包装成 error(带子 goroutine 的堆栈)
err := p.Wait()

// pool.ResultPool —— 收集结果的并发池
rp := pool.NewWithResults[int]().WithMaxGoroutines(3)
rp.Go(func() int { return compute(1) })
rp.Go(func() int { return compute(2) })
results := rp.Wait() // []int{...}

// iter.ForEach —— 并发遍历 slice
iter.ForEach(items, func(item *Item) {
	item.Process()
})

// iter.Map —— 并发 map
doubled := iter.Map(numbers, func(n *int) int {
	return *n * 2
})

conc vs errgroup 对比:

维度errgroupsourcegraph/conc
panic 处理不处理,直接崩恢复 panic 并包装为 error,保留子 goroutine 堆栈
泛型支持无(只有 func() error有(ResultPool[T]
API 风格函数式Builder 模式(链式调用)
稳定性准标准库,稳定pre-1.0,API 可能变动
依赖golang.org/x/sync第三方库

建议: 生产代码优先用 errgroup。如果需要 panic 安全或泛型结果收集,引入 conc 是合理的。

4.4 确保所有 goroutine 完成的模式

func processAll(ctx context.Context, items []Item) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(10)

	for _, item := range items {
		item := item
		g.Go(func() error {
			// 子任务必须尊重 ctx
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}
			return processItem(ctx, item)
		})
	}

	// 此处保证:所有 goroutine 都已退出
	return g.Wait()
}

核心原则: 永远不要让 goroutine “逃逸”到它的启动者无法等待的范围之外。每个 go 关键字都应该有对应的 Wait 或类似的生命周期管理。


5. Pipeline 模式(进阶)

基础 pipeline、fan-in/fan-out 模式参见 go-channel-内部机制与使用模式.md。本节聚焦生产级 pipeline 的三个难题:取消传播、错误处理、背压。

5.1 带 Context 取消的 Pipeline Stage

package main

import (
	"context"
	"fmt"
	"time"
)

// stage 是一个 pipeline 阶段的通用签名
// 输入 channel → 处理 → 输出 channel
func stage[In, Out any](
	ctx context.Context,
	in <-chan In,
	fn func(context.Context, In) (Out, error),
) (<-chan Out, <-chan error) {
	out := make(chan Out)
	errc := make(chan error, 1)

	go func() {
		defer close(out)
		defer close(errc)
		for {
			select {
			case <-ctx.Done():
				errc <- ctx.Err()
				return
			case v, ok := <-in:
				if !ok {
					return // 上游关闭
				}
				result, err := fn(ctx, v)
				if err != nil {
					errc <- err
					return // 出错即停止该 stage
				}
				select {
				case out <- result:
				case <-ctx.Done():
					errc <- ctx.Err()
					return
				}
			}
		}
	}()

	return out, errc
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Stage 1: 生成数据
	source := make(chan int)
	go func() {
		defer close(source)
		for i := 0; i < 100; i++ {
			select {
			case source <- i:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Stage 2: 翻倍
	doubled, errc1 := stage(ctx, source, func(_ context.Context, n int) (int, error) {
		time.Sleep(50 * time.Millisecond) // 模拟处理耗时
		return n * 2, nil
	})

	// Stage 3: 转字符串
	formatted, errc2 := stage(ctx, doubled, func(_ context.Context, n int) (string, error) {
		return fmt.Sprintf("value=%d", n), nil
	})

	// 消费结果
	for s := range formatted {
		fmt.Println(s)
	}

	// 检查各 stage 的 error
	for _, errc := range []<-chan error{errc1, errc2} {
		if err := <-errc; err != nil {
			fmt.Println("pipeline error:", err)
		}
	}
}

5.2 背压(Backpressure)

有界 channel天然实现背压:

// buffer size = 背压阈值
// 当 consumer 慢于 producer 时,producer 会在 channel 发送处阻塞
buffered := make(chan Data, 100)

无背压的后果: unbounded channel(或无限 goroutine)→ 内存无限增长 → OOM。

背压调优原则:

5.3 优雅关闭 vs 立即中止

策略实现适用场景
Drain(排空)关闭 source channel,让数据流完 pipeline数据不能丢(支付、订单)
Abort(中止)cancel() context,所有 stage 立即退出可重试的任务(缓存预热、搜索)
混合cancel context + 给每个 stage 一个 drain timeout大多数生产场景

6. 并发数据访问模式

6.1 决策表

访问模式推荐方案理由
读多写极少(配置、feature flag)atomic.Value (copy-on-write)读零开销,写时整体替换
读多写少(缓存)sync.RWMutexsync.MapRWMutex 更可预测;sync.Map 在 key 不重叠的高并发场景更快
读写均衡sync.Mutex简单可靠,不要过度优化
写多(计数器、统计)sync/atomic无锁,硬件级原子操作
高竞争写(热点 key)Sharding(分片 + 分锁)把一个锁的竞争分散到 N 个锁
不可变数据(创建后不修改)无需同步只读共享天然安全

6.2 Copy-on-Write 模式(atomic.Value)

适用于读远多于写的场景,如动态配置热加载:

package main

import (
	"fmt"
	"sync/atomic"
)

type Config struct {
	LogLevel    string
	MaxRetries  int
	FeatureFlag map[string]bool // 注意:map 本身是引用类型,但这里整体替换所以安全
}

var globalConfig atomic.Value

func init() {
	globalConfig.Store(&Config{
		LogLevel:    "info",
		MaxRetries:  3,
		FeatureFlag: map[string]bool{"new_ui": false},
	})
}

// 读操作:无锁,极快
func GetConfig() *Config {
	return globalConfig.Load().(*Config)
}

// 写操作:构造新对象,原子替换指针
func UpdateConfig(modifier func(*Config) *Config) {
	for {
		old := globalConfig.Load().(*Config)
		updated := modifier(old)
		if globalConfig.CompareAndSwap(old, updated) {
			return
		}
		// CAS 失败说明有并发更新,重试
	}
}

func main() {
	cfg := GetConfig()
	fmt.Println("log level:", cfg.LogLevel) // info

	UpdateConfig(func(old *Config) *Config {
		// 完整拷贝 + 修改,不修改 old
		newFlags := make(map[string]bool, len(old.FeatureFlag))
		for k, v := range old.FeatureFlag {
			newFlags[k] = v
		}
		newFlags["new_ui"] = true
		return &Config{
			LogLevel:    "debug",
			MaxRetries:  old.MaxRetries,
			FeatureFlag: newFlags,
		}
	})

	fmt.Println("new log level:", GetConfig().LogLevel) // debug
}

核心要点: 永远不修改已发布的对象——创建新对象替换旧指针。旧对象的读者不受影响,GC 会在没有引用时回收。

6.3 sync.Map vs RWMutex

sync.Map 的官方文档明确说了两个最佳场景:

  1. key 只写一次但读很多次(只增长的缓存)
  2. 多个 goroutine 读写不重叠的 key 集合(每个 goroutine 管自己的一片 key)

除此之外,map + RWMutex 通常是更好的选择——它的行为更可预测,性能模型更容易推理。

性能参考: [需验证] 在读占比 95% 的场景下,sync.Map 比 RWMutex 快约 2-3 倍;在读写 50/50 的场景下,RWMutex 通常更快。

6.4 Sharding 分片模式

当单个 Mutex/RWMutex 成为瓶颈时(数万 goroutine 竞争同一把锁),按 key hash 分片到多个独立的 map + mutex 上:

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const numShards = 32

type ShardedMap[V any] struct {
	shards [numShards]struct {
		mu sync.RWMutex
		m  map[string]V
	}
}

func NewShardedMap[V any]() *ShardedMap[V] {
	sm := &ShardedMap[V]{}
	for i := range sm.shards {
		sm.shards[i].m = make(map[string]V)
	}
	return sm
}

func (sm *ShardedMap[V]) shard(key string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	return h.Sum32() % numShards
}

func (sm *ShardedMap[V]) Set(key string, val V) {
	s := &sm.shards[sm.shard(key)]
	s.mu.Lock()
	s.m[key] = val
	s.mu.Unlock()
}

func (sm *ShardedMap[V]) Get(key string) (V, bool) {
	s := &sm.shards[sm.shard(key)]
	s.mu.RLock()
	val, ok := s.m[key]
	s.mu.RUnlock()
	return val, ok
}

func main() {
	m := NewShardedMap[int]()
	m.Set("foo", 42)
	v, ok := m.Get("foo")
	fmt.Println(v, ok) // 42 true
}

分片数量经验值: CPU 核心数 × 2 到 × 4。分片太少减少不了竞争,分片太多浪费内存。


7. Race Condition 检测与预防

7.1 -race 标志

Go 的 race detector 基于 ThreadSanitizer v2(LLVM 项目的一部分),在编译期对每个内存读写插桩,运行期跟踪每个 goroutine 最后一次访问每个内存地址的信息。

# 编译并运行带 race 检测的程序
go run -race main.go

# 测试时启用
go test -race ./...

# 构建带 race 检测的二进制(可用于预发布环境)
go build -race -o myapp-race ./cmd/myapp

性能开销:

7.2 CI 集成

# GitHub Actions 示例
- name: Test with race detector
  run: go test -race -count=1 ./...
  env:
    GOFLAGS: "-race"

关键: -race 应该在 CI 中每次提交都跑,而不是偶尔跑一次。漏报意味着你需要足够的测试覆盖率才能触发 race。

7.3 常见 Race 模式

循环变量捕获(Go 1.22 之前)

// Go 1.21 及之前——经典 bug
for _, v := range values {
	go func() {
		fmt.Println(v) // 所有 goroutine 都打印最后一个 v
	}()
}

// 修复方式 1:参数传递
for _, v := range values {
	go func(v string) {
		fmt.Println(v)
	}(v)
}

// 修复方式 2:局部变量
for _, v := range values {
	v := v
	go func() {
		fmt.Println(v)
	}()
}

Go 1.22+ 已修复: 每次循环迭代创建新的变量。但前提是模块的 go.mod 中声明了 go 1.22 或更高版本。

来源:Go Blog: Fixing For Loops in Go 1.22

Map 并发读写

m := make(map[string]int)

// 这会 panic:"concurrent map read and map write"
go func() { m["key"] = 1 }()
go func() { _ = m["key"] }()

Go runtime 会在检测到 map 并发读写时直接 throw(不是 panic,无法 recover)。解决方案:sync.Mutexsync.RWMutexsync.Map

Slice 并发 append

var s []int
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
	wg.Add(1)
	go func(n int) {
		defer wg.Done()
		s = append(s, n) // DATA RACE: slice header 和底层数组都不安全
	}(i)
}
wg.Wait()
// len(s) 可能远小于 100

7.4 go vet 静态检查

go vet 在编译期能发现部分并发问题:

检查项说明
copylocks检测包含 Mutex 等锁类型的值被拷贝(应传指针)
loopclosureGo 1.22 之前检测循环变量在闭包中被捕获
atomic检测 atomic.AddInt64(&x, 1)x 没有对齐到 64 位边界(32 位平台问题)
go vet ./...

8. Graceful Shutdown 模式

8.1 核心流程

信号到达 → 标记 not ready → 停止接收新请求 → 等待在途请求完成
→ 停止后台 worker → 关闭资源(DB、MQ 连接) → 退出

8.2 完整生产示例:HTTP Server + 后台 Worker

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	// ========== 1. 信号处理 ==========
	// signal.NotifyContext 是 Go 1.16+ 推荐的方式
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// ========== 2. 共享状态 ==========
	var (
		ready   = true
		readyMu sync.RWMutex
	)

	setReady := func(r bool) {
		readyMu.Lock()
		ready = r
		readyMu.Unlock()
	}

	isReady := func() bool {
		readyMu.RLock()
		defer readyMu.RUnlock()
		return ready
	}

	// ========== 3. HTTP Server ==========
	mux := http.NewServeMux()

	// 健康检查——k8s readinessProbe 用这个
	mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, r *http.Request) {
		if isReady() {
			w.WriteHeader(http.StatusOK)
			fmt.Fprintln(w, "ok")
		} else {
			w.WriteHeader(http.StatusServiceUnavailable)
			fmt.Fprintln(w, "shutting down")
		}
	})

	// 业务接口
	mux.HandleFunc("GET /api/data", func(w http.ResponseWriter, r *http.Request) {
		// 模拟耗时操作
		select {
		case <-time.After(2 * time.Second):
			fmt.Fprintln(w, `{"status":"ok"}`)
		case <-r.Context().Done():
			// 客户端断开或 server 关闭
			return
		}
	})

	server := &http.Server{
		Addr:    ":8080",
		Handler: mux,
	}

	// ========== 4. 后台 Worker ==========
	var workerWg sync.WaitGroup

	workerWg.Add(1)
	go func() {
		defer workerWg.Done()
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				log.Println("background worker: shutting down")
				return
			case <-ticker.C:
				log.Println("background worker: tick")
			}
		}
	}()

	// ========== 5. 启动 HTTP Server ==========
	go func() {
		log.Printf("server listening on %s", server.Addr)
		if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Fatalf("server error: %v", err)
		}
	}()

	// ========== 6. 等待信号 ==========
	<-ctx.Done()
	log.Println("shutdown signal received")

	// ========== 7. 关闭序列(有顺序!) ==========

	// 7a. 标记 not ready,让 LB/k8s 摘流量
	setReady(false)
	// 给 LB 几秒钟反应时间(k8s 的 endpoint 更新有延迟)
	log.Println("waiting for load balancer to drain...")
	time.Sleep(3 * time.Second)

	// 7b. 关闭 HTTP server(拒绝新连接,等待在途请求)
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer shutdownCancel()

	if err := server.Shutdown(shutdownCtx); err != nil {
		log.Printf("HTTP server shutdown error: %v", err)
	} else {
		log.Println("HTTP server shutdown complete")
	}

	// 7c. 等待后台 worker 结束
	// ctx 已经被取消,worker 会收到信号退出
	workerDone := make(chan struct{})
	go func() {
		workerWg.Wait()
		close(workerDone)
	}()

	select {
	case <-workerDone:
		log.Println("all workers stopped")
	case <-time.After(10 * time.Second):
		log.Println("WARNING: workers did not stop in time")
	}

	// 7d. 关闭其他资源(DB、MQ 等)
	// db.Close()
	// mqConn.Close()

	log.Println("shutdown complete")
	os.Exit(0)
}

关闭顺序铁律:

HTTP server(入口)→ 后台 worker → 数据库/消息队列(底层资源)

先关入口,后关依赖。反过来会导致在途请求访问已关闭的 DB 连接。

Kubernetes 相关:


9. 常见反模式

反模式 1:Goroutine 泄漏——启动了没法停

// ❌ 错误:goroutine 永远阻塞在 channel 读上
func leak() {
	ch := make(chan int)
	go func() {
		val := <-ch // 永远等不到发送者
		fmt.Println(val)
	}()
	// ch 没有被发送也没有被关闭,goroutine 永远不会退出
}

// ✅ 修复:通过 context 提供退出通道
func noLeak(ctx context.Context) {
	ch := make(chan int)
	go func() {
		select {
		case val := <-ch:
			fmt.Println(val)
		case <-ctx.Done():
			return
		}
	}()
}

反模式 2:用 context.Value 做依赖注入

// ❌ 错误:把 DB 连接塞进 context
ctx = context.WithValue(ctx, "db", dbConn)

func handler(ctx context.Context) {
	db := ctx.Value("db").(*sql.DB) // 类型不安全,隐式依赖
}

// ✅ 修复:显式参数
func handler(ctx context.Context, db *sql.DB) {
	// 函数签名明确告诉你它需要什么
}

反模式 3:在 init() 中启动 goroutine

// ❌ 错误:init 里启动的 goroutine 没有生命周期管理
func init() {
	go backgroundSync() // 谁来关它?怎么等它结束?测试怎么办?
}

// ✅ 修复:显式启动 + 生命周期管理
type App struct {
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func (a *App) Start(ctx context.Context) {
	ctx, a.cancel = context.WithCancel(ctx)
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()
		backgroundSync(ctx)
	}()
}

func (a *App) Stop() {
	a.cancel()
	a.wg.Wait()
}

反模式 4:无界 goroutine 创建(无背压)

// ❌ 错误:每个请求一个 goroutine,无上限
for _, req := range requests { // 100万个请求 → 100万个 goroutine → OOM
	go handleRequest(req)
}

// ✅ 修复:errgroup.SetLimit 或 worker pool
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(100) // 最多 100 个并发
for _, req := range requests {
	req := req
	g.Go(func() error {
		return handleRequest(ctx, req)
	})
}
g.Wait()

反模式 5:持锁做 I/O

// ❌ 错误:持有锁的同时做网络调用——所有其他 goroutine 都在等这个网络请求
mu.Lock()
resp, err := http.Get(url) // 可能几百毫秒甚至几秒
data[key] = parseResp(resp)
mu.Unlock()

// ✅ 修复:先做 I/O,再加锁写入
resp, err := http.Get(url) // 无锁
if err != nil { return err }
result := parseResp(resp)

mu.Lock()
data[key] = result // 加锁时间极短
mu.Unlock()

反模式 6:channel of channels(过度复杂化)

// ❌ 过度设计:请求-响应模式用 channel of channels
type Request struct {
	Data     string
	RespChan chan Response
}
reqCh := make(chan Request)
// 用起来很痛苦,错误处理更痛苦

// ✅ 更简单的方案:直接用函数调用 + mutex,或者用 errgroup

Channel of channels 极少是最佳方案。除非你在实现 actor 模型或明确需要请求-响应的消息传递语义,否则直接调用函数更清晰。

反模式 7:用 time.Sleep 做同步

// ❌ 错误:假设"睡一会儿另一个 goroutine 就执行完了"
go populateCache()
time.Sleep(time.Second) // 在你的笔记本上够了,在 CI 的 1 核容器里不够
useCache()

// ✅ 修复:用 channel、WaitGroup 或 sync.Once 做显式同步
var once sync.Once
var cache map[string]string

func getCache() map[string]string {
	once.Do(func() {
		cache = loadFromDB()
	})
	return cache
}

反模式 8:只处理 errgroup 的第一个 error

// errgroup.Wait() 只返回第一个 error
// 如果你需要所有 error,必须自己收集

// ✅ 收集所有 error 的模式
var (
	mu     sync.Mutex
	errs   []error
)

g, ctx := errgroup.WithContext(ctx)
for _, task := range tasks {
	task := task
	g.Go(func() error {
		if err := process(ctx, task); err != nil {
			mu.Lock()
			errs = append(errs, err)
			mu.Unlock()
			return err // 仍然返回 error 以触发 context 取消
		}
		return nil
	})
}
_ = g.Wait() // 忽略第一个 error,看 errs 列表
fmt.Println("all errors:", errors.Join(errs...))

反模式 9:容器中不设 GOMAXPROCS(Go 1.25 之前)

Go 1.25 之前,GOMAXPROCS 默认读取宿主机的 CPU 核心数,而非容器的 cgroup 限制。一个只分配了 2 CPU 的容器在 64 核机器上会启动 64 个 P,导致大量上下文切换和 CPU throttling。

// Go < 1.25 的修复方案
import _ "go.uber.org/automaxprocs" // 自动检测 cgroup 限制

// Go 1.25+ 已原生支持,无需第三方库

来源:Go Blog: Container-aware GOMAXPROCS

反模式 10:select 中忘记处理 ctx.Done()

// ❌ 错误:只监听数据 channel,不监听取消信号
for msg := range ch {
	process(msg) // context 取消了也停不下来
}

// ✅ 修复:每个 select 都要有 ctx.Done() 分支
for {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case msg, ok := <-ch:
		if !ok {
			return nil
		}
		if err := process(ctx, msg); err != nil {
			return err
		}
	}
}

10. 性能模式

10.1 Sharding 分片

见 6.4 节的 ShardedMap 实现。核心思想:把一把锁的竞争分散到 N 把锁上。

适用场景: Benchmark 显示单个 Mutex 是瓶颈,且 key 分布均匀。 不适用: 热点 key(所有请求打同一个 shard),此时应考虑应用层合并请求或本地缓存。

10.2 Lock-free:atomic 操作

// 计数器——最常见的 lock-free 场景
var counter atomic.Int64

// 多个 goroutine 并发递增
counter.Add(1)

// 读取当前值
val := counter.Load()

什么时候用 atomic vs mutex:

// ❌ 两个 atomic 操作不构成原子事务
atomic.StoreInt64(&balance, newBalance)
atomic.StoreInt64(&lastUpdated, time.Now().Unix())
// 其他 goroutine 可能看到新的 balance 但旧的 lastUpdated

// ✅ 需要事务一致性时用 mutex
mu.Lock()
balance = newBalance
lastUpdated = time.Now().Unix()
mu.Unlock()

10.3 Batch Processing 批处理

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Batcher 收集请求,批量处理
type Batcher[T any] struct {
	maxBatch  int
	maxWait   time.Duration
	process   func([]T) error
	items     []T
	mu        sync.Mutex
	timer     *time.Timer
	ctx       context.Context
	cancel    context.CancelFunc
}

func NewBatcher[T any](ctx context.Context, maxBatch int, maxWait time.Duration, process func([]T) error) *Batcher[T] {
	ctx, cancel := context.WithCancel(ctx)
	return &Batcher[T]{
		maxBatch: maxBatch,
		maxWait:  maxWait,
		process:  process,
		ctx:      ctx,
		cancel:   cancel,
	}
}

func (b *Batcher[T]) Add(item T) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.items = append(b.items, item)

	if len(b.items) >= b.maxBatch {
		b.flushLocked()
		return
	}

	if b.timer == nil {
		b.timer = time.AfterFunc(b.maxWait, func() {
			b.mu.Lock()
			defer b.mu.Unlock()
			b.flushLocked()
		})
	}
}

func (b *Batcher[T]) flushLocked() {
	if len(b.items) == 0 {
		return
	}
	if b.timer != nil {
		b.timer.Stop()
		b.timer = nil
	}
	batch := b.items
	b.items = nil
	// 异步处理以避免阻塞调用者
	go func() {
		if err := b.process(batch); err != nil {
			fmt.Printf("batch process error: %v\n", err)
		}
	}()
}

func (b *Batcher[T]) Close() {
	b.mu.Lock()
	b.flushLocked()
	b.mu.Unlock()
	b.cancel()
}

func main() {
	ctx := context.Background()

	batcher := NewBatcher(ctx, 5, 100*time.Millisecond, func(items []int) error {
		fmt.Printf("processing batch of %d items: %v\n", len(items), items)
		return nil
	})

	for i := 0; i < 12; i++ {
		batcher.Add(i)
		time.Sleep(20 * time.Millisecond)
	}

	time.Sleep(200 * time.Millisecond) // 等待最后一批
	batcher.Close()
}

10.4 sync.Once:一次性初始化

var (
	instance *ExpensiveResource
	once     sync.Once
)

func GetResource() *ExpensiveResource {
	once.Do(func() {
		// 只执行一次,即使有 1000 个 goroutine 同时调用
		instance = loadExpensiveResource()
	})
	return instance
}

sync.Once 的保证:

Go 1.21+ 新增 sync.OnceFuncsync.OnceValuesync.OnceValues

// 更简洁的写法
getConfig := sync.OnceValue(func() *Config {
	return loadConfig()
})

cfg := getConfig() // 第一次调用时执行 loadConfig,后续直接返回缓存值

11. 测试并发代码

11.1 用 -race + -count 抓 flaky test

# 跑 1000 次,race detector 开启
go test -race -count=1000 -timeout=30m ./pkg/cache/...

# 如果偶发失败,说明有 race condition

原理: race condition 的触发取决于调度时序。多次运行增加触发概率。

11.2 GOMAXPROCS 测试

# 单核——暴露依赖并行性的 bug
GOMAXPROCS=1 go test -race ./...

# 多核——暴露真正的并发 race
GOMAXPROCS=8 go test -race ./...

11.3 t.Parallel() 正确使用

func TestConcurrentAccess(t *testing.T) {
	tests := []struct {
		name  string
		input int
		want  int
	}{
		{"case1", 1, 2},
		{"case2", 2, 4},
		{"case3", 3, 6},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel() // 标记为可并行执行

			// Go 1.22+ 不需要 tt := tt
			got := double(tt.input)
			if got != tt.want {
				t.Errorf("double(%d) = %d, want %d", tt.input, got, tt.want)
			}
		})
	}
}

func double(n int) int { return n * 2 }

注意事项:

11.4 确定性测试:注入同步点

// 通过 channel 控制 goroutine 执行顺序,让并发测试变得可预测
func TestProducerConsumer(t *testing.T) {
	produced := make(chan struct{})
	consumed := make(chan struct{})

	var value int

	// producer
	go func() {
		value = 42
		close(produced) // 通知:已生产
		<-consumed      // 等待:已消费
	}()

	// consumer
	<-produced // 等待:已生产
	if value != 42 {
		t.Errorf("expected 42, got %d", value)
	}
	close(consumed) // 通知:已消费
}

12. Production Checklist

□ 每个 go 关键字都有对应的生命周期管理(WaitGroup/errgroup/context)
□ 每个 context.WithCancel/WithTimeout 返回的 cancel 都被 defer 调用
□ 并发 goroutine 有上限(errgroup.SetLimit / worker pool / semaphore)
□ 所有 select 语句包含 ctx.Done() 分支(除非确认不需要取消)
□ CI 中 go test -race ./... 每次提交都跑
□ go vet ./... 作为 CI 必检项
□ HTTP server 实现了 graceful shutdown(signal → Shutdown(ctx))
□ 关闭顺序正确:入口 → worker → 底层资源
□ 容器部署:Go < 1.25 使用 automaxprocs;Go 1.25+ 已内置
□ 不在持锁时做 I/O
□ 不用 time.Sleep 做同步
□ 不用 context.Value 传递函数依赖
□ 不在 init() 中启动 goroutine
□ map 的并发访问都有保护(mutex / sync.Map / sharding)

参考来源


分享这篇文章:

上一篇
Go 运行时(一):GMP 调度模型
下一篇
Go 并发(四):无锁编程层次-从 CAS 到消除共享