Table of contents
Open Table of contents
TL;DR
Go 的并发编程远不止 goroutine + channel。生产级代码需要掌握 context 传播与取消树、errgroup 有界并发、worker pool 模式、graceful shutdown 全流程、race detector 的 CI 集成,以及至少十种常见反模式的识别与规避。本文是”如何在生产环境写出正确并发 Go 代码”的完整指南。
1. Context 包——并发控制的脊梁
1.1 为什么 Context 是并发控制的核心
Go 没有线程中断机制(不像 Java 的 Thread.interrupt())。goroutine 一旦启动就是”自治”的,外部无法强制停止它。context 包提供了协作式取消的标准协议:父 goroutine 通过 context 通知子 goroutine “该停了”,子 goroutine 自行检查并退出。
核心设计原则:context 是请求级别的生命周期管理工具,不是全局状态容器。
1.2 四种构造函数
// 根 context——所有 context 树的起点
ctx := context.Background() // 用于 main、init、顶层请求入口
ctx := context.TODO() // 占位符,表示"还没想好传什么 context"
// 派生 context
ctx, cancel := context.WithCancel(parent) // 手动取消
ctx, cancel := context.WithDeadline(parent, deadline) // 到达 deadline 自动取消
ctx, cancel := context.WithTimeout(parent, 5*time.Second) // timeout 后自动取消
ctx := context.WithValue(parent, key, val) // 附加 key-value(慎用)
铁律:拿到 cancel 就必须调用。 不调用 = 资源泄漏(timer、goroutine 都不会被回收)。推荐 defer cancel() 紧跟在创建之后。
1.3 取消传播:Context 树
Background
├── WithCancel (A)
│ ├── WithTimeout (B) ← B 超时或 A 取消,B 都会取消
│ └── WithValue (C) ← A 取消时 C 也取消(Value 本身不影响取消)
└── WithDeadline (D)
└── WithCancel (E) ← D 到期或 E 主动取消,E 都会取消
关键规则:
- 父 context 取消 → 所有子 context 递归取消
- 子 context 取消 → 不影响父 context 和兄弟 context
- WithValue 不参与取消逻辑,但会被父的取消波及
1.4 内部实现:cancelCtx / timerCtx / valueCtx
| 类型 | 核心字段 | 取消机制 |
|---|---|---|
cancelCtx | done chan struct{}(lazy 创建), children map[canceler]struct{}, mu sync.Mutex | 关闭 done channel → 所有监听者收到通知;遍历 children 递归取消 |
timerCtx | 内嵌 cancelCtx + timer *time.Timer + deadline time.Time | timer 到期时调用内部 cancel;手动 cancel 时先停 timer 再走 cancelCtx 流程 |
valueCtx | key, val any + 父 context 引用 | 不参与取消。Value() 沿链向上查找,O(n) 复杂度 |
propagateCancel 函数在创建子 cancelCtx 时,沿父链向上找到最近的 *cancelCtx,把子注册到父的 children map 中。这就是取消能级联传播的原因。
1.5 Go 1.20+ 新增 API
WithCancelCause / Cause(Go 1.20)
解决的问题:ctx.Err() 只返回 context.Canceled 或 context.DeadlineExceeded,无法知道谁取消的、为什么取消。
package main
import (
"context"
"errors"
"fmt"
"time"
)
func main() {
ctx, cancel := context.WithCancelCause(context.Background())
go func() {
time.Sleep(100 * time.Millisecond)
cancel(errors.New("upstream service returned 503"))
}()
<-ctx.Done()
fmt.Println("ctx.Err():", ctx.Err()) // context canceled
fmt.Println("cause:", context.Cause(ctx)) // upstream service returned 503
}
生产价值: 在微服务链路中,通过 Cause 可以精确追踪取消的根因,而不是在日志里看到一堆 context canceled。
WithoutCancel(Go 1.21)
package main
import (
"context"
"fmt"
"time"
)
func main() {
parent, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
// 审计日志需要在请求取消后继续执行
auditCtx := context.WithoutCancel(parent)
// parent 超时后 auditCtx 不受影响
time.Sleep(100 * time.Millisecond)
fmt.Println("parent err:", parent.Err()) // context deadline exceeded
fmt.Println("audit err:", auditCtx.Err()) // <nil>
// auditCtx 仍然能读取 parent 的 Value
}
典型场景:
- HTTP handler 结束后仍需写审计日志
- 请求取消后仍需上报 metrics/tracing span
- 异步任务需要继承 trace ID 但不受请求生命周期限制
注意: WithoutCancel 返回的 context 的 Done() 是 nil,Deadline() 返回零值。子 context 仍然引用父 context(Value 查找),可能导致父 context 无法被 GC。
AfterFunc(Go 1.21)
package main
import (
"context"
"fmt"
"sync"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
// context 取消时自动执行清理,无需手动 select ctx.Done()
stop := context.AfterFunc(ctx, func() {
defer wg.Done()
fmt.Println("cleanup: releasing connection pool")
})
_ = stop // 调用 stop() 可以取消注册
// 模拟工作
time.Sleep(300 * time.Millisecond)
wg.Wait()
// Output: cleanup: releasing connection pool
}
替代了这种 boilerplate 模式:
// 旧写法:为每个清理任务启动一个 goroutine
go func() {
<-ctx.Done()
cleanup()
}()
// 新写法:更声明式,且可以通过 stop() 取消注册
stop := context.AfterFunc(ctx, cleanup)
1.6 Context Values 的争议
context.WithValue 是 context 包中最具争议的 API。
合理用途(request-scoped metadata):
- Trace ID / Request ID
- 认证信息(已验证的 user identity)
- Incoming request metadata(如 HTTP headers 中提取的信息)
反模式(不该用 Value 的场景):
- 传递函数依赖(database connection、logger)→ 应该用函数参数或 struct field
- 传递业务逻辑参数 → 应该用显式参数
- 替代函数签名设计 → 本质是在绕过类型系统
为什么有争议:
- 类型不安全:key 和 value 都是
any,编译期无法检查 - 隐式依赖:函数签名看不出它依赖了哪些 context value
- O(n) 查找:沿 context 链线性搜索,深层嵌套时性能差
- 命名冲突:不同包用相同类型的 key 会互相覆盖
最佳实践:用自定义未导出类型作 key,避免冲突:
package middleware
// 未导出类型,外部包不可能构造出相同的 key
type contextKey struct{}
var userIDKey = contextKey{}
func WithUserID(ctx context.Context, id string) context.Context {
return context.WithValue(ctx, userIDKey, id)
}
func UserID(ctx context.Context) (string, bool) {
id, ok := ctx.Value(userIDKey).(string)
return id, ok
}
2. errgroup——取代 WaitGroup + 手工错误收集
2.1 为什么 WaitGroup 不够
sync.WaitGroup 只管”等所有 goroutine 结束”,不管:
- 错误收集(需要自己加 mutex + error slice)
- 取消(一个 goroutine 失败了,其他的还在傻跑)
- 并发限制(所有 goroutine 一拥而上)
2.2 errgroup 完整 API
import "golang.org/x/sync/errgroup"
// 创建带 context 的 group(推荐)
g, ctx := errgroup.WithContext(parentCtx)
// 设置并发限制(信号量行为)
g.SetLimit(10) // 最多 10 个 goroutine 同时运行
// 启动 goroutine(阻塞版:达到限制时阻塞等待)
g.Go(func() error {
return doWork(ctx)
})
// 非阻塞启动:达到限制时立即返回 false
started := g.TryGo(func() error {
return doWork(ctx)
})
// 等待所有 goroutine 完成,返回第一个非 nil error
err := g.Wait()
| 方法 | 行为 |
|---|---|
Go(f) | 启动 goroutine 执行 f。如果设了 limit,会阻塞到有空位 |
TryGo(f) | 非阻塞版 Go。有空位则启动并返回 true,否则返回 false |
SetLimit(n) | 设置最大并发数。n < 0 表示无限制,n = 0 禁止新增。必须在任何 Go 调用之前设置 |
Wait() | 阻塞到所有 goroutine 结束,返回第一个 error |
WithContext(ctx) | 创建 group + 派生 context。第一个 error 会取消该 context |
2.3 errgroup vs WaitGroup 对比
| 维度 | sync.WaitGroup | errgroup.Group |
|---|---|---|
| 错误处理 | 无,需手动收集 | 自动捕获第一个 error |
| 取消传播 | 无 | WithContext 版本自动取消 |
| 并发限制 | 无 | SetLimit 内置支持 |
| panic 处理 | panic 直接崩掉整个程序 | 同样不处理(需要额外包装) |
| 标准库 | 是 | golang.org/x/sync(准标准库) |
2.4 生产示例:有界并发 HTTP 批量请求
package main
import (
"context"
"fmt"
"io"
"net/http"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
urls := []string{
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/200",
"https://httpbin.org/status/500",
"https://httpbin.org/delay/1",
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
results := make([]string, len(urls))
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(3) // 最多 3 个并发请求
var mu sync.Mutex // 保护 results 写入(或使用索引避免竞争)
for i, url := range urls {
i, url := i, url // Go 1.22+ 不需要这行,但为了兼容性保留
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("creating request for %s: %w", url, err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("fetching %s: %w", url, err)
}
defer resp.Body.Close()
if resp.StatusCode >= 500 {
return fmt.Errorf("server error from %s: %d", url, resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("reading body from %s: %w", url, err)
}
mu.Lock()
results[i] = string(body[:min(len(body), 100)]) // 截断展示
mu.Unlock()
return nil
})
}
if err := g.Wait(); err != nil {
fmt.Printf("batch fetch failed: %v\n", err)
// 注意:第一个 error 出现后 ctx 被取消,其余请求会快速退出
}
for i, r := range results {
if r != "" {
fmt.Printf("[%d] %s\n", i, r)
}
}
}
要点:
SetLimit(3)实现有界并发,无需手写 worker poolerrgroup.WithContext让第一个错误触发取消,其余请求通过ctx感知到取消后快速退出- 注意:
Wait()只返回第一个 error。如果需要收集所有 error,得自己加 mutex + error slice
3. Worker Pool 模式
3.1 什么时候需要 Worker Pool
| 场景 | 推荐方案 |
|---|---|
| 有界并发,任务数量已知 | errgroup.SetLimit 足够 |
| 长期运行的后台处理(消息队列消费者) | Worker pool |
| 需要精细控制 worker 生命周期(热更新、动态扩缩) | Worker pool |
| 任务处理需要昂贵的初始化(DB 连接、模型加载) | Worker pool(每个 worker 持有自己的资源) |
3.2 固定大小 Worker Pool
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Task 代表一个工作单元
type Task struct {
ID int
Payload string
}
// Result 代表处理结果
type Result struct {
TaskID int
Output string
Err error
}
func workerPool(ctx context.Context, numWorkers int, tasks <-chan Task) <-chan Result {
results := make(chan Result, numWorkers) // 带缓冲,避免 worker 阻塞
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case task, ok := <-tasks:
if !ok {
return // channel 关闭,所有任务处理完毕
}
result := process(ctx, workerID, task)
select {
case results <- result:
case <-ctx.Done():
return
}
}
}
}(i)
}
// 关闭 results channel
go func() {
wg.Wait()
close(results)
}()
return results
}
func process(_ context.Context, workerID int, task Task) Result {
// 模拟处理
time.Sleep(50 * time.Millisecond)
return Result{
TaskID: task.ID,
Output: fmt.Sprintf("worker-%d processed: %s", workerID, task.Payload),
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
tasks := make(chan Task, 100)
// 投递任务
go func() {
defer close(tasks)
for i := 0; i < 20; i++ {
select {
case tasks <- Task{ID: i, Payload: fmt.Sprintf("item-%d", i)}:
case <-ctx.Done():
return
}
}
}()
// 启动 5 个 worker
results := workerPool(ctx, 5, tasks)
for r := range results {
if r.Err != nil {
fmt.Printf("task %d failed: %v\n", r.TaskID, r.Err)
} else {
fmt.Printf("task %d: %s\n", r.TaskID, r.Output)
}
}
}
3.3 信号量模式(Semaphore)
两种实现:
方式一:buffered channel(简单场景)
sem := make(chan struct{}, maxConcurrency)
for _, task := range tasks {
sem <- struct{}{} // 获取信号量(满了就阻塞)
go func(t Task) {
defer func() { <-sem }() // 释放信号量
process(t)
}(task)
}
// 等待所有完成需要额外的 WaitGroup
方式二:golang.org/x/sync/semaphore(需要 context 集成或加权控制)
package main
import (
"context"
"fmt"
"time"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
func main() {
ctx := context.Background()
sem := semaphore.NewWeighted(3) // 最多 3 个并发
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < 10; i++ {
i := i
g.Go(func() error {
// Acquire 支持 context,超时/取消时自动返回 error
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
fmt.Printf("processing %d\n", i)
time.Sleep(100 * time.Millisecond)
return nil
})
}
if err := g.Wait(); err != nil {
fmt.Println("error:", err)
}
}
semaphore.Weighted vs buffered channel:
| 维度 | buffered channel | semaphore.Weighted |
|---|---|---|
| context 集成 | 需要 select | 原生支持 |
| 加权获取 | 不支持(每次只能获取 1) | 支持(一次获取 N 个单位) |
| 非阻塞尝试 | 用 select + default | TryAcquire() |
| 依赖 | 标准库 | golang.org/x/sync |
3.4 生产示例:限速 API 调用器
package main
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)
// RateLimitedCaller 限速 + 并发控制的 API 调用器
type RateLimitedCaller struct {
limiter *rate.Limiter
maxConcurrent int
}
func NewRateLimitedCaller(rps float64, burst int, maxConcurrent int) *RateLimitedCaller {
return &RateLimitedCaller{
limiter: rate.NewLimiter(rate.Limit(rps), burst),
maxConcurrent: maxConcurrent,
}
}
func (c *RateLimitedCaller) BatchCall(ctx context.Context, items []string) ([]string, error) {
results := make([]string, len(items))
var mu sync.Mutex
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(c.maxConcurrent)
for i, item := range items {
i, item := i, item
g.Go(func() error {
// 等待限速器许可
if err := c.limiter.Wait(ctx); err != nil {
return fmt.Errorf("rate limit wait for %q: %w", item, err)
}
// 模拟 API 调用
result := fmt.Sprintf("result for %s at %s", item, time.Now().Format("15:04:05.000"))
mu.Lock()
results[i] = result
mu.Unlock()
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}
func main() {
caller := NewRateLimitedCaller(
5, // 每秒 5 个请求
2, // burst 允许突发 2 个
3, // 最多 3 个并发
)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
items := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
results, err := caller.BatchCall(ctx, items)
if err != nil {
fmt.Println("error:", err)
return
}
for i, r := range results {
fmt.Printf("[%d] %s\n", i, r)
}
}
4. 结构化并发(Structured Concurrency)
4.1 问题:Go 缺乏结构化并发
在结构化并发模型中(如 Java 的 Virtual Threads + StructuredTaskScope、Kotlin coroutines),子任务的生命周期被绑定到父任务——父任务不会在子任务还在跑的时候结束。
Go 的 goroutine 是 fire-and-forget 的:
func handler(w http.ResponseWriter, r *http.Request) {
go sendEmail(user) // 启动后就忘了——谁来等它?谁来处理它的 error?
w.WriteHeader(http.StatusOK)
}
// handler 返回后,sendEmail 可能还在跑,也可能 panic 了没人知道
这导致两个根本性问题:
- Goroutine 泄漏:没有机制强制等待所有子 goroutine 结束
- Error 丢失:子 goroutine 的 panic/error 无法传播到父 goroutine
4.2 errgroup 如何近似结构化并发
errgroup.WithContext 提供了最接近结构化并发的标准方案:
g, ctx := errgroup.WithContext(parentCtx)
g.Go(func() error { return task1(ctx) })
g.Go(func() error { return task2(ctx) })
g.Go(func() error { return task3(ctx) })
// 阻塞到所有 goroutine 完成——父等待所有子
// 第一个 error 取消 ctx——快速失败
if err := g.Wait(); err != nil {
// 所有子 goroutine 都已退出(前提是它们尊重 ctx)
}
局限: errgroup 不处理 panic。子 goroutine panic 会直接崩掉整个进程。
4.3 sourcegraph/conc:更安全的并发原语
sourcegraph/conc(当前版本 v0.3.0,pre-1.0)在 errgroup 基础上补齐了 panic 处理和泛型支持。
import (
"github.com/sourcegraph/conc/pool"
"github.com/sourcegraph/conc/iter"
)
// pool.Pool —— 带 panic 恢复的并发池
p := pool.New().WithMaxGoroutines(5).WithContext(ctx)
p.Go(func(ctx context.Context) error {
return riskyOperation(ctx)
})
// Wait 时如果子 goroutine panic 了,panic 会被包装成 error(带子 goroutine 的堆栈)
err := p.Wait()
// pool.ResultPool —— 收集结果的并发池
rp := pool.NewWithResults[int]().WithMaxGoroutines(3)
rp.Go(func() int { return compute(1) })
rp.Go(func() int { return compute(2) })
results := rp.Wait() // []int{...}
// iter.ForEach —— 并发遍历 slice
iter.ForEach(items, func(item *Item) {
item.Process()
})
// iter.Map —— 并发 map
doubled := iter.Map(numbers, func(n *int) int {
return *n * 2
})
conc vs errgroup 对比:
| 维度 | errgroup | sourcegraph/conc |
|---|---|---|
| panic 处理 | 不处理,直接崩 | 恢复 panic 并包装为 error,保留子 goroutine 堆栈 |
| 泛型支持 | 无(只有 func() error) | 有(ResultPool[T]) |
| API 风格 | 函数式 | Builder 模式(链式调用) |
| 稳定性 | 准标准库,稳定 | pre-1.0,API 可能变动 |
| 依赖 | golang.org/x/sync | 第三方库 |
建议: 生产代码优先用 errgroup。如果需要 panic 安全或泛型结果收集,引入 conc 是合理的。
4.4 确保所有 goroutine 完成的模式
func processAll(ctx context.Context, items []Item) error {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10)
for _, item := range items {
item := item
g.Go(func() error {
// 子任务必须尊重 ctx
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return processItem(ctx, item)
})
}
// 此处保证:所有 goroutine 都已退出
return g.Wait()
}
核心原则: 永远不要让 goroutine “逃逸”到它的启动者无法等待的范围之外。每个 go 关键字都应该有对应的 Wait 或类似的生命周期管理。
5. Pipeline 模式(进阶)
基础 pipeline、fan-in/fan-out 模式参见
go-channel-内部机制与使用模式.md。本节聚焦生产级 pipeline 的三个难题:取消传播、错误处理、背压。
5.1 带 Context 取消的 Pipeline Stage
package main
import (
"context"
"fmt"
"time"
)
// stage 是一个 pipeline 阶段的通用签名
// 输入 channel → 处理 → 输出 channel
func stage[In, Out any](
ctx context.Context,
in <-chan In,
fn func(context.Context, In) (Out, error),
) (<-chan Out, <-chan error) {
out := make(chan Out)
errc := make(chan error, 1)
go func() {
defer close(out)
defer close(errc)
for {
select {
case <-ctx.Done():
errc <- ctx.Err()
return
case v, ok := <-in:
if !ok {
return // 上游关闭
}
result, err := fn(ctx, v)
if err != nil {
errc <- err
return // 出错即停止该 stage
}
select {
case out <- result:
case <-ctx.Done():
errc <- ctx.Err()
return
}
}
}
}()
return out, errc
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
// Stage 1: 生成数据
source := make(chan int)
go func() {
defer close(source)
for i := 0; i < 100; i++ {
select {
case source <- i:
case <-ctx.Done():
return
}
}
}()
// Stage 2: 翻倍
doubled, errc1 := stage(ctx, source, func(_ context.Context, n int) (int, error) {
time.Sleep(50 * time.Millisecond) // 模拟处理耗时
return n * 2, nil
})
// Stage 3: 转字符串
formatted, errc2 := stage(ctx, doubled, func(_ context.Context, n int) (string, error) {
return fmt.Sprintf("value=%d", n), nil
})
// 消费结果
for s := range formatted {
fmt.Println(s)
}
// 检查各 stage 的 error
for _, errc := range []<-chan error{errc1, errc2} {
if err := <-errc; err != nil {
fmt.Println("pipeline error:", err)
}
}
}
5.2 背压(Backpressure)
用有界 channel天然实现背压:
// buffer size = 背压阈值
// 当 consumer 慢于 producer 时,producer 会在 channel 发送处阻塞
buffered := make(chan Data, 100)
无背压的后果: unbounded channel(或无限 goroutine)→ 内存无限增长 → OOM。
背压调优原则:
- buffer 太小 → producer 频繁阻塞,吞吐量低
- buffer 太大 → 内存占用高,延迟抖动大(数据在 buffer 里排队)
- 经验值:buffer = 预期突发量,或 consumer 处理速率 × 可接受延迟
5.3 优雅关闭 vs 立即中止
| 策略 | 实现 | 适用场景 |
|---|---|---|
| Drain(排空) | 关闭 source channel,让数据流完 pipeline | 数据不能丢(支付、订单) |
| Abort(中止) | cancel() context,所有 stage 立即退出 | 可重试的任务(缓存预热、搜索) |
| 混合 | cancel context + 给每个 stage 一个 drain timeout | 大多数生产场景 |
6. 并发数据访问模式
6.1 决策表
| 访问模式 | 推荐方案 | 理由 |
|---|---|---|
| 读多写极少(配置、feature flag) | atomic.Value (copy-on-write) | 读零开销,写时整体替换 |
| 读多写少(缓存) | sync.RWMutex 或 sync.Map | RWMutex 更可预测;sync.Map 在 key 不重叠的高并发场景更快 |
| 读写均衡 | sync.Mutex | 简单可靠,不要过度优化 |
| 写多(计数器、统计) | sync/atomic | 无锁,硬件级原子操作 |
| 高竞争写(热点 key) | Sharding(分片 + 分锁) | 把一个锁的竞争分散到 N 个锁 |
| 不可变数据(创建后不修改) | 无需同步 | 只读共享天然安全 |
6.2 Copy-on-Write 模式(atomic.Value)
适用于读远多于写的场景,如动态配置热加载:
package main
import (
"fmt"
"sync/atomic"
)
type Config struct {
LogLevel string
MaxRetries int
FeatureFlag map[string]bool // 注意:map 本身是引用类型,但这里整体替换所以安全
}
var globalConfig atomic.Value
func init() {
globalConfig.Store(&Config{
LogLevel: "info",
MaxRetries: 3,
FeatureFlag: map[string]bool{"new_ui": false},
})
}
// 读操作:无锁,极快
func GetConfig() *Config {
return globalConfig.Load().(*Config)
}
// 写操作:构造新对象,原子替换指针
func UpdateConfig(modifier func(*Config) *Config) {
for {
old := globalConfig.Load().(*Config)
updated := modifier(old)
if globalConfig.CompareAndSwap(old, updated) {
return
}
// CAS 失败说明有并发更新,重试
}
}
func main() {
cfg := GetConfig()
fmt.Println("log level:", cfg.LogLevel) // info
UpdateConfig(func(old *Config) *Config {
// 完整拷贝 + 修改,不修改 old
newFlags := make(map[string]bool, len(old.FeatureFlag))
for k, v := range old.FeatureFlag {
newFlags[k] = v
}
newFlags["new_ui"] = true
return &Config{
LogLevel: "debug",
MaxRetries: old.MaxRetries,
FeatureFlag: newFlags,
}
})
fmt.Println("new log level:", GetConfig().LogLevel) // debug
}
核心要点: 永远不修改已发布的对象——创建新对象替换旧指针。旧对象的读者不受影响,GC 会在没有引用时回收。
6.3 sync.Map vs RWMutex
sync.Map 的官方文档明确说了两个最佳场景:
- key 只写一次但读很多次(只增长的缓存)
- 多个 goroutine 读写不重叠的 key 集合(每个 goroutine 管自己的一片 key)
除此之外,map + RWMutex 通常是更好的选择——它的行为更可预测,性能模型更容易推理。
性能参考: [需验证] 在读占比 95% 的场景下,sync.Map 比 RWMutex 快约 2-3 倍;在读写 50/50 的场景下,RWMutex 通常更快。
6.4 Sharding 分片模式
当单个 Mutex/RWMutex 成为瓶颈时(数万 goroutine 竞争同一把锁),按 key hash 分片到多个独立的 map + mutex 上:
package main
import (
"fmt"
"hash/fnv"
"sync"
)
const numShards = 32
type ShardedMap[V any] struct {
shards [numShards]struct {
mu sync.RWMutex
m map[string]V
}
}
func NewShardedMap[V any]() *ShardedMap[V] {
sm := &ShardedMap[V]{}
for i := range sm.shards {
sm.shards[i].m = make(map[string]V)
}
return sm
}
func (sm *ShardedMap[V]) shard(key string) uint32 {
h := fnv.New32a()
h.Write([]byte(key))
return h.Sum32() % numShards
}
func (sm *ShardedMap[V]) Set(key string, val V) {
s := &sm.shards[sm.shard(key)]
s.mu.Lock()
s.m[key] = val
s.mu.Unlock()
}
func (sm *ShardedMap[V]) Get(key string) (V, bool) {
s := &sm.shards[sm.shard(key)]
s.mu.RLock()
val, ok := s.m[key]
s.mu.RUnlock()
return val, ok
}
func main() {
m := NewShardedMap[int]()
m.Set("foo", 42)
v, ok := m.Get("foo")
fmt.Println(v, ok) // 42 true
}
分片数量经验值: CPU 核心数 × 2 到 × 4。分片太少减少不了竞争,分片太多浪费内存。
7. Race Condition 检测与预防
7.1 -race 标志
Go 的 race detector 基于 ThreadSanitizer v2(LLVM 项目的一部分),在编译期对每个内存读写插桩,运行期跟踪每个 goroutine 最后一次访问每个内存地址的信息。
# 编译并运行带 race 检测的程序
go run -race main.go
# 测试时启用
go test -race ./...
# 构建带 race 检测的二进制(可用于预发布环境)
go build -race -o myapp-race ./cmd/myapp
性能开销:
- 执行时间:2-20x(典型 5-10x)
- 内存占用:5-10x
- 没有误报(false positive):检测到的一定是真实的 data race
- 有漏报(false negative):只能检测实际执行路径中发生的 race
7.2 CI 集成
# GitHub Actions 示例
- name: Test with race detector
run: go test -race -count=1 ./...
env:
GOFLAGS: "-race"
关键: -race 应该在 CI 中每次提交都跑,而不是偶尔跑一次。漏报意味着你需要足够的测试覆盖率才能触发 race。
7.3 常见 Race 模式
循环变量捕获(Go 1.22 之前)
// Go 1.21 及之前——经典 bug
for _, v := range values {
go func() {
fmt.Println(v) // 所有 goroutine 都打印最后一个 v
}()
}
// 修复方式 1:参数传递
for _, v := range values {
go func(v string) {
fmt.Println(v)
}(v)
}
// 修复方式 2:局部变量
for _, v := range values {
v := v
go func() {
fmt.Println(v)
}()
}
Go 1.22+ 已修复: 每次循环迭代创建新的变量。但前提是模块的 go.mod 中声明了 go 1.22 或更高版本。
Map 并发读写
m := make(map[string]int)
// 这会 panic:"concurrent map read and map write"
go func() { m["key"] = 1 }()
go func() { _ = m["key"] }()
Go runtime 会在检测到 map 并发读写时直接 throw(不是 panic,无法 recover)。解决方案:sync.Mutex、sync.RWMutex、sync.Map。
Slice 并发 append
var s []int
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
s = append(s, n) // DATA RACE: slice header 和底层数组都不安全
}(i)
}
wg.Wait()
// len(s) 可能远小于 100
7.4 go vet 静态检查
go vet 在编译期能发现部分并发问题:
| 检查项 | 说明 |
|---|---|
copylocks | 检测包含 Mutex 等锁类型的值被拷贝(应传指针) |
loopclosure | Go 1.22 之前检测循环变量在闭包中被捕获 |
atomic | 检测 atomic.AddInt64(&x, 1) 但 x 没有对齐到 64 位边界(32 位平台问题) |
go vet ./...
8. Graceful Shutdown 模式
8.1 核心流程
信号到达 → 标记 not ready → 停止接收新请求 → 等待在途请求完成
→ 停止后台 worker → 关闭资源(DB、MQ 连接) → 退出
8.2 完整生产示例:HTTP Server + 后台 Worker
package main
import (
"context"
"errors"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
func main() {
// ========== 1. 信号处理 ==========
// signal.NotifyContext 是 Go 1.16+ 推荐的方式
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
// ========== 2. 共享状态 ==========
var (
ready = true
readyMu sync.RWMutex
)
setReady := func(r bool) {
readyMu.Lock()
ready = r
readyMu.Unlock()
}
isReady := func() bool {
readyMu.RLock()
defer readyMu.RUnlock()
return ready
}
// ========== 3. HTTP Server ==========
mux := http.NewServeMux()
// 健康检查——k8s readinessProbe 用这个
mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, r *http.Request) {
if isReady() {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, "ok")
} else {
w.WriteHeader(http.StatusServiceUnavailable)
fmt.Fprintln(w, "shutting down")
}
})
// 业务接口
mux.HandleFunc("GET /api/data", func(w http.ResponseWriter, r *http.Request) {
// 模拟耗时操作
select {
case <-time.After(2 * time.Second):
fmt.Fprintln(w, `{"status":"ok"}`)
case <-r.Context().Done():
// 客户端断开或 server 关闭
return
}
})
server := &http.Server{
Addr: ":8080",
Handler: mux,
}
// ========== 4. 后台 Worker ==========
var workerWg sync.WaitGroup
workerWg.Add(1)
go func() {
defer workerWg.Done()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Println("background worker: shutting down")
return
case <-ticker.C:
log.Println("background worker: tick")
}
}
}()
// ========== 5. 启动 HTTP Server ==========
go func() {
log.Printf("server listening on %s", server.Addr)
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("server error: %v", err)
}
}()
// ========== 6. 等待信号 ==========
<-ctx.Done()
log.Println("shutdown signal received")
// ========== 7. 关闭序列(有顺序!) ==========
// 7a. 标记 not ready,让 LB/k8s 摘流量
setReady(false)
// 给 LB 几秒钟反应时间(k8s 的 endpoint 更新有延迟)
log.Println("waiting for load balancer to drain...")
time.Sleep(3 * time.Second)
// 7b. 关闭 HTTP server(拒绝新连接,等待在途请求)
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
defer shutdownCancel()
if err := server.Shutdown(shutdownCtx); err != nil {
log.Printf("HTTP server shutdown error: %v", err)
} else {
log.Println("HTTP server shutdown complete")
}
// 7c. 等待后台 worker 结束
// ctx 已经被取消,worker 会收到信号退出
workerDone := make(chan struct{})
go func() {
workerWg.Wait()
close(workerDone)
}()
select {
case <-workerDone:
log.Println("all workers stopped")
case <-time.After(10 * time.Second):
log.Println("WARNING: workers did not stop in time")
}
// 7d. 关闭其他资源(DB、MQ 等)
// db.Close()
// mqConn.Close()
log.Println("shutdown complete")
os.Exit(0)
}
关闭顺序铁律:
HTTP server(入口)→ 后台 worker → 数据库/消息队列(底层资源)
先关入口,后关依赖。反过来会导致在途请求访问已关闭的 DB 连接。
Kubernetes 相关:
terminationGracePeriodSeconds默认 30s,你的整个关闭流程(包括 LB drain + server shutdown + worker drain)必须在这个时间内完成- 留 20% 安全余量:如果 k8s 给 30s,你的 shutdown timeout 设 25s
9. 常见反模式
反模式 1:Goroutine 泄漏——启动了没法停
// ❌ 错误:goroutine 永远阻塞在 channel 读上
func leak() {
ch := make(chan int)
go func() {
val := <-ch // 永远等不到发送者
fmt.Println(val)
}()
// ch 没有被发送也没有被关闭,goroutine 永远不会退出
}
// ✅ 修复:通过 context 提供退出通道
func noLeak(ctx context.Context) {
ch := make(chan int)
go func() {
select {
case val := <-ch:
fmt.Println(val)
case <-ctx.Done():
return
}
}()
}
反模式 2:用 context.Value 做依赖注入
// ❌ 错误:把 DB 连接塞进 context
ctx = context.WithValue(ctx, "db", dbConn)
func handler(ctx context.Context) {
db := ctx.Value("db").(*sql.DB) // 类型不安全,隐式依赖
}
// ✅ 修复:显式参数
func handler(ctx context.Context, db *sql.DB) {
// 函数签名明确告诉你它需要什么
}
反模式 3:在 init() 中启动 goroutine
// ❌ 错误:init 里启动的 goroutine 没有生命周期管理
func init() {
go backgroundSync() // 谁来关它?怎么等它结束?测试怎么办?
}
// ✅ 修复:显式启动 + 生命周期管理
type App struct {
cancel context.CancelFunc
wg sync.WaitGroup
}
func (a *App) Start(ctx context.Context) {
ctx, a.cancel = context.WithCancel(ctx)
a.wg.Add(1)
go func() {
defer a.wg.Done()
backgroundSync(ctx)
}()
}
func (a *App) Stop() {
a.cancel()
a.wg.Wait()
}
反模式 4:无界 goroutine 创建(无背压)
// ❌ 错误:每个请求一个 goroutine,无上限
for _, req := range requests { // 100万个请求 → 100万个 goroutine → OOM
go handleRequest(req)
}
// ✅ 修复:errgroup.SetLimit 或 worker pool
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(100) // 最多 100 个并发
for _, req := range requests {
req := req
g.Go(func() error {
return handleRequest(ctx, req)
})
}
g.Wait()
反模式 5:持锁做 I/O
// ❌ 错误:持有锁的同时做网络调用——所有其他 goroutine 都在等这个网络请求
mu.Lock()
resp, err := http.Get(url) // 可能几百毫秒甚至几秒
data[key] = parseResp(resp)
mu.Unlock()
// ✅ 修复:先做 I/O,再加锁写入
resp, err := http.Get(url) // 无锁
if err != nil { return err }
result := parseResp(resp)
mu.Lock()
data[key] = result // 加锁时间极短
mu.Unlock()
反模式 6:channel of channels(过度复杂化)
// ❌ 过度设计:请求-响应模式用 channel of channels
type Request struct {
Data string
RespChan chan Response
}
reqCh := make(chan Request)
// 用起来很痛苦,错误处理更痛苦
// ✅ 更简单的方案:直接用函数调用 + mutex,或者用 errgroup
Channel of channels 极少是最佳方案。除非你在实现 actor 模型或明确需要请求-响应的消息传递语义,否则直接调用函数更清晰。
反模式 7:用 time.Sleep 做同步
// ❌ 错误:假设"睡一会儿另一个 goroutine 就执行完了"
go populateCache()
time.Sleep(time.Second) // 在你的笔记本上够了,在 CI 的 1 核容器里不够
useCache()
// ✅ 修复:用 channel、WaitGroup 或 sync.Once 做显式同步
var once sync.Once
var cache map[string]string
func getCache() map[string]string {
once.Do(func() {
cache = loadFromDB()
})
return cache
}
反模式 8:只处理 errgroup 的第一个 error
// errgroup.Wait() 只返回第一个 error
// 如果你需要所有 error,必须自己收集
// ✅ 收集所有 error 的模式
var (
mu sync.Mutex
errs []error
)
g, ctx := errgroup.WithContext(ctx)
for _, task := range tasks {
task := task
g.Go(func() error {
if err := process(ctx, task); err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
return err // 仍然返回 error 以触发 context 取消
}
return nil
})
}
_ = g.Wait() // 忽略第一个 error,看 errs 列表
fmt.Println("all errors:", errors.Join(errs...))
反模式 9:容器中不设 GOMAXPROCS(Go 1.25 之前)
Go 1.25 之前,GOMAXPROCS 默认读取宿主机的 CPU 核心数,而非容器的 cgroup 限制。一个只分配了 2 CPU 的容器在 64 核机器上会启动 64 个 P,导致大量上下文切换和 CPU throttling。
// Go < 1.25 的修复方案
import _ "go.uber.org/automaxprocs" // 自动检测 cgroup 限制
// Go 1.25+ 已原生支持,无需第三方库
反模式 10:select 中忘记处理 ctx.Done()
// ❌ 错误:只监听数据 channel,不监听取消信号
for msg := range ch {
process(msg) // context 取消了也停不下来
}
// ✅ 修复:每个 select 都要有 ctx.Done() 分支
for {
select {
case <-ctx.Done():
return ctx.Err()
case msg, ok := <-ch:
if !ok {
return nil
}
if err := process(ctx, msg); err != nil {
return err
}
}
}
10. 性能模式
10.1 Sharding 分片
见 6.4 节的 ShardedMap 实现。核心思想:把一把锁的竞争分散到 N 把锁上。
适用场景: Benchmark 显示单个 Mutex 是瓶颈,且 key 分布均匀。 不适用: 热点 key(所有请求打同一个 shard),此时应考虑应用层合并请求或本地缓存。
10.2 Lock-free:atomic 操作
// 计数器——最常见的 lock-free 场景
var counter atomic.Int64
// 多个 goroutine 并发递增
counter.Add(1)
// 读取当前值
val := counter.Load()
什么时候用 atomic vs mutex:
- atomic 适合: 单个值的简单操作(递增、加载、存储、CAS)
- mutex 适合: 多个值需要原子性更新(“要么都更新,要么都不更新”)
// ❌ 两个 atomic 操作不构成原子事务
atomic.StoreInt64(&balance, newBalance)
atomic.StoreInt64(&lastUpdated, time.Now().Unix())
// 其他 goroutine 可能看到新的 balance 但旧的 lastUpdated
// ✅ 需要事务一致性时用 mutex
mu.Lock()
balance = newBalance
lastUpdated = time.Now().Unix()
mu.Unlock()
10.3 Batch Processing 批处理
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Batcher 收集请求,批量处理
type Batcher[T any] struct {
maxBatch int
maxWait time.Duration
process func([]T) error
items []T
mu sync.Mutex
timer *time.Timer
ctx context.Context
cancel context.CancelFunc
}
func NewBatcher[T any](ctx context.Context, maxBatch int, maxWait time.Duration, process func([]T) error) *Batcher[T] {
ctx, cancel := context.WithCancel(ctx)
return &Batcher[T]{
maxBatch: maxBatch,
maxWait: maxWait,
process: process,
ctx: ctx,
cancel: cancel,
}
}
func (b *Batcher[T]) Add(item T) {
b.mu.Lock()
defer b.mu.Unlock()
b.items = append(b.items, item)
if len(b.items) >= b.maxBatch {
b.flushLocked()
return
}
if b.timer == nil {
b.timer = time.AfterFunc(b.maxWait, func() {
b.mu.Lock()
defer b.mu.Unlock()
b.flushLocked()
})
}
}
func (b *Batcher[T]) flushLocked() {
if len(b.items) == 0 {
return
}
if b.timer != nil {
b.timer.Stop()
b.timer = nil
}
batch := b.items
b.items = nil
// 异步处理以避免阻塞调用者
go func() {
if err := b.process(batch); err != nil {
fmt.Printf("batch process error: %v\n", err)
}
}()
}
func (b *Batcher[T]) Close() {
b.mu.Lock()
b.flushLocked()
b.mu.Unlock()
b.cancel()
}
func main() {
ctx := context.Background()
batcher := NewBatcher(ctx, 5, 100*time.Millisecond, func(items []int) error {
fmt.Printf("processing batch of %d items: %v\n", len(items), items)
return nil
})
for i := 0; i < 12; i++ {
batcher.Add(i)
time.Sleep(20 * time.Millisecond)
}
time.Sleep(200 * time.Millisecond) // 等待最后一批
batcher.Close()
}
10.4 sync.Once:一次性初始化
var (
instance *ExpensiveResource
once sync.Once
)
func GetResource() *ExpensiveResource {
once.Do(func() {
// 只执行一次,即使有 1000 个 goroutine 同时调用
instance = loadExpensiveResource()
})
return instance
}
sync.Once 的保证:
- 只执行一次(即使并发调用)
- 所有调用者都会等到
Do执行完毕才返回(happens-before 语义) - 如果
Do里 panic 了,Once仍然被标记为已完成——后续调用不会重试
Go 1.21+ 新增 sync.OnceFunc、sync.OnceValue、sync.OnceValues:
// 更简洁的写法
getConfig := sync.OnceValue(func() *Config {
return loadConfig()
})
cfg := getConfig() // 第一次调用时执行 loadConfig,后续直接返回缓存值
11. 测试并发代码
11.1 用 -race + -count 抓 flaky test
# 跑 1000 次,race detector 开启
go test -race -count=1000 -timeout=30m ./pkg/cache/...
# 如果偶发失败,说明有 race condition
原理: race condition 的触发取决于调度时序。多次运行增加触发概率。
11.2 GOMAXPROCS 测试
# 单核——暴露依赖并行性的 bug
GOMAXPROCS=1 go test -race ./...
# 多核——暴露真正的并发 race
GOMAXPROCS=8 go test -race ./...
11.3 t.Parallel() 正确使用
func TestConcurrentAccess(t *testing.T) {
tests := []struct {
name string
input int
want int
}{
{"case1", 1, 2},
{"case2", 2, 4},
{"case3", 3, 6},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel() // 标记为可并行执行
// Go 1.22+ 不需要 tt := tt
got := double(tt.input)
if got != tt.want {
t.Errorf("double(%d) = %d, want %d", tt.input, got, tt.want)
}
})
}
}
func double(n int) int { return n * 2 }
注意事项:
t.Parallel()让子测试并行执行,但外层TestXxx函数会在所有并行子测试完成后才返回- 并行子测试共享的外部状态必须是并发安全的
- 在 Go 1.22 之前,parallel table-driven test 必须
tt := tt捕获循环变量
11.4 确定性测试:注入同步点
// 通过 channel 控制 goroutine 执行顺序,让并发测试变得可预测
func TestProducerConsumer(t *testing.T) {
produced := make(chan struct{})
consumed := make(chan struct{})
var value int
// producer
go func() {
value = 42
close(produced) // 通知:已生产
<-consumed // 等待:已消费
}()
// consumer
<-produced // 等待:已生产
if value != 42 {
t.Errorf("expected 42, got %d", value)
}
close(consumed) // 通知:已消费
}
12. Production Checklist
□ 每个 go 关键字都有对应的生命周期管理(WaitGroup/errgroup/context)
□ 每个 context.WithCancel/WithTimeout 返回的 cancel 都被 defer 调用
□ 并发 goroutine 有上限(errgroup.SetLimit / worker pool / semaphore)
□ 所有 select 语句包含 ctx.Done() 分支(除非确认不需要取消)
□ CI 中 go test -race ./... 每次提交都跑
□ go vet ./... 作为 CI 必检项
□ HTTP server 实现了 graceful shutdown(signal → Shutdown(ctx))
□ 关闭顺序正确:入口 → worker → 底层资源
□ 容器部署:Go < 1.25 使用 automaxprocs;Go 1.25+ 已内置
□ 不在持锁时做 I/O
□ 不用 time.Sleep 做同步
□ 不用 context.Value 传递函数依赖
□ 不在 init() 中启动 goroutine
□ map 的并发访问都有保护(mutex / sync.Map / sharding)