Table of contents
Open Table of contents
TL;DR
github.com/redis/go-redis/v9 是 Redis 官方维护的 Go 客户端(截止 2026-04 最新 v9.18.0),提供类型安全的命令接口、channel-based 连接池、Pipeline/Transaction 批处理、Pub/Sub、Lua 脚本、Hook 中间件链、以及对 Standalone/Cluster/Sentinel/Ring 四种部署模式的统一抽象。掌握它的连接池调优、Pipeline 正确使用、redis.Nil 错误处理和 Context 超时行为是生产可靠性的关键。
1. 它是什么,解决什么问题
为什么需要 Go Redis 客户端库
Redis 使用 RESP (REdis Serialization Protocol) 协议通信。理论上可以直接用 net.Conn 拼 RESP 报文,但实际需要:
- 连接池 — Redis 是短连接模型效率极低(TCP 握手 + AUTH + SELECT),需要长连接复用
- 类型安全 —
GET返回 string,INCR返回 int64,HGETALL返回 map,需要编译期类型约束 - 重试与容错 — 网络抖动、Cluster MOVED/ASK 重定向、Sentinel 故障转移,客户端必须自动处理
- 批处理 — Pipeline 和 Transaction 的正确协议封装
- 可观测性 — 需要 Hook 点注入 tracing、metrics、logging
历史演变
| 阶段 | import path | 说明 |
|---|---|---|
| v1-v7 | github.com/go-redis/redis | 社区维护 |
| v8 | github.com/go-redis/redis/v8 | 引入 context.Context 作为所有命令的第一参数 |
| v9 | github.com/redis/go-redis/v9 | 迁移到 Redis 官方 org,RESP3 支持,默认 Protocol 3 |
v8 -> v9 的 import path 变化(go-redis/redis -> redis/go-redis)反映了项目从社区库升级为官方客户端的定位变化。
2. 核心架构
2.1 命令执行流水线
一个 rdb.Get(ctx, "key") 调用经历以下路径:
用户调用 Get(ctx, "key")
→ cmdable 接口方法,构造 StringCmd{args: ["GET", "key"]}
→ Process(ctx, cmd) — 进入 Hook 中间件链
→ ProcessHook chain (tracing/metrics/logging hooks)
→ baseClient.process(ctx, cmd) — 重试循环(MaxRetries 次)
→ _process(ctx, cmd, attempt)
→ getConn(ctx) — 从连接池获取连接
→ Limiter.Allow() (如有限流器)
→ connPool.Get(ctx) — channel-based semaphore 获取 turn
→ popIdle() 取空闲连接 或 newConn() 创建新连接
→ initConn() 首次初始化(HELLO/AUTH/SELECT/CLIENT SETNAME)
→ cn.WithWriter() — 序列化命令到 RESP 协议写入连接
→ cn.WithReader() — 从连接读取响应,反序列化到 Cmd.val
→ releaseConn(ctx, cn, err) — 归还连接或标记移除
→ isBadConn() 判断连接是否可用
→ connPool.Put() 或 connPool.Remove()
→ cmd.Result() — 返回 (val, err)
2.2 Cmder 接口与类型安全命令
所有 Redis 命令的响应都被封装为实现 Cmder 接口的具体类型:
// Cmder 是所有命令的基础接口
type Cmder interface {
Name() string // 命令名,如 "get"
FullName() string // 完整名,如 "cluster info"
Args() []interface{} // 所有参数
String() string // 格式化的请求+响应
Err() error // 响应错误
// ...
}
常用的具体命令类型(每种对应 Redis 不同返回值类型):
| 命令类型 | 返回方法 | 对应 Redis 返回 | 示例命令 |
|---|---|---|---|
*StatusCmd | .Val() string | Simple String | SET, OK |
*StringCmd | .Val() string / .Bytes() / .Int() / .Float64() | Bulk String | GET, HGET |
*IntCmd | .Val() int64 | Integer | INCR, DEL, SCARD |
*BoolCmd | .Val() bool | Integer (0/1) | SISMEMBER, EXPIRE |
*FloatCmd | .Val() float64 | Bulk String (parsed) | INCRBYFLOAT |
*SliceCmd | .Val() []interface{} | Array | MGET (heterogeneous) |
*StringSliceCmd | .Val() []string | Array of Bulk String | KEYS, SMEMBERS |
*MapStringStringCmd | .Val() map[string]string | Array (pairs) | HGETALL |
*DurationCmd | .Val() time.Duration | Integer (ms/s) | TTL, PTTL |
*BoolSliceCmd | .Val() []bool | Array of Integer | SCRIPT EXISTS |
*ZSliceCmd | .Val() []Z | Array (score+member) | ZRANGEWITHSCORES |
*ScanCmd | .Val() ([]string, uint64) | Array + cursor | SCAN, HSCAN |
*Cmd | .Val() interface{} | Any (通用) | DO |
每种 Cmd 都提供两种取值方式:
// 方式 1: Result() 返回 (val, err)
val, err := rdb.Get(ctx, "key").Result()
// 方式 2: 分开取
cmd := rdb.Get(ctx, "key")
err := cmd.Err()
val := cmd.Val() // err != nil 时返回零值
2.3 Context 支持
v9 所有命令的第一个参数都是 context.Context。但有一个关键配置:
// Options.ContextTimeoutEnabled 控制 context deadline 是否应用到网络 I/O
// 默认 false — context timeout 不影响读写 deadline
// 设为 true — context 的 deadline 会传递给 net.Conn.SetReadDeadline/SetWriteDeadline
ContextTimeoutEnabled bool
默认行为:即使 context 被 cancel,go-redis 内部的 WithWriter/WithReader 使用的是 ReadTimeout/WriteTimeout,不会提前中断网络 I/O。这是有意为之的设计 — 见后文 Pitfalls 节。
3. 客户端类型体系
3.1 五种客户端
UniversalClient (interface)
├── *Client — 单节点(Standalone)
├── *ClusterClient — Redis Cluster(自动 slot 路由)
├── *Ring — 客户端侧分片(Rendezvous Hash)
└── *FailoverClient — Sentinel HA(通过 NewFailoverClient 创建,底层是 *Client)
└── *FailoverClusterClient — Sentinel + 读写分离
3.2 选型决策
| 部署模式 | 客户端 | 构造函数 | 适用场景 |
|---|---|---|---|
| 单节点 | *Client | NewClient(&Options{}) | 开发、小规模生产 |
| Redis Cluster | *ClusterClient | NewClusterClient(&ClusterOptions{}) | 水平扩展,自动 slot 路由 |
| Sentinel HA | *FailoverClient | NewFailoverClient(&FailoverOptions{}) | 高可用,自动 master 故障转移 |
| Sentinel + 读写分离 | *FailoverClusterClient | NewFailoverClusterClient(&FailoverOptions{RouteByLatency: true}) | HA + 读扩展 |
| 客户端分片 | *Ring | NewRing(&RingOptions{Addrs: map}) | 不用 Cluster 但需分散负载 |
| 单连接 | *Conn | client.Conn() | 需要绑定连接的场景(如 CLIENT SETNAME + 后续操作) |
| 抽象接口 | UniversalClient | NewUniversalClient(&UniversalOptions{}) | 代码不关心后端部署形态 |
3.3 UniversalClient 自动选择逻辑
func NewUniversalClient(opts *UniversalOptions) UniversalClient {
switch {
case opts.MasterName != "" && (opts.RouteByLatency || opts.RouteRandomly):
return NewFailoverClusterClient(opts.Failover()) // Sentinel + 读写分离
case opts.MasterName != "":
return NewFailoverClient(opts.Failover()) // Sentinel
case len(opts.Addrs) > 1 || opts.IsClusterMode:
return NewClusterClient(opts.Cluster()) // Cluster
default:
return NewClient(opts.Simple()) // 单节点
}
}
生产建议:业务代码用 UniversalClient 接口编程,通过配置决定后端类型。这样同一份代码可以在本地单节点和生产集群之间切换。
4. 连接池深度解析
4.1 核心参数
go-redis 的连接池基于 channel-based semaphore(FastSemaphore),用 buffered channel 作为令牌桶控制并发。
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
// --- 池大小 ---
PoolSize: 10 * runtime.GOMAXPROCS(0), // 默认值:CPU 核数 * 10
MinIdleConns: 5, // 最小空闲连接数(后台预创建)
MaxIdleConns: 0, // 最大空闲连接数(0=不限制)
MaxActiveConns: 0, // 最大活跃连接数(0=不限制,超出 PoolSize 的连接不受池管理)
// --- 超时 ---
PoolTimeout: 4 * time.Second, // 默认 ReadTimeout + 1s,等待空闲连接的超时
ConnMaxIdleTime: 30 * time.Minute, // 默认 30min,连接最大空闲时间
ConnMaxLifetime: 0, // 默认 0(不限制),连接最大存活时间
ConnMaxLifetimeJitter: 0, // 默认 0,存活时间抖动(防雷群效应)
// --- 池行为 ---
PoolFIFO: false, // 默认 LIFO,true 改为 FIFO(有助于回收空闲连接)
})
| 参数 | 默认值 | 说明 |
|---|---|---|
PoolSize | 10 * GOMAXPROCS | semaphore 容量,控制并发获取连接的 goroutine 数 |
MinIdleConns | 0 | >0 时后台预创建连接,降低首次请求延迟 |
MaxIdleConns | 0 | 空闲连接上限,0 表示不限制 |
MaxActiveConns | 0 | 总连接上限(含使用中),0 表示无硬限制 |
PoolTimeout | ReadTimeout + 1s | 在 semaphore 上等待 token 的超时 |
ConnMaxIdleTime | 30min | 超过此时间的空闲连接在取出时被惰性关闭 |
ConnMaxLifetime | 0 (永不过期) | 连接最大存活时间,防止连接泄漏或代理断连 |
ConnMaxLifetimeJitter | 0 | 在 lifetime 基础上加随机抖动,防止所有连接同时过期 |
PoolFIFO | false (LIFO) | LIFO 复用最近归还的连接(热连接),FIFO 有助于均匀淘汰 |
4.2 连接生命周期
┌─────────┐
│ CREATED │ ← dialConn() 创建 TCP 连接
└────┬────┘
│ initConn() 触发
▼
┌───────────────┐
│ INITIALIZING │ ← HELLO/AUTH/SELECT/CLIENT SETNAME
└───────┬───────┘
│ 成功
▼
┌──────────┐ ←──────────────────── Put() 归还
│ IDLE │ │
└────┬─────┘ │
TryAcquire()│ popIdle() │
▼ │
┌────────┐ ── 命令执行完毕 ──────────┘
│ IN_USE │
└────┬───┘
│ handoff/reauth
▼
┌───────────┐
│ UNUSABLE │ → 重新初始化 → IDLE 或 CLOSED
└───────────┘
健康检查:连接从池中取出时进行惰性检查:
- 检查
ConnMaxIdleTime— 空闲太久则关闭 - 检查
ConnMaxLifetime— 超出存活时间则关闭 - Pool Hook 验证(如 maintenance notifications)
4.3 内部 Semaphore 机制
连接池不是用 sync.Pool,而是自研的 FastSemaphore:
// 基于 buffered channel 的令牌桶
type FastSemaphore struct {
tokens chan struct{} // 预填充 PoolSize 个 token
max int32
}
- 快路径:
TryAcquire()用select default非阻塞尝试取 token,~30ns/op - 慢路径:
Acquire()带 timer 等待,超时返回ErrPoolTimeout - 释放连接时
Release()往 channel 发送 token
4.4 PoolStats 监控
stats := rdb.PoolStats()
// 关键指标
stats.Hits // 从池中命中空闲连接的次数
stats.Misses // 池中无空闲连接需要新建的次数
stats.Timeouts // 等待连接超时次数 — 这个飙升说明池太小
stats.TotalConns // 当前池中总连接数
stats.IdleConns // 当前空闲连接数
stats.StaleConns // 因过期被清理的连接数
生产告警规则:Timeouts 持续 > 0 说明连接池不够用,需增大 PoolSize 或排查慢查询。
5. Pipeline 与 Transaction
5.1 Pipeline — 批量无事务
Pipeline 将多条命令打包成一次网络往返,但不保证原子性:
// github.com/redis/go-redis/v9 v9.18.0
pipe := rdb.Pipeline()
incr := pipe.Incr(ctx, "counter")
expire := pipe.Expire(ctx, "counter", time.Hour)
get := pipe.Get(ctx, "other-key")
cmds, err := pipe.Exec(ctx) // 一次 RTT 发送 3 条命令
// err 是第一个失败命令的错误
// 每个命令的结果独立读取
fmt.Println(incr.Val()) // 命令各自有结果
fmt.Println(get.Val())
fmt.Println(get.Err()) // 可能是 redis.Nil
便捷写法:
cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Incr(ctx, "counter")
pipe.Expire(ctx, "counter", time.Hour)
return nil
})
5.2 TxPipeline — MULTI/EXEC 事务
TxPipeline 在 Pipeline 外层包装 MULTI/EXEC,保证命令原子执行:
cmds, err := rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Incr(ctx, "counter")
pipe.Expire(ctx, "counter", time.Hour)
return nil
})
// 实际发送的是: MULTI → INCR → EXPIRE → EXEC
内部实现(源码 tx.go):
func wrapMultiExec(ctx context.Context, cmds []Cmder) []Cmder {
cmdsCopy := make([]Cmder, len(cmds)+2)
cmdsCopy[0] = NewStatusCmd(ctx, "multi")
copy(cmdsCopy[1:], cmds)
cmdsCopy[len(cmdsCopy)-1] = NewSliceCmd(ctx, "exec")
return cmdsCopy
}
5.3 Watch + TxPipeline — 乐观锁
// 实现 WATCH + MULTI/EXEC 的 CAS 模式
err := rdb.Watch(ctx, func(tx *redis.Tx) error {
val, err := tx.Get(ctx, "key").Int64()
if err != nil && err != redis.Nil {
return err
}
// 业务逻辑
newVal := val + 1
// 如果 key 在 WATCH 之后被修改,EXEC 返回 TxFailedErr
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Set(ctx, "key", newVal, 0)
return nil
})
return err
}, "key") // 监视的 key 列表
if err == redis.TxFailedErr {
// key 被其他客户端修改,需重试
}
5.4 Pipeline 错误处理
核心区别:Exec() 返回的 err 是第一个失败命令的错误,但其他命令可能已成功执行。必须逐一检查每个命令的错误:
cmds, err := pipe.Exec(ctx)
if err != nil && err != redis.Nil {
// 有命令失败,但不能只看这个 err
}
for _, cmd := range cmds {
if cmd.Err() != nil {
// 逐一处理每个命令的错误
}
}
6. Pub/Sub
6.1 API 概览
// 订阅
pubsub := rdb.Subscribe(ctx, "channel1", "channel2")
defer pubsub.Close()
// 模式订阅
pubsub := rdb.PSubscribe(ctx, "news:*")
// Shard 订阅(Redis Cluster 7.0+)
pubsub := rdb.SSubscribe(ctx, "shard-channel")
6.2 接收消息的三种方式
// 方式 1: Channel() — 推荐,返回 <-chan *Message
ch := pubsub.Channel(
redis.WithChannelSize(100), // 缓冲区大小,默认 100
redis.WithChannelHealthCheckInterval(3*time.Second), // 健康检查间隔,默认 3s
redis.WithChannelSendTimeout(time.Minute), // 发送超时,默认 60s
)
for msg := range ch {
fmt.Println(msg.Channel, msg.Payload)
}
// 方式 2: ReceiveMessage() — 阻塞等待,忽略 Subscription/Pong
msg, err := pubsub.ReceiveMessage(ctx)
// 方式 3: Receive() — 低级 API,返回 Subscription/Message/Pong
iface, err := pubsub.Receive(ctx)
switch v := iface.(type) {
case *redis.Subscription:
// 订阅确认
case *redis.Message:
// 消息
case *redis.Pong:
// 健康检查响应
}
6.3 自动重连机制
PubSub 在连接断开时自动重连并重新订阅所有 channel/pattern。内部维护三个 map 记录订阅状态:
type PubSub struct {
channels map[string]struct{} // 普通频道
patterns map[string]struct{} // 模式订阅
schannels map[string]struct{} // Shard 频道
// ...
}
重连时调用 resubscribe() 逐一重新发送 SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE 命令。
6.4 ChannelWithSubscriptions
如果需要感知重连事件(区分首次订阅和重连后的重新订阅),使用 ChannelWithSubscriptions():
ch := pubsub.ChannelWithSubscriptions()
for iface := range ch {
switch v := iface.(type) {
case *redis.Subscription:
// 每次 (重)订阅都会收到
case *redis.Message:
// 消息
}
}
注意:Channel() 和 ChannelWithSubscriptions() 互斥,同一个 PubSub 只能调用一个,否则 panic。
7. Lua 脚本
7.1 Script 对象
// NewScript 在客户端计算 SHA1
var incrByScript = redis.NewScript(`
local current = redis.call("GET", KEYS[1])
if current == false then current = 0 end
current = tonumber(current) + tonumber(ARGV[1])
redis.call("SET", KEYS[1], current)
return current
`)
// Run() 先尝试 EVALSHA,NOSCRIPT 时自动 fallback 到 EVAL
result, err := incrByScript.Run(ctx, rdb, []string{"my-counter"}, 10).Int64()
7.2 执行流程
Script.Run() -> EvalSha() -> 如果返回 NOSCRIPT -> Eval()
NewScript() 在构造时就计算好 SHA1(纯 Go 计算,不需要 Redis):
func NewScript(src string) *Script {
h := sha1.New()
io.WriteString(h, src)
return &Script{
src: src,
hash: hex.EncodeToString(h.Sum(nil)),
}
}
7.3 NewScriptServerSHA — 服务端哈希
v9.18+ 新增 NewScriptServerSHA(),让 Redis 服务端通过 SCRIPT LOAD 计算并返回 SHA,避免客户端计算与服务端不一致的极端情况:
var script = redis.NewScriptServerSHA(`return redis.call("GET", KEYS[1])`)
// 首次执行时自动 SCRIPT LOAD 获取服务端 SHA
result, err := script.Run(ctx, rdb, []string{"key"}).Result()
7.4 Cluster 模式下的 Lua 约束
Cluster 模式下所有 KEYS 必须在同一个 slot。使用 hash tag 确保:
// 正确:{user:1} 是 hash tag,两个 key 在同一 slot
script.Run(ctx, rdb, []string{"{user:1}:name", "{user:1}:age"}, ...)
// 错误:不同 slot 会报 CROSSSLOT 错误
script.Run(ctx, rdb, []string{"user:1:name", "user:2:age"}, ...)
8. Hook 中间件体系
8.1 Hook 接口
type Hook interface {
DialHook(next DialHook) DialHook
ProcessHook(next ProcessHook) ProcessHook
ProcessPipelineHook(next ProcessPipelineHook) ProcessPipelineHook
}
type DialHook func(ctx context.Context, network, addr string) (net.Conn, error)
type ProcessHook func(ctx context.Context, cmd Cmder) error
type ProcessPipelineHook func(ctx context.Context, cmds []Cmder) error
Hook 是洋葱模型(FIFO 入栈,逆序执行):
AddHook(hook1, hook2)
执行顺序: hook1.before → hook2.before → Redis 命令 → hook2.after → hook1.after
8.2 自定义 Hook 示例
type loggingHook struct{}
func (h loggingHook) DialHook(next redis.DialHook) redis.DialHook {
return func(ctx context.Context, network, addr string) (net.Conn, error) {
log.Printf("dialing %s %s", network, addr)
conn, err := next(ctx, network, addr)
log.Printf("dial result: err=%v", err)
return conn, err
}
}
func (h loggingHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
start := time.Now()
err := next(ctx, cmd) // *** 必须调用 next,否则命令不会执行 ***
log.Printf("redis: %s %s (dur=%s, err=%v)",
cmd.FullName(), cmd.String(), time.Since(start), err)
return err
}
}
func (h loggingHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
start := time.Now()
err := next(ctx, cmds)
log.Printf("redis: pipeline %d cmds (dur=%s)", len(cmds), time.Since(start))
return err
}
}
// 注册
rdb.AddHook(loggingHook{})
8.3 OpenTelemetry 集成
go-redis 提供两个 OTel 集成包:
extra/redisotel — 基于 Hook 的 tracing + metrics(传统方案):
import "github.com/redis/go-redis/extra/redisotel/v9"
// Tracing — 为每个 Redis 命令创建 span
if err := redisotel.InstrumentTracing(rdb); err != nil {
panic(err)
}
// Metrics — 上报连接池指标
if err := redisotel.InstrumentMetrics(rdb); err != nil {
panic(err)
}
extra/redisotel-native — v9.18+ 新增,基于 Bridge Pattern,核心库零依赖 OTel SDK,通过 SetOTelRecorder 注入:
// 核心库定义 OTelRecorder 接口
type OTelRecorder interface {
RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, err error, cn ConnInfo, dbIndex int)
RecordConnectionCreateTime(ctx context.Context, duration time.Duration, cn ConnInfo)
RecordError(ctx context.Context, errorType string, cn ConnInfo, statusCode string, isInternal bool, retryAttempts int)
// ... 更多指标方法
}
// redisotel-native 包实现该接口
redis.SetOTelRecorder(nativeRecorder)
Native 方案的优势:不依赖 Hook 机制,直接在命令执行内部埋点,开销更低且覆盖更多指标(如 per-retry duration、connection wait time、maintenance notification)。
9. 完整代码示例
9.1 基础 CRUD
// github.com/redis/go-redis/v9 v9.18.x, Go 1.21+
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
defer rdb.Close()
// SET with expiration
err := rdb.Set(ctx, "name", "Alice", 10*time.Minute).Err()
if err != nil {
panic(err)
}
// GET — 注意 redis.Nil 检查
val, err := rdb.Get(ctx, "name").Result()
if errors.Is(err, redis.Nil) {
fmt.Println("key does not exist")
} else if err != nil {
panic(err)
} else {
fmt.Println("name:", val) // name: Alice
}
// HSET / HGETALL
rdb.HSet(ctx, "user:1", "name", "Bob", "age", 30)
fields, _ := rdb.HGetAll(ctx, "user:1").Result()
fmt.Println(fields) // map[age:30 name:Bob]
// DELETE
deleted, _ := rdb.Del(ctx, "name", "user:1").Result()
fmt.Println("deleted:", deleted)
}
9.2 Pipeline 批量操作
func pipelineExample(ctx context.Context, rdb *redis.Client) error {
cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
for i := 0; i < 100; i++ {
pipe.Set(ctx, fmt.Sprintf("key:%d", i), i, time.Hour)
}
return nil
})
if err != nil {
return fmt.Errorf("pipeline exec: %w", err)
}
// 逐一检查每个命令的结果
for _, cmd := range cmds {
if cmd.Err() != nil {
fmt.Printf("command %s failed: %v\n", cmd.FullName(), cmd.Err())
}
}
return nil
}
9.3 Pub/Sub 消费者
func subscriber(ctx context.Context, rdb *redis.Client) {
pubsub := rdb.Subscribe(ctx, "events")
defer pubsub.Close()
// 等待订阅确认
_, err := pubsub.Receive(ctx)
if err != nil {
panic(err)
}
ch := pubsub.Channel()
for msg := range ch {
fmt.Printf("[%s] %s\n", msg.Channel, msg.Payload)
// Channel() 在 PubSub.Close() 时自动关闭
}
}
9.4 Lua 脚本
var rateLimit = redis.NewScript(`
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = tonumber(redis.call("GET", key) or "0")
if current >= limit then
return 0
end
current = redis.call("INCR", key)
if current == 1 then
redis.call("EXPIRE", key, window)
end
return 1
`)
func checkRateLimit(ctx context.Context, rdb *redis.Client, userID string) (bool, error) {
result, err := rateLimit.Run(ctx, rdb,
[]string{fmt.Sprintf("ratelimit:%s", userID)},
100, // limit
60, // window seconds
).Int64()
if err != nil {
return false, err
}
return result == 1, nil
}
9.5 Cluster Client
func clusterExample() {
rdb := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{
"localhost:7000", "localhost:7001", "localhost:7002",
},
ReadOnly: true, // 允许从 replica 读
RouteByLatency: true, // 读请求路由到最低延迟节点
// 每个节点的连接池配置
PoolSize: 15,
MinIdleConns: 5,
MaxRedirects: 8, // MOVED/ASK 最大重定向次数,默认 3
})
defer rdb.Close()
ctx := context.Background()
// 使用 hash tag 确保多 key 操作在同一 slot
rdb.Set(ctx, "{user:1}:name", "Alice", 0)
rdb.Set(ctx, "{user:1}:age", "30", 0)
// Pipeline 自动按 slot 分组发送
pipe := rdb.Pipeline()
pipe.Get(ctx, "key-on-slot-1")
pipe.Get(ctx, "key-on-slot-2")
pipe.Exec(ctx) // 内部按 slot 分组,可能多次 RTT
// 遍历所有分片
rdb.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
return client.Ping(ctx).Err()
})
}
9.6 Universal Client
func newRedisClient(cfg Config) redis.UniversalClient {
return redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: cfg.Addrs, // 单节点: ["localhost:6379"], Cluster: ["n1:7000", "n2:7001"]
MasterName: cfg.MasterName, // 非空则走 Sentinel
DB: cfg.DB,
Password: cfg.Password,
PoolSize: cfg.PoolSize,
MinIdleConns: cfg.MinIdleConns,
ConnMaxIdleTime: cfg.ConnMaxIdleTime,
MaxRetries: 3,
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
})
}
10. 与其他 Go Redis 客户端对比
| 维度 | go-redis/v9 | redigo (gomodule) | radix (mediocregopher) |
|---|---|---|---|
| 维护方 | Redis 官方 | 社区,低活跃 | 社区 |
| API 风格 | 类型安全,每个命令返回具体 Cmd 类型 | Do("GET", key) 返回 interface{} | 类型安全但 API 不同 |
| 连接池 | 内置,channel-based semaphore | 内置,Pool struct | 内置 |
| Cluster 支持 | 内置,自动 slot 路由 | 无,需第三方库 | 内置 |
| Sentinel 支持 | 内置 | 需第三方库 | 内置 |
| Pipeline | 内置,类型安全 | 手动 Send/Receive | 内置 |
| Pub/Sub | 内置,自动重连 | 内置,手动管理 | 内置 |
| Hook/中间件 | 完整的 Hook 接口 | 无 | 无 |
| Context | 全面支持 | 有限支持 | 支持 |
| RESP3 | 默认 Protocol 3 | 仅 RESP2 | RESP2 |
| OTel | 官方 redisotel 包 | 无官方 | 无官方 |
| Go 最低版本 | 1.21(v9.18+需要1.24) | 1.14 | 1.16 |
| Stars (approx) | ~20k | ~10k | ~1k |
选型结论:新项目没有理由不用 go-redis/v9。redigo 是上一代标准,适合存量项目维护。radix 在特定性能场景下有优势但生态薄。
11. Pitfalls(常见陷阱)
11.1 不检查 redis.Nil 导致逻辑错误
redis.Nil 不是 error,是正常的 “key 不存在”语义。但它满足 err != nil。
// 错误写法:把 "key 不存在" 当成真正的错误
val, err := rdb.Get(ctx, "maybe-not-exist").Result()
if err != nil {
return err // 会把 redis.Nil 也当错误返回
}
// 正确写法
val, err := rdb.Get(ctx, "maybe-not-exist").Result()
if errors.Is(err, redis.Nil) {
// key 不存在,业务逻辑处理
val = "default"
} else if err != nil {
return fmt.Errorf("redis get: %w", err) // 真正的错误
}
11.2 Context cancellation 导致连接中毒(v9 重要行为)
默认情况下(ContextTimeoutEnabled: false),context cancel 不会中断正在进行的网络 I/O。这意味着即使上层 ctx 被 cancel,命令仍然会正常执行完毕。
// ContextTimeoutEnabled: false (默认)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
rdb.BLPop(ctx, 5*time.Second, "queue") // context 超时不影响 BLPop,仍按 ReadTimeout 执行
如果设置 ContextTimeoutEnabled: true,context 的 deadline 会传递给 net.Conn.SetReadDeadline。此时 cancel context 会导致正在读写的连接收到 I/O 错误,go-redis 会将该连接标记为 bad connection 并从池中移除。这可以防止连接中毒(半写状态),但代价是连接被丢弃。
建议:大多数场景保持默认(false),依赖 ReadTimeout/WriteTimeout 控制超时。只在需要精确 per-request timeout 且能承受连接丢弃时才开启。
11.3 Pipeline 错误只看 Exec 返回值
// 危险:只检查 Exec 的错误
_, err := pipe.Exec(ctx)
if err != nil {
// 这只是第一个失败命令的错误
// 其他命令可能已经成功执行
}
// 正确:逐一检查
cmds, err := pipe.Exec(ctx)
for _, cmd := range cmds {
if cmd.Err() != nil && !errors.Is(cmd.Err(), redis.Nil) {
log.Printf("cmd %s failed: %v", cmd.FullName(), cmd.Err())
}
}
11.4 连接池耗尽但无告警
PoolSize 默认是 10 * runtime.GOMAXPROCS(0),在 8 核机器上是 80。如果有慢查询(如 KEYS *)或阻塞命令(BLPOP),连接会被长时间占用,导致其他 goroutine 在 semaphore 上等待直到 PoolTimeout。
应对方案:
- 监控
PoolStats().Timeouts— 非零就告警 - 阻塞命令使用独立 Client(独立连接池),不与普通命令共享
- 设置合理的
MaxActiveConns作为硬上限
11.5 Pub/Sub 连接不走连接池
PubSub 使用独立连接(通过 PubSubPool 管理),不占用主连接池。但 PubSub 连接有以下特性:
- 订阅状态下只能执行 SUBSCRIBE/UNSUBSCRIBE/PING
Channel()内部有 goroutine 循环接收,PubSub 未 Close 时会泄漏- 网络断开后自动重连,但重连期间的消息会丢失(Redis Pub/Sub 不持久化)
应对:如果不能丢消息,用 Redis Streams 替代 Pub/Sub。
11.6 Cluster 模式下的 CROSSSLOT 错误
在 Redis Cluster 中,多 key 操作(MGET、Pipeline 中的事务、Lua 脚本)要求所有 key 在同一个 slot。
// 错误:不同 slot
rdb.MGet(ctx, "user:1", "user:2") // 可能 CROSSSLOT
// 正确:用 hash tag
rdb.MGet(ctx, "{user}:1", "{user}:2") // 同一 slot
注意:ClusterClient.Pipeline() 的非事务 Pipeline 会自动按 slot 分组发送(多次 RTT),但 TxPipeline() 要求所有命令的 key 在同一 slot,否则直接报错。
11.7 大 Value 导致超时
Redis 单线程模型意味着一个大 value 的读写会阻塞所有其他命令。100MB 的 value 在千兆网络上传输需要 ~1 秒。
// 对大 value 操作,使用独立 client 并增大超时
bigClient := rdb.WithTimeout(30 * time.Second)
val, err := bigClient.Get(ctx, "huge-key").Bytes()
11.8 Sentinel 故障转移后连接到旧 master
Sentinel 客户端会订阅 Sentinel 的 +switch-master 事件自动切换。但 go-redis 的行为是:
- 已有的空闲连接指向旧 master,下次使用时如果旧 master 变成 replica 且开了
replica-read-only,会收到READONLY错误 - go-redis 看到
READONLY会将连接标记为 bad(isBadConn返回 true),触发重连到新 master
这意味着故障转移后的第一批请求可能失败。确保 MaxRetries >= 1(默认 3)来自动重试。
12. 生产环境最佳实践
12.1 连接池调优 Checklist
rdb := redis.NewClient(&redis.Options{
Addr: "redis.prod:6379",
// 超时 — 三件套必配
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
// 连接池 — 根据并发量调整
PoolSize: 50, // 估算:峰值并发 goroutine 数的 80%
MinIdleConns: 10, // 预热连接,降低冷启动延迟
MaxActiveConns: 100, // 硬上限,防止 Redis max clients
PoolTimeout: 4 * time.Second, // 略大于 ReadTimeout
ConnMaxIdleTime: 5 * time.Minute, // 小于 Redis 的 timeout 配置
ConnMaxLifetime: 30 * time.Minute, // 防止连接被中间代理断开
ConnMaxLifetimeJitter: 5 * time.Minute, // 防止连接同时过期
// 重试
MaxRetries: 3,
MinRetryBackoff: 8 * time.Millisecond, // 指数退避下限
MaxRetryBackoff: 512 * time.Millisecond, // 指数退避上限
})
12.2 健康检查
// 启动时验证连接
if err := rdb.Ping(ctx).Err(); err != nil {
log.Fatalf("redis unavailable: %v", err)
}
// 定期检查连接池状态
go func() {
ticker := time.NewTicker(30 * time.Second)
for range ticker.C {
stats := rdb.PoolStats()
metrics.Gauge("redis.pool.total_conns", float64(stats.TotalConns))
metrics.Gauge("redis.pool.idle_conns", float64(stats.IdleConns))
metrics.Counter("redis.pool.timeouts", float64(stats.Timeouts))
metrics.Counter("redis.pool.hits", float64(stats.Hits))
metrics.Counter("redis.pool.misses", float64(stats.Misses))
}
}()
12.3 Graceful Shutdown
// Close 会等待所有正在执行的命令完成,然后关闭所有连接
// 源码注释:"It is rare to Close a Client, as the Client is meant to be
// long-lived and shared between many goroutines."
func shutdown(rdb *redis.Client) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 先停止接收新请求(应用层面)
// ...
// 关闭 Redis client
if err := rdb.Close(); err != nil {
log.Printf("redis close error: %v", err)
}
}
12.4 错误处理模式
func getUser(ctx context.Context, rdb redis.UniversalClient, id string) (*User, error) {
val, err := rdb.Get(ctx, "user:"+id).Result()
switch {
case errors.Is(err, redis.Nil):
return nil, nil // key 不存在,不是错误
case err != nil:
return nil, fmt.Errorf("redis get user %s: %w", id, err) // 真正的错误
}
var user User
if err := json.Unmarshal([]byte(val), &user); err != nil {
return nil, fmt.Errorf("unmarshal user %s: %w", id, err)
}
return &user, nil
}
12.5 Limiter 接口 — 熔断器/限流器
type Limiter interface {
Allow() error // 是否允许执行
ReportResult(result error) // 报告执行结果
}
可以接入 sony/gobreaker 等熔断器库,在 Redis 不可用时快速失败:
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Limiter: NewCircuitBreakerLimiter(), // 自定义实现
})
13. 使用 miniredis 做单元测试
github.com/alicebob/miniredis/v2 是一个纯 Go 实现的 Redis 服务器,运行在内存中,无需真实 Redis 实例。
// go get github.com/alicebob/miniredis/v2
package cache_test
import (
"context"
"testing"
"time"
"github.com/alicebob/miniredis/v2"
"github.com/redis/go-redis/v9"
)
func TestCacheSet(t *testing.T) {
// 启动 miniredis(无需真实 Redis)
mr := miniredis.RunT(t) // t.Cleanup 自动关闭
rdb := redis.NewClient(&redis.Options{
Addr: mr.Addr(),
})
defer rdb.Close()
ctx := context.Background()
// 正常的 Redis 操作
err := rdb.Set(ctx, "key", "value", time.Minute).Err()
if err != nil {
t.Fatal(err)
}
val, err := rdb.Get(ctx, "key").Result()
if err != nil {
t.Fatal(err)
}
if val != "value" {
t.Fatalf("expected 'value', got %q", val)
}
// miniredis 支持时间快进
mr.FastForward(2 * time.Minute)
// key 已过期
_, err = rdb.Get(ctx, "key").Result()
if err != redis.Nil {
t.Fatalf("expected redis.Nil, got %v", err)
}
}
func TestWithLuaScript(t *testing.T) {
mr := miniredis.RunT(t)
rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
defer rdb.Close()
ctx := context.Background()
script := redis.NewScript(`return redis.call("SET", KEYS[1], ARGV[1])`)
err := script.Run(ctx, rdb, []string{"lua-key"}, "lua-value").Err()
if err != nil {
t.Fatal(err)
}
// 直接在 miniredis 上验证
got, err := mr.Get("lua-key")
if err != nil {
t.Fatal(err)
}
if got != "lua-value" {
t.Fatalf("expected 'lua-value', got %q", got)
}
}
miniredis 的局限:
- 不支持 Cluster 命令(CLUSTER SLOTS 等)
- 不支持 Pub/Sub 的完整语义(可用
mr.Publish()) - 不支持 MULTI/EXEC 的 WATCH 语义(部分支持)
- 不支持 Module 命令(RediSearch、RedisJSON 等)
- 单线程,无法测试竞态条件
替代方案:集成测试用 Docker 真实 Redis + testcontainers-go。
14. v8 -> v9 迁移关键变化
| 变化 | v8 | v9 |
|---|---|---|
| import path | github.com/go-redis/redis/v8 | github.com/redis/go-redis/v9 |
| 默认协议 | RESP2 | RESP3(Protocol: 3) |
| Hook 接口 | BeforeProcess/AfterProcess 回调 | 洋葱模型 DialHook/ProcessHook/ProcessPipelineHook,必须显式调用 next |
| 连接池参数 | IdleTimeout / MaxConnAge | 重命名为 ConnMaxIdleTime / ConnMaxLifetime(语义更明确) |
| MinIdleConns | 类型 int | 类型 int(内部转 int32) |
| 新增 MaxActiveConns | 无 | 总连接硬上限 |
| 新增 PoolFIFO | 无 | 支持 FIFO 连接池策略 |
| ContextTimeoutEnabled | 无(context timeout 总是生效) | 默认 false,需显式开启 |
| Scan 相关 | 分散在各类型 | 统一的 Scanner 接口 |
| 错误类型 | 字符串匹配 | 支持 errors.Is/errors.As |
| PushNotification | 无 | RESP3 Push Notification 支持 |
| Maintenance Notifications | 无 | v9.18+ 支持 Cluster 维护通知(SMIGRATING/SMIGRATED) |
迁移步骤:
- 修改 import path
- 搜索替换
IdleTimeout->ConnMaxIdleTime,MaxConnAge->ConnMaxLifetime - 重写 Hook 实现为新的洋葱模型
- 如果依赖 context timeout 控制命令超时,设置
ContextTimeoutEnabled: true - 测试 RESP3 兼容性(某些代理可能不支持)