跳转到正文
zeno's blog
返回

Go Redis:go-redis/v9 的连接池、Pipeline 与 Hook

Table of contents

Open Table of contents

TL;DR

github.com/redis/go-redis/v9 是 Redis 官方维护的 Go 客户端(截止 2026-04 最新 v9.18.0),提供类型安全的命令接口、channel-based 连接池、Pipeline/Transaction 批处理、Pub/Sub、Lua 脚本、Hook 中间件链、以及对 Standalone/Cluster/Sentinel/Ring 四种部署模式的统一抽象。掌握它的连接池调优、Pipeline 正确使用、redis.Nil 错误处理和 Context 超时行为是生产可靠性的关键。


1. 它是什么,解决什么问题

为什么需要 Go Redis 客户端库

Redis 使用 RESP (REdis Serialization Protocol) 协议通信。理论上可以直接用 net.Conn 拼 RESP 报文,但实际需要:

历史演变

阶段import path说明
v1-v7github.com/go-redis/redis社区维护
v8github.com/go-redis/redis/v8引入 context.Context 作为所有命令的第一参数
v9github.com/redis/go-redis/v9迁移到 Redis 官方 org,RESP3 支持,默认 Protocol 3

v8 -> v9 的 import path 变化(go-redis/redis -> redis/go-redis)反映了项目从社区库升级为官方客户端的定位变化。


2. 核心架构

2.1 命令执行流水线

一个 rdb.Get(ctx, "key") 调用经历以下路径:

用户调用 Get(ctx, "key")
  → cmdable 接口方法,构造 StringCmd{args: ["GET", "key"]}
    → Process(ctx, cmd) — 进入 Hook 中间件链
      → ProcessHook chain (tracing/metrics/logging hooks)
        → baseClient.process(ctx, cmd) — 重试循环(MaxRetries 次)
          → _process(ctx, cmd, attempt)
            → getConn(ctx) — 从连接池获取连接
              → Limiter.Allow() (如有限流器)
              → connPool.Get(ctx) — channel-based semaphore 获取 turn
                → popIdle() 取空闲连接 或 newConn() 创建新连接
                → initConn() 首次初始化(HELLO/AUTH/SELECT/CLIENT SETNAME)
            → cn.WithWriter() — 序列化命令到 RESP 协议写入连接
            → cn.WithReader() — 从连接读取响应,反序列化到 Cmd.val
            → releaseConn(ctx, cn, err) — 归还连接或标记移除
              → isBadConn() 判断连接是否可用
              → connPool.Put() 或 connPool.Remove()
  → cmd.Result() — 返回 (val, err)

2.2 Cmder 接口与类型安全命令

所有 Redis 命令的响应都被封装为实现 Cmder 接口的具体类型:

// Cmder 是所有命令的基础接口
type Cmder interface {
    Name() string           // 命令名,如 "get"
    FullName() string       // 完整名,如 "cluster info"
    Args() []interface{}    // 所有参数
    String() string         // 格式化的请求+响应
    Err() error             // 响应错误
    // ...
}

常用的具体命令类型(每种对应 Redis 不同返回值类型):

命令类型返回方法对应 Redis 返回示例命令
*StatusCmd.Val() stringSimple StringSET, OK
*StringCmd.Val() string / .Bytes() / .Int() / .Float64()Bulk StringGET, HGET
*IntCmd.Val() int64IntegerINCR, DEL, SCARD
*BoolCmd.Val() boolInteger (0/1)SISMEMBER, EXPIRE
*FloatCmd.Val() float64Bulk String (parsed)INCRBYFLOAT
*SliceCmd.Val() []interface{}ArrayMGET (heterogeneous)
*StringSliceCmd.Val() []stringArray of Bulk StringKEYS, SMEMBERS
*MapStringStringCmd.Val() map[string]stringArray (pairs)HGETALL
*DurationCmd.Val() time.DurationInteger (ms/s)TTL, PTTL
*BoolSliceCmd.Val() []boolArray of IntegerSCRIPT EXISTS
*ZSliceCmd.Val() []ZArray (score+member)ZRANGEWITHSCORES
*ScanCmd.Val() ([]string, uint64)Array + cursorSCAN, HSCAN
*Cmd.Val() interface{}Any (通用)DO

每种 Cmd 都提供两种取值方式:

// 方式 1: Result() 返回 (val, err)
val, err := rdb.Get(ctx, "key").Result()

// 方式 2: 分开取
cmd := rdb.Get(ctx, "key")
err := cmd.Err()
val := cmd.Val()  // err != nil 时返回零值

2.3 Context 支持

v9 所有命令的第一个参数都是 context.Context。但有一个关键配置:

// Options.ContextTimeoutEnabled 控制 context deadline 是否应用到网络 I/O
// 默认 false — context timeout 不影响读写 deadline
// 设为 true — context 的 deadline 会传递给 net.Conn.SetReadDeadline/SetWriteDeadline
ContextTimeoutEnabled bool

默认行为:即使 context 被 cancel,go-redis 内部的 WithWriter/WithReader 使用的是 ReadTimeout/WriteTimeout,不会提前中断网络 I/O。这是有意为之的设计 — 见后文 Pitfalls 节。


3. 客户端类型体系

3.1 五种客户端

UniversalClient (interface)
├── *Client           — 单节点(Standalone)
├── *ClusterClient    — Redis Cluster(自动 slot 路由)
├── *Ring             — 客户端侧分片(Rendezvous Hash)
└── *FailoverClient   — Sentinel HA(通过 NewFailoverClient 创建,底层是 *Client)
    └── *FailoverClusterClient — Sentinel + 读写分离

3.2 选型决策

部署模式客户端构造函数适用场景
单节点*ClientNewClient(&Options{})开发、小规模生产
Redis Cluster*ClusterClientNewClusterClient(&ClusterOptions{})水平扩展,自动 slot 路由
Sentinel HA*FailoverClientNewFailoverClient(&FailoverOptions{})高可用,自动 master 故障转移
Sentinel + 读写分离*FailoverClusterClientNewFailoverClusterClient(&FailoverOptions{RouteByLatency: true})HA + 读扩展
客户端分片*RingNewRing(&RingOptions{Addrs: map})不用 Cluster 但需分散负载
单连接*Connclient.Conn()需要绑定连接的场景(如 CLIENT SETNAME + 后续操作)
抽象接口UniversalClientNewUniversalClient(&UniversalOptions{})代码不关心后端部署形态

3.3 UniversalClient 自动选择逻辑

func NewUniversalClient(opts *UniversalOptions) UniversalClient {
    switch {
    case opts.MasterName != "" && (opts.RouteByLatency || opts.RouteRandomly):
        return NewFailoverClusterClient(opts.Failover())  // Sentinel + 读写分离
    case opts.MasterName != "":
        return NewFailoverClient(opts.Failover())          // Sentinel
    case len(opts.Addrs) > 1 || opts.IsClusterMode:
        return NewClusterClient(opts.Cluster())            // Cluster
    default:
        return NewClient(opts.Simple())                    // 单节点
    }
}

生产建议:业务代码用 UniversalClient 接口编程,通过配置决定后端类型。这样同一份代码可以在本地单节点和生产集群之间切换。


4. 连接池深度解析

4.1 核心参数

go-redis 的连接池基于 channel-based semaphoreFastSemaphore),用 buffered channel 作为令牌桶控制并发。

rdb := redis.NewClient(&redis.Options{
    Addr: "localhost:6379",

    // --- 池大小 ---
    PoolSize:       10 * runtime.GOMAXPROCS(0), // 默认值:CPU 核数 * 10
    MinIdleConns:   5,                          // 最小空闲连接数(后台预创建)
    MaxIdleConns:   0,                          // 最大空闲连接数(0=不限制)
    MaxActiveConns: 0,                          // 最大活跃连接数(0=不限制,超出 PoolSize 的连接不受池管理)

    // --- 超时 ---
    PoolTimeout:     4 * time.Second,           // 默认 ReadTimeout + 1s,等待空闲连接的超时
    ConnMaxIdleTime: 30 * time.Minute,          // 默认 30min,连接最大空闲时间
    ConnMaxLifetime: 0,                         // 默认 0(不限制),连接最大存活时间
    ConnMaxLifetimeJitter: 0,                   // 默认 0,存活时间抖动(防雷群效应)

    // --- 池行为 ---
    PoolFIFO: false,                            // 默认 LIFO,true 改为 FIFO(有助于回收空闲连接)
})
参数默认值说明
PoolSize10 * GOMAXPROCSsemaphore 容量,控制并发获取连接的 goroutine 数
MinIdleConns0>0 时后台预创建连接,降低首次请求延迟
MaxIdleConns0空闲连接上限,0 表示不限制
MaxActiveConns0总连接上限(含使用中),0 表示无硬限制
PoolTimeoutReadTimeout + 1s在 semaphore 上等待 token 的超时
ConnMaxIdleTime30min超过此时间的空闲连接在取出时被惰性关闭
ConnMaxLifetime0 (永不过期)连接最大存活时间,防止连接泄漏或代理断连
ConnMaxLifetimeJitter0在 lifetime 基础上加随机抖动,防止所有连接同时过期
PoolFIFOfalse (LIFO)LIFO 复用最近归还的连接(热连接),FIFO 有助于均匀淘汰

4.2 连接生命周期

                ┌─────────┐
                │ CREATED │ ← dialConn() 创建 TCP 连接
                └────┬────┘
                     │ initConn() 触发

             ┌───────────────┐
             │ INITIALIZING  │ ← HELLO/AUTH/SELECT/CLIENT SETNAME
             └───────┬───────┘
                     │ 成功

              ┌──────────┐ ←──────────────────── Put() 归还
              │   IDLE   │                        │
              └────┬─────┘                        │
    TryAcquire()│  popIdle()                      │
                ▼                                 │
             ┌────────┐ ── 命令执行完毕 ──────────┘
             │ IN_USE │
             └────┬───┘
                  │ handoff/reauth

            ┌───────────┐
            │ UNUSABLE  │ → 重新初始化 → IDLE 或 CLOSED
            └───────────┘

健康检查:连接从池中取出时进行惰性检查:

  1. 检查 ConnMaxIdleTime — 空闲太久则关闭
  2. 检查 ConnMaxLifetime — 超出存活时间则关闭
  3. Pool Hook 验证(如 maintenance notifications)

4.3 内部 Semaphore 机制

连接池不是用 sync.Pool,而是自研的 FastSemaphore

// 基于 buffered channel 的令牌桶
type FastSemaphore struct {
    tokens chan struct{}  // 预填充 PoolSize 个 token
    max    int32
}

4.4 PoolStats 监控

stats := rdb.PoolStats()

// 关键指标
stats.Hits       // 从池中命中空闲连接的次数
stats.Misses     // 池中无空闲连接需要新建的次数
stats.Timeouts   // 等待连接超时次数 — 这个飙升说明池太小
stats.TotalConns // 当前池中总连接数
stats.IdleConns  // 当前空闲连接数
stats.StaleConns // 因过期被清理的连接数

生产告警规则Timeouts 持续 > 0 说明连接池不够用,需增大 PoolSize 或排查慢查询。


5. Pipeline 与 Transaction

5.1 Pipeline — 批量无事务

Pipeline 将多条命令打包成一次网络往返,但不保证原子性

// github.com/redis/go-redis/v9 v9.18.0
pipe := rdb.Pipeline()
incr := pipe.Incr(ctx, "counter")
expire := pipe.Expire(ctx, "counter", time.Hour)
get := pipe.Get(ctx, "other-key")

cmds, err := pipe.Exec(ctx) // 一次 RTT 发送 3 条命令
// err 是第一个失败命令的错误

// 每个命令的结果独立读取
fmt.Println(incr.Val())   // 命令各自有结果
fmt.Println(get.Val())
fmt.Println(get.Err())    // 可能是 redis.Nil

便捷写法:

cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
    pipe.Incr(ctx, "counter")
    pipe.Expire(ctx, "counter", time.Hour)
    return nil
})

5.2 TxPipeline — MULTI/EXEC 事务

TxPipeline 在 Pipeline 外层包装 MULTI/EXEC,保证命令原子执行:

cmds, err := rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
    pipe.Incr(ctx, "counter")
    pipe.Expire(ctx, "counter", time.Hour)
    return nil
})
// 实际发送的是: MULTI → INCR → EXPIRE → EXEC

内部实现(源码 tx.go):

func wrapMultiExec(ctx context.Context, cmds []Cmder) []Cmder {
    cmdsCopy := make([]Cmder, len(cmds)+2)
    cmdsCopy[0] = NewStatusCmd(ctx, "multi")
    copy(cmdsCopy[1:], cmds)
    cmdsCopy[len(cmdsCopy)-1] = NewSliceCmd(ctx, "exec")
    return cmdsCopy
}

5.3 Watch + TxPipeline — 乐观锁

// 实现 WATCH + MULTI/EXEC 的 CAS 模式
err := rdb.Watch(ctx, func(tx *redis.Tx) error {
    val, err := tx.Get(ctx, "key").Int64()
    if err != nil && err != redis.Nil {
        return err
    }

    // 业务逻辑
    newVal := val + 1

    // 如果 key 在 WATCH 之后被修改,EXEC 返回 TxFailedErr
    _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
        pipe.Set(ctx, "key", newVal, 0)
        return nil
    })
    return err
}, "key") // 监视的 key 列表

if err == redis.TxFailedErr {
    // key 被其他客户端修改,需重试
}

5.4 Pipeline 错误处理

核心区别Exec() 返回的 err 是第一个失败命令的错误,但其他命令可能已成功执行。必须逐一检查每个命令的错误:

cmds, err := pipe.Exec(ctx)
if err != nil && err != redis.Nil {
    // 有命令失败,但不能只看这个 err
}

for _, cmd := range cmds {
    if cmd.Err() != nil {
        // 逐一处理每个命令的错误
    }
}

6. Pub/Sub

6.1 API 概览

// 订阅
pubsub := rdb.Subscribe(ctx, "channel1", "channel2")
defer pubsub.Close()

// 模式订阅
pubsub := rdb.PSubscribe(ctx, "news:*")

// Shard 订阅(Redis Cluster 7.0+)
pubsub := rdb.SSubscribe(ctx, "shard-channel")

6.2 接收消息的三种方式

// 方式 1: Channel() — 推荐,返回 <-chan *Message
ch := pubsub.Channel(
    redis.WithChannelSize(100),                        // 缓冲区大小,默认 100
    redis.WithChannelHealthCheckInterval(3*time.Second), // 健康检查间隔,默认 3s
    redis.WithChannelSendTimeout(time.Minute),         // 发送超时,默认 60s
)
for msg := range ch {
    fmt.Println(msg.Channel, msg.Payload)
}

// 方式 2: ReceiveMessage() — 阻塞等待,忽略 Subscription/Pong
msg, err := pubsub.ReceiveMessage(ctx)

// 方式 3: Receive() — 低级 API,返回 Subscription/Message/Pong
iface, err := pubsub.Receive(ctx)
switch v := iface.(type) {
case *redis.Subscription:
    // 订阅确认
case *redis.Message:
    // 消息
case *redis.Pong:
    // 健康检查响应
}

6.3 自动重连机制

PubSub 在连接断开时自动重连并重新订阅所有 channel/pattern。内部维护三个 map 记录订阅状态:

type PubSub struct {
    channels  map[string]struct{}  // 普通频道
    patterns  map[string]struct{}  // 模式订阅
    schannels map[string]struct{}  // Shard 频道
    // ...
}

重连时调用 resubscribe() 逐一重新发送 SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE 命令。

6.4 ChannelWithSubscriptions

如果需要感知重连事件(区分首次订阅和重连后的重新订阅),使用 ChannelWithSubscriptions()

ch := pubsub.ChannelWithSubscriptions()
for iface := range ch {
    switch v := iface.(type) {
    case *redis.Subscription:
        // 每次 (重)订阅都会收到
    case *redis.Message:
        // 消息
    }
}

注意:Channel()ChannelWithSubscriptions() 互斥,同一个 PubSub 只能调用一个,否则 panic。


7. Lua 脚本

7.1 Script 对象

// NewScript 在客户端计算 SHA1
var incrByScript = redis.NewScript(`
    local current = redis.call("GET", KEYS[1])
    if current == false then current = 0 end
    current = tonumber(current) + tonumber(ARGV[1])
    redis.call("SET", KEYS[1], current)
    return current
`)

// Run() 先尝试 EVALSHA,NOSCRIPT 时自动 fallback 到 EVAL
result, err := incrByScript.Run(ctx, rdb, []string{"my-counter"}, 10).Int64()

7.2 执行流程

Script.Run() -> EvalSha() -> 如果返回 NOSCRIPT -> Eval()

NewScript() 在构造时就计算好 SHA1(纯 Go 计算,不需要 Redis):

func NewScript(src string) *Script {
    h := sha1.New()
    io.WriteString(h, src)
    return &Script{
        src:  src,
        hash: hex.EncodeToString(h.Sum(nil)),
    }
}

7.3 NewScriptServerSHA — 服务端哈希

v9.18+ 新增 NewScriptServerSHA(),让 Redis 服务端通过 SCRIPT LOAD 计算并返回 SHA,避免客户端计算与服务端不一致的极端情况:

var script = redis.NewScriptServerSHA(`return redis.call("GET", KEYS[1])`)
// 首次执行时自动 SCRIPT LOAD 获取服务端 SHA
result, err := script.Run(ctx, rdb, []string{"key"}).Result()

7.4 Cluster 模式下的 Lua 约束

Cluster 模式下所有 KEYS 必须在同一个 slot。使用 hash tag 确保:

// 正确:{user:1} 是 hash tag,两个 key 在同一 slot
script.Run(ctx, rdb, []string{"{user:1}:name", "{user:1}:age"}, ...)

// 错误:不同 slot 会报 CROSSSLOT 错误
script.Run(ctx, rdb, []string{"user:1:name", "user:2:age"}, ...)

8. Hook 中间件体系

8.1 Hook 接口

type Hook interface {
    DialHook(next DialHook) DialHook
    ProcessHook(next ProcessHook) ProcessHook
    ProcessPipelineHook(next ProcessPipelineHook) ProcessPipelineHook
}

type DialHook            func(ctx context.Context, network, addr string) (net.Conn, error)
type ProcessHook         func(ctx context.Context, cmd Cmder) error
type ProcessPipelineHook func(ctx context.Context, cmds []Cmder) error

Hook 是洋葱模型(FIFO 入栈,逆序执行):

AddHook(hook1, hook2)
执行顺序: hook1.before → hook2.before → Redis 命令 → hook2.after → hook1.after

8.2 自定义 Hook 示例

type loggingHook struct{}

func (h loggingHook) DialHook(next redis.DialHook) redis.DialHook {
    return func(ctx context.Context, network, addr string) (net.Conn, error) {
        log.Printf("dialing %s %s", network, addr)
        conn, err := next(ctx, network, addr)
        log.Printf("dial result: err=%v", err)
        return conn, err
    }
}

func (h loggingHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
    return func(ctx context.Context, cmd redis.Cmder) error {
        start := time.Now()
        err := next(ctx, cmd)  // *** 必须调用 next,否则命令不会执行 ***
        log.Printf("redis: %s %s (dur=%s, err=%v)",
            cmd.FullName(), cmd.String(), time.Since(start), err)
        return err
    }
}

func (h loggingHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
    return func(ctx context.Context, cmds []redis.Cmder) error {
        start := time.Now()
        err := next(ctx, cmds)
        log.Printf("redis: pipeline %d cmds (dur=%s)", len(cmds), time.Since(start))
        return err
    }
}

// 注册
rdb.AddHook(loggingHook{})

8.3 OpenTelemetry 集成

go-redis 提供两个 OTel 集成包:

extra/redisotel — 基于 Hook 的 tracing + metrics(传统方案):

import "github.com/redis/go-redis/extra/redisotel/v9"

// Tracing — 为每个 Redis 命令创建 span
if err := redisotel.InstrumentTracing(rdb); err != nil {
    panic(err)
}

// Metrics — 上报连接池指标
if err := redisotel.InstrumentMetrics(rdb); err != nil {
    panic(err)
}

extra/redisotel-native — v9.18+ 新增,基于 Bridge Pattern,核心库零依赖 OTel SDK,通过 SetOTelRecorder 注入:

// 核心库定义 OTelRecorder 接口
type OTelRecorder interface {
    RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, err error, cn ConnInfo, dbIndex int)
    RecordConnectionCreateTime(ctx context.Context, duration time.Duration, cn ConnInfo)
    RecordError(ctx context.Context, errorType string, cn ConnInfo, statusCode string, isInternal bool, retryAttempts int)
    // ... 更多指标方法
}

// redisotel-native 包实现该接口
redis.SetOTelRecorder(nativeRecorder)

Native 方案的优势:不依赖 Hook 机制,直接在命令执行内部埋点,开销更低且覆盖更多指标(如 per-retry duration、connection wait time、maintenance notification)。


9. 完整代码示例

9.1 基础 CRUD

// github.com/redis/go-redis/v9 v9.18.x, Go 1.21+
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

func main() {
    ctx := context.Background()

    rdb := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "",
        DB:       0,
    })
    defer rdb.Close()

    // SET with expiration
    err := rdb.Set(ctx, "name", "Alice", 10*time.Minute).Err()
    if err != nil {
        panic(err)
    }

    // GET — 注意 redis.Nil 检查
    val, err := rdb.Get(ctx, "name").Result()
    if errors.Is(err, redis.Nil) {
        fmt.Println("key does not exist")
    } else if err != nil {
        panic(err)
    } else {
        fmt.Println("name:", val) // name: Alice
    }

    // HSET / HGETALL
    rdb.HSet(ctx, "user:1", "name", "Bob", "age", 30)
    fields, _ := rdb.HGetAll(ctx, "user:1").Result()
    fmt.Println(fields) // map[age:30 name:Bob]

    // DELETE
    deleted, _ := rdb.Del(ctx, "name", "user:1").Result()
    fmt.Println("deleted:", deleted)
}

9.2 Pipeline 批量操作

func pipelineExample(ctx context.Context, rdb *redis.Client) error {
    cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
        for i := 0; i < 100; i++ {
            pipe.Set(ctx, fmt.Sprintf("key:%d", i), i, time.Hour)
        }
        return nil
    })
    if err != nil {
        return fmt.Errorf("pipeline exec: %w", err)
    }

    // 逐一检查每个命令的结果
    for _, cmd := range cmds {
        if cmd.Err() != nil {
            fmt.Printf("command %s failed: %v\n", cmd.FullName(), cmd.Err())
        }
    }
    return nil
}

9.3 Pub/Sub 消费者

func subscriber(ctx context.Context, rdb *redis.Client) {
    pubsub := rdb.Subscribe(ctx, "events")
    defer pubsub.Close()

    // 等待订阅确认
    _, err := pubsub.Receive(ctx)
    if err != nil {
        panic(err)
    }

    ch := pubsub.Channel()
    for msg := range ch {
        fmt.Printf("[%s] %s\n", msg.Channel, msg.Payload)
        // Channel() 在 PubSub.Close() 时自动关闭
    }
}

9.4 Lua 脚本

var rateLimit = redis.NewScript(`
    local key = KEYS[1]
    local limit = tonumber(ARGV[1])
    local window = tonumber(ARGV[2])

    local current = tonumber(redis.call("GET", key) or "0")
    if current >= limit then
        return 0
    end

    current = redis.call("INCR", key)
    if current == 1 then
        redis.call("EXPIRE", key, window)
    end
    return 1
`)

func checkRateLimit(ctx context.Context, rdb *redis.Client, userID string) (bool, error) {
    result, err := rateLimit.Run(ctx, rdb,
        []string{fmt.Sprintf("ratelimit:%s", userID)},
        100,  // limit
        60,   // window seconds
    ).Int64()
    if err != nil {
        return false, err
    }
    return result == 1, nil
}

9.5 Cluster Client

func clusterExample() {
    rdb := redis.NewClusterClient(&redis.ClusterOptions{
        Addrs: []string{
            "localhost:7000", "localhost:7001", "localhost:7002",
        },
        ReadOnly:       true,  // 允许从 replica 读
        RouteByLatency: true,  // 读请求路由到最低延迟节点

        // 每个节点的连接池配置
        PoolSize:     15,
        MinIdleConns: 5,

        MaxRedirects: 8,       // MOVED/ASK 最大重定向次数,默认 3
    })
    defer rdb.Close()

    ctx := context.Background()

    // 使用 hash tag 确保多 key 操作在同一 slot
    rdb.Set(ctx, "{user:1}:name", "Alice", 0)
    rdb.Set(ctx, "{user:1}:age", "30", 0)

    // Pipeline 自动按 slot 分组发送
    pipe := rdb.Pipeline()
    pipe.Get(ctx, "key-on-slot-1")
    pipe.Get(ctx, "key-on-slot-2")
    pipe.Exec(ctx) // 内部按 slot 分组,可能多次 RTT

    // 遍历所有分片
    rdb.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
        return client.Ping(ctx).Err()
    })
}

9.6 Universal Client

func newRedisClient(cfg Config) redis.UniversalClient {
    return redis.NewUniversalClient(&redis.UniversalOptions{
        Addrs:      cfg.Addrs,       // 单节点: ["localhost:6379"], Cluster: ["n1:7000", "n2:7001"]
        MasterName: cfg.MasterName,  // 非空则走 Sentinel
        DB:         cfg.DB,
        Password:   cfg.Password,

        PoolSize:        cfg.PoolSize,
        MinIdleConns:    cfg.MinIdleConns,
        ConnMaxIdleTime: cfg.ConnMaxIdleTime,

        MaxRetries: 3,
        DialTimeout:  5 * time.Second,
        ReadTimeout:  3 * time.Second,
        WriteTimeout: 3 * time.Second,
    })
}

10. 与其他 Go Redis 客户端对比

维度go-redis/v9redigo (gomodule)radix (mediocregopher)
维护方Redis 官方社区,低活跃社区
API 风格类型安全,每个命令返回具体 Cmd 类型Do("GET", key) 返回 interface{}类型安全但 API 不同
连接池内置,channel-based semaphore内置,Pool struct内置
Cluster 支持内置,自动 slot 路由无,需第三方库内置
Sentinel 支持内置需第三方库内置
Pipeline内置,类型安全手动 Send/Receive内置
Pub/Sub内置,自动重连内置,手动管理内置
Hook/中间件完整的 Hook 接口
Context全面支持有限支持支持
RESP3默认 Protocol 3仅 RESP2RESP2
OTel官方 redisotel 包无官方无官方
Go 最低版本1.21(v9.18+需要1.24)1.141.16
Stars (approx)~20k~10k~1k

选型结论:新项目没有理由不用 go-redis/v9。redigo 是上一代标准,适合存量项目维护。radix 在特定性能场景下有优势但生态薄。


11. Pitfalls(常见陷阱)

11.1 不检查 redis.Nil 导致逻辑错误

redis.Nil 不是 error,是正常的 “key 不存在”语义。但它满足 err != nil

// 错误写法:把 "key 不存在" 当成真正的错误
val, err := rdb.Get(ctx, "maybe-not-exist").Result()
if err != nil {
    return err // 会把 redis.Nil 也当错误返回
}

// 正确写法
val, err := rdb.Get(ctx, "maybe-not-exist").Result()
if errors.Is(err, redis.Nil) {
    // key 不存在,业务逻辑处理
    val = "default"
} else if err != nil {
    return fmt.Errorf("redis get: %w", err) // 真正的错误
}

11.2 Context cancellation 导致连接中毒(v9 重要行为)

默认情况下ContextTimeoutEnabled: false),context cancel 不会中断正在进行的网络 I/O。这意味着即使上层 ctx 被 cancel,命令仍然会正常执行完毕。

// ContextTimeoutEnabled: false (默认)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
rdb.BLPop(ctx, 5*time.Second, "queue") // context 超时不影响 BLPop,仍按 ReadTimeout 执行

如果设置 ContextTimeoutEnabled: true,context 的 deadline 会传递给 net.Conn.SetReadDeadline。此时 cancel context 会导致正在读写的连接收到 I/O 错误,go-redis 会将该连接标记为 bad connection 并从池中移除。这可以防止连接中毒(半写状态),但代价是连接被丢弃。

建议:大多数场景保持默认(false),依赖 ReadTimeout/WriteTimeout 控制超时。只在需要精确 per-request timeout 且能承受连接丢弃时才开启。

11.3 Pipeline 错误只看 Exec 返回值

// 危险:只检查 Exec 的错误
_, err := pipe.Exec(ctx)
if err != nil {
    // 这只是第一个失败命令的错误
    // 其他命令可能已经成功执行
}

// 正确:逐一检查
cmds, err := pipe.Exec(ctx)
for _, cmd := range cmds {
    if cmd.Err() != nil && !errors.Is(cmd.Err(), redis.Nil) {
        log.Printf("cmd %s failed: %v", cmd.FullName(), cmd.Err())
    }
}

11.4 连接池耗尽但无告警

PoolSize 默认是 10 * runtime.GOMAXPROCS(0),在 8 核机器上是 80。如果有慢查询(如 KEYS *)或阻塞命令(BLPOP),连接会被长时间占用,导致其他 goroutine 在 semaphore 上等待直到 PoolTimeout

应对方案

  1. 监控 PoolStats().Timeouts — 非零就告警
  2. 阻塞命令使用独立 Client(独立连接池),不与普通命令共享
  3. 设置合理的 MaxActiveConns 作为硬上限

11.5 Pub/Sub 连接不走连接池

PubSub 使用独立连接(通过 PubSubPool 管理),不占用主连接池。但 PubSub 连接有以下特性:

应对:如果不能丢消息,用 Redis Streams 替代 Pub/Sub。

11.6 Cluster 模式下的 CROSSSLOT 错误

在 Redis Cluster 中,多 key 操作(MGET、Pipeline 中的事务、Lua 脚本)要求所有 key 在同一个 slot。

// 错误:不同 slot
rdb.MGet(ctx, "user:1", "user:2")  // 可能 CROSSSLOT

// 正确:用 hash tag
rdb.MGet(ctx, "{user}:1", "{user}:2")  // 同一 slot

注意:ClusterClient.Pipeline() 的非事务 Pipeline 会自动按 slot 分组发送(多次 RTT),但 TxPipeline() 要求所有命令的 key 在同一 slot,否则直接报错。

11.7 大 Value 导致超时

Redis 单线程模型意味着一个大 value 的读写会阻塞所有其他命令。100MB 的 value 在千兆网络上传输需要 ~1 秒。

// 对大 value 操作,使用独立 client 并增大超时
bigClient := rdb.WithTimeout(30 * time.Second)
val, err := bigClient.Get(ctx, "huge-key").Bytes()

11.8 Sentinel 故障转移后连接到旧 master

Sentinel 客户端会订阅 Sentinel 的 +switch-master 事件自动切换。但 go-redis 的行为是:

这意味着故障转移后的第一批请求可能失败。确保 MaxRetries >= 1(默认 3)来自动重试。


12. 生产环境最佳实践

12.1 连接池调优 Checklist

rdb := redis.NewClient(&redis.Options{
    Addr: "redis.prod:6379",

    // 超时 — 三件套必配
    DialTimeout:  5 * time.Second,
    ReadTimeout:  3 * time.Second,
    WriteTimeout: 3 * time.Second,

    // 连接池 — 根据并发量调整
    PoolSize:        50,                // 估算:峰值并发 goroutine 数的 80%
    MinIdleConns:    10,                // 预热连接,降低冷启动延迟
    MaxActiveConns:  100,               // 硬上限,防止 Redis max clients
    PoolTimeout:     4 * time.Second,   // 略大于 ReadTimeout
    ConnMaxIdleTime: 5 * time.Minute,   // 小于 Redis 的 timeout 配置
    ConnMaxLifetime: 30 * time.Minute,  // 防止连接被中间代理断开
    ConnMaxLifetimeJitter: 5 * time.Minute, // 防止连接同时过期

    // 重试
    MaxRetries:      3,
    MinRetryBackoff: 8 * time.Millisecond,    // 指数退避下限
    MaxRetryBackoff: 512 * time.Millisecond,  // 指数退避上限
})

12.2 健康检查

// 启动时验证连接
if err := rdb.Ping(ctx).Err(); err != nil {
    log.Fatalf("redis unavailable: %v", err)
}

// 定期检查连接池状态
go func() {
    ticker := time.NewTicker(30 * time.Second)
    for range ticker.C {
        stats := rdb.PoolStats()
        metrics.Gauge("redis.pool.total_conns", float64(stats.TotalConns))
        metrics.Gauge("redis.pool.idle_conns", float64(stats.IdleConns))
        metrics.Counter("redis.pool.timeouts", float64(stats.Timeouts))
        metrics.Counter("redis.pool.hits", float64(stats.Hits))
        metrics.Counter("redis.pool.misses", float64(stats.Misses))
    }
}()

12.3 Graceful Shutdown

// Close 会等待所有正在执行的命令完成,然后关闭所有连接
// 源码注释:"It is rare to Close a Client, as the Client is meant to be
// long-lived and shared between many goroutines."
func shutdown(rdb *redis.Client) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // 先停止接收新请求(应用层面)
    // ...

    // 关闭 Redis client
    if err := rdb.Close(); err != nil {
        log.Printf("redis close error: %v", err)
    }
}

12.4 错误处理模式

func getUser(ctx context.Context, rdb redis.UniversalClient, id string) (*User, error) {
    val, err := rdb.Get(ctx, "user:"+id).Result()

    switch {
    case errors.Is(err, redis.Nil):
        return nil, nil // key 不存在,不是错误
    case err != nil:
        return nil, fmt.Errorf("redis get user %s: %w", id, err) // 真正的错误
    }

    var user User
    if err := json.Unmarshal([]byte(val), &user); err != nil {
        return nil, fmt.Errorf("unmarshal user %s: %w", id, err)
    }
    return &user, nil
}

12.5 Limiter 接口 — 熔断器/限流器

type Limiter interface {
    Allow() error                 // 是否允许执行
    ReportResult(result error)    // 报告执行结果
}

可以接入 sony/gobreaker 等熔断器库,在 Redis 不可用时快速失败:

rdb := redis.NewClient(&redis.Options{
    Addr:    "localhost:6379",
    Limiter: NewCircuitBreakerLimiter(), // 自定义实现
})

13. 使用 miniredis 做单元测试

github.com/alicebob/miniredis/v2 是一个纯 Go 实现的 Redis 服务器,运行在内存中,无需真实 Redis 实例。

// go get github.com/alicebob/miniredis/v2
package cache_test

import (
    "context"
    "testing"
    "time"

    "github.com/alicebob/miniredis/v2"
    "github.com/redis/go-redis/v9"
)

func TestCacheSet(t *testing.T) {
    // 启动 miniredis(无需真实 Redis)
    mr := miniredis.RunT(t) // t.Cleanup 自动关闭

    rdb := redis.NewClient(&redis.Options{
        Addr: mr.Addr(),
    })
    defer rdb.Close()

    ctx := context.Background()

    // 正常的 Redis 操作
    err := rdb.Set(ctx, "key", "value", time.Minute).Err()
    if err != nil {
        t.Fatal(err)
    }

    val, err := rdb.Get(ctx, "key").Result()
    if err != nil {
        t.Fatal(err)
    }
    if val != "value" {
        t.Fatalf("expected 'value', got %q", val)
    }

    // miniredis 支持时间快进
    mr.FastForward(2 * time.Minute)

    // key 已过期
    _, err = rdb.Get(ctx, "key").Result()
    if err != redis.Nil {
        t.Fatalf("expected redis.Nil, got %v", err)
    }
}

func TestWithLuaScript(t *testing.T) {
    mr := miniredis.RunT(t)
    rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
    defer rdb.Close()

    ctx := context.Background()

    script := redis.NewScript(`return redis.call("SET", KEYS[1], ARGV[1])`)
    err := script.Run(ctx, rdb, []string{"lua-key"}, "lua-value").Err()
    if err != nil {
        t.Fatal(err)
    }

    // 直接在 miniredis 上验证
    got, err := mr.Get("lua-key")
    if err != nil {
        t.Fatal(err)
    }
    if got != "lua-value" {
        t.Fatalf("expected 'lua-value', got %q", got)
    }
}

miniredis 的局限:

替代方案:集成测试用 Docker 真实 Redis + testcontainers-go。


14. v8 -> v9 迁移关键变化

变化v8v9
import pathgithub.com/go-redis/redis/v8github.com/redis/go-redis/v9
默认协议RESP2RESP3Protocol: 3
Hook 接口BeforeProcess/AfterProcess 回调洋葱模型 DialHook/ProcessHook/ProcessPipelineHook,必须显式调用 next
连接池参数IdleTimeout / MaxConnAge重命名为 ConnMaxIdleTime / ConnMaxLifetime(语义更明确)
MinIdleConns类型 int类型 int(内部转 int32)
新增 MaxActiveConns总连接硬上限
新增 PoolFIFO支持 FIFO 连接池策略
ContextTimeoutEnabled无(context timeout 总是生效)默认 false,需显式开启
Scan 相关分散在各类型统一的 Scanner 接口
错误类型字符串匹配支持 errors.Is/errors.As
PushNotificationRESP3 Push Notification 支持
Maintenance Notificationsv9.18+ 支持 Cluster 维护通知(SMIGRATING/SMIGRATED)

迁移步骤:

  1. 修改 import path
  2. 搜索替换 IdleTimeout -> ConnMaxIdleTimeMaxConnAge -> ConnMaxLifetime
  3. 重写 Hook 实现为新的洋葱模型
  4. 如果依赖 context timeout 控制命令超时,设置 ContextTimeoutEnabled: true
  5. 测试 RESP3 兼容性(某些代理可能不支持)

分享这篇文章:

上一篇
C++ STL:迭代器如何解耦算法与容器
下一篇
Go PostgreSQL:pgx 的架构设计与生产实践