Table of contents
Open Table of contents
TL;DR
github.com/jackc/pgx/v5 是纯 Go 实现的 PostgreSQL 驱动和工具集。它直接实现 PostgreSQL 线协议(不依赖 libpq),提供原生 API 和 database/sql 兼容层双模式,内置连接池(pgxpool)、自动 prepared statement 缓存、COPY 批量导入、LISTEN/NOTIFY、70+ 类型映射。相比 lib/pq,pgx 更快、功能更全、维护更活跃,是 2026 年 Go + PostgreSQL 的唯一正解。
1. pgx 是什么,解决什么问题
历史背景
Go 标准库 database/sql 定义了通用数据库接口,但它是面向「最大公约数」设计的——所有数据库共享一套 API。这导致 PostgreSQL 的高级特性(COPY、LISTEN/NOTIFY、批量操作、二进制格式、自定义类型)全部无法使用。
早期 Go 社区的 PG 驱动是 lib/pq,它实现了 database/sql/driver 接口。但 lib/pq 在 2023 年进入维护模式(README 明确推荐 pgx),且有以下问题:
- 不支持 COPY 协议
- 不支持 LISTEN/NOTIFY(需要额外 fork 出独立连接)
- 没有内置连接池(依赖
database/sql的池) - 类型映射有限,扩展性差
- 性能不如纯 Go 实现的 wire protocol
Jack Christensen 从 v1 开始构建 pgx,到 v5(2022 年发布)完成了重大重构:引入泛型(CollectRows[T]、RowTo[T])、重新设计类型系统(pgtype)、统一查询执行模式(QueryExecMode)。
为什么 pgx 成为事实标准
- 纯 Go 实现 PostgreSQL wire protocol — 不依赖 CGO 或 libpq,交叉编译友好
- 双模式 — 原生 pgx API(高性能、全特性)+
pgx/stdlib(database/sql兼容) - 生态覆盖 — sqlc、GORM、ent 的 PostgreSQL backend 都基于 pgx
- 积极维护 — v5 持续迭代,支持 Go 1.25+ 和 PostgreSQL 14+
- 性能 — 二进制格式编解码、prepared statement 缓存、流式结果读取
双模式设计
┌─────────────────────────────────────────────┐
│ 你的应用代码 │
├──────────────────┬──────────────────────────┤
│ pgx 原生 API │ database/sql 标准接口 │
│ pool.Query() │ db.Query() │
│ pool.CopyFrom() │ (无法使用 COPY) │
│ pool.SendBatch()│ (无法使用 Batch) │
├──────────────────┼──────────────────────────┤
│ pgxpool │ pgx/stdlib 适配层 │
├──────────────────┴──────────────────────────┤
│ pgx.Conn — 高层查询/事务/类型映射 │
├─────────────────────────────────────────────┤
│ pgconn.PgConn — 低层协议连接 │
├─────────────────────────────────────────────┤
│ pgproto3 — wire protocol 编解码 │
├─────────────────────────────────────────────┤
│ TCP / TLS │
└─────────────────────────────────────────────┘
选择建议:如果你的项目只用 PostgreSQL,用原生 pgx API。只有需要与 GORM 等 database/sql 生态库共存时,才用 pgx/stdlib。即使使用 database/sql,也可以通过 (*sql.Conn).Raw() 拿到底层 *pgx.Conn 使用原生特性。
2. 核心架构与分层设计
2.1 分层架构(自底向上)
| 层 | 包 | 职责 | 核心类型 |
|---|---|---|---|
| Wire Protocol | pgproto3 | PostgreSQL v3 协议的消息编解码 | FrontendMessage, BackendMessage |
| 低层连接 | pgconn | 认证、TLS、查询执行、COPY、通知(≈libpq) | PgConn, Config |
| 高层接口 | pgx(根包) | 连接管理、查询、事务、批量、类型映射 | Conn, Rows, Tx, Batch |
| 类型系统 | pgtype | Go ↔ PostgreSQL 类型转换(70+ 类型) | Map, Type, Codec |
| 连接池 | pgxpool | 并发安全的连接池(基于 puddle/v2) | Pool, Config, Stat |
| database/sql | stdlib | database/sql/driver 适配层 | Driver, Conn |
2.2 PostgreSQL Wire Protocol
PostgreSQL 使用 Frontend/Backend 协议(v3),基于 TCP 的消息交换。pgx 通过 pgproto3 包直接编解码这些消息。
连接启动流程:
Client (Frontend) Server (Backend)
│ │
│── StartupMessage ────────────────→ │ (发送协议版本、用户名、数据库)
│ │
│← AuthenticationRequest ────────── │ (MD5/SCRAM-SHA-256/trust)
│── PasswordMessage ───────────────→ │
│ │
│← AuthenticationOk ─────────────── │
│← ParameterStatus (多条) ────────── │ (server_version, encoding, etc.)
│← BackendKeyData ──────────────── │ (PID, 用于取消查询)
│← ReadyForQuery ──────────────── │ (事务状态: I=idle, T=in tx, E=error)
两种查询协议:
| Simple Query Protocol | Extended Query Protocol | |
|---|---|---|
| 消息 | 单条 Query 消息 | Parse → Bind → Describe → Execute |
| 参数 | 内联到 SQL 字符串(客户端转义) | 独立传递,服务端绑定 |
| Prepared Statement | 不支持 | 支持(Parse 创建,后续复用) |
| 二进制格式 | 不支持 | 支持(结果和参数都可以二进制) |
| SQL 注入风险 | 取决于客户端转义实现 | 天然安全(参数与 SQL 分离) |
| 适用场景 | 无参查询、PgBouncer 兼容 | 默认模式、高性能场景 |
pgx 默认使用 Extended Query Protocol 并自动缓存 prepared statement。
2.3 pgconn.PgConn — 低层协议连接
pgconn.PgConn 是对 PostgreSQL 物理连接的 Go 封装,功能大致对应 C 的 libpq。它处理:
- TCP/TLS 连接建立和认证
- 查询执行(
Exec,ExecParams,ExecStatement) - COPY 协议(
CopyFrom,CopyTo) - 通知(
WaitForNotification) - 连接取消(通过 BackendKeyData)
- Pipeline 模式
一般不直接使用,但可以通过 conn.PgConn() 获取以访问低层功能。
2.4 pgx.Conn — 高层连接
// Conn 不是并发安全的。生产代码中不要直接使用 Conn,用 pgxpool.Pool。
type Conn struct {
pgConn *pgconn.PgConn // 底层协议连接
config *ConnConfig // 连接配置
preparedStatements map[string]*pgconn.StatementDescription // 手动 Prepare 的语句
statementCache stmtcache.Cache // 自动缓存(CacheStatement 模式)
descriptionCache stmtcache.Cache // 描述缓存(CacheDescribe 模式)
typeMap *pgtype.Map // 类型注册表
queryTracer QueryTracer // 追踪器
notifications []*pgconn.Notification // 缓冲的通知
// ...
}
关键点:pgx.Conn 不是 goroutine 安全的。这是使用 pgxpool.Pool 的根本原因。
3. pgxpool 连接池深度解析
3.1 为什么需要连接池
pgx.Conn不是 goroutine 安全的 — 并发访问会导致数据竞争和协议错乱- 每次新建连接有 TCP 握手 + TLS 协商 + PostgreSQL 认证开销(~数毫秒到数十毫秒)
- PostgreSQL 每个连接是一个独立进程(~5-10MB),连接数过多会导致性能急剧下降
pgxpool 基于 github.com/jackc/puddle/v2(一个通用资源池库)实现。
3.2 Pool 配置详解
config, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
// ParseConfig 支持在连接字符串中指定池参数:
// postgres://user:pass@host:5432/db?pool_max_conns=20&pool_min_conns=5
// 也可以代码中修改:
config.MaxConns = 20 // 最大连接数,默认 max(4, runtime.NumCPU())
config.MinConns = 5 // 最小连接数,默认 0(纯懒创建)
config.MinIdleConns = 3 // 最小空闲连接数(v5 新增,减少尾延迟)
config.MaxConnLifetime = 1 * time.Hour // 连接最大存活时间,默认 1h
config.MaxConnLifetimeJitter = 5 * time.Minute // 存活时间抖动,防止同时过期
config.MaxConnIdleTime = 30 * time.Minute // 空闲超时,默认 30min
config.HealthCheckPeriod = 1 * time.Minute // 健康检查间隔,默认 1min
config.PingTimeout = 5 * time.Second // Ping 超时,默认 0(无超时)
参数选择指南:
| 参数 | 生产建议 | 理由 |
|---|---|---|
MaxConns | PG max_connections / 应用实例数,通常 10-30 | 过多会压垮 PG;过少会池等待 |
MinConns | MaxConns / 4 到 MaxConns / 2 | 避免冷启动延迟 |
MinIdleConns | 2-5 | 减少尾延迟,比 MinConns 更精准 |
MaxConnLifetime | 30min-1h | 回收长期连接,防止 PG 内存泄漏 |
MaxConnLifetimeJitter | MaxConnLifetime * 10% | 避免雪崩式断连 |
MaxConnIdleTime | 5-15min | 空闲连接尽早释放给其他实例 |
HealthCheckPeriod | 30s-1min | 及时发现死连接 |
3.3 连接生命周期
┌──────────────────────────────────────────┐
│ pgxpool.Pool │
│ │
pool.Query() │ Acquire() ──→ 有空闲连接? ──Y──→ ShouldPing?│
─────────────→ │ │ │ │
│ N Ping │
│ │ │ │
│ 连接数 < MaxConns? 成功? │
│ │ │ │
│ Y N: Destroy, Y │
│ │ 重试 Acquire │ │
│ 创建新连接 │ │
│ BeforeConnect() ─→ pgx.Connect │ │
│ ─→ AfterConnect() │ │
│ │ │ │
│ └──→ PrepareConn() ────────→ 返回连接 │
│ │ │
│ ← ← ← 执行查询 ← ← ← ← ← ← ┘ │
│ │
│ Release() ─→ conn 状态正常? ──Y──→ 过期? │
│ │ │ │ │
│ N N: Destroy N │
│ │ │ │
│ Destroy() AfterRelease() ──→ 归还池│
│ triggerHealthCheck() │
└──────────────────────────────────────────┘
关键设计决策:
- 懒创建 — 默认
MinConns=0,连接按需创建。设置MinConns > 0会在NewWithConfig时异步预创建 - 空闲 Ping — 默认空闲超过 1 秒的连接在 Acquire 时会被 Ping。通过
ShouldPing自定义 - Release 时校验 — 归还连接时检查:是否已关闭、是否有未完成操作(
IsBusy)、是否不在 idle 事务状态(TxStatus != 'I')。不满足条件直接 Destroy - ConnectTimeout — 池创建连接时如果
ConnectTimeout <= 0,自动设为 2 分钟上限,防止 Acquire 被取消后连接创建在后台挂住
3.4 生命周期钩子
config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
// 在建立连接前修改配置(每个连接独立副本)
// 用例:动态密码轮换、添加连接属性
cfg.RuntimeParams["application_name"] = "my-service-" + hostname
return nil
}
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
// 连接建立后、加入池前。用于一次性初始化。
// 用例:注册自定义类型、设置 search_path、加载扩展
pgxUUID.Register(conn.TypeMap())
// 注册自定义 enum
dt, err := conn.LoadType(ctx, "my_enum_type")
if err != nil {
return err
}
conn.TypeMap().RegisterType(dt)
return nil
}
config.PrepareConn = func(ctx context.Context, conn *pgx.Conn) (bool, error) {
// 每次 Acquire 连接时调用(替代已废弃的 BeforeAcquire)
// 返回 (true, nil): 连接有效,继续使用
// 返回 (false, nil): 连接无效,销毁并重试新连接
// 返回 (true, err): 连接有效但查询应该失败
// 返回 (false, err): 连接无效且查询失败
return true, nil
}
config.AfterRelease = func(conn *pgx.Conn) bool {
// 连接归还后调用(异步执行)
// 返回 true: 归还到池;false: 销毁
// 用例:清理 session 状态、重置 search_path
return true
}
config.BeforeClose = func(conn *pgx.Conn) {
// 连接被销毁前调用
// 用例:清理资源、记录日志
}
3.5 Pool.Stat() 监控
type Stat struct {
AcquireCount() int64 // 累计成功获取连接次数
AcquireDuration() time.Duration // 累计获取连接等待时间
AcquiredConns() int32 // 当前被持有的连接数
CanceledAcquireCount() int64 // 因 context 取消的获取次数
ConstructingConns() int32 // 正在创建中的连接数
EmptyAcquireCount() int64 // 池为空时的获取次数
EmptyAcquireWaitTime() time.Duration // 池为空时的等待总时长
IdleConns() int32 // 当前空闲连接数
MaxConns() int32 // 最大连接数
TotalConns() int32 // 总连接数
NewConnsCount() int64 // 累计新建连接数
MaxLifetimeDestroyCount() int64 // 因超过 MaxConnLifetime 销毁的连接数
MaxIdleDestroyCount() int64 // 因超过 MaxConnIdleTime 销毁的连接数
}
应该暴露为 metrics 的关键指标:
AcquiredConns / MaxConns— 池利用率,接近 100% 说明可能池耗尽EmptyAcquireCount— 池为空的次数,持续增长说明 MaxConns 不够EmptyAcquireWaitTime / EmptyAcquireCount— 平均等待时间CanceledAcquireCount— 等不到连接而超时的次数,这是 P99 延迟的元凶
4. 查询执行模式(QueryExecMode)
pgx v5 引入了 5 种查询执行模式,统一控制 prepared statement 行为:
type QueryExecMode int32
const (
QueryExecModeCacheStatement // 默认。自动 Prepare + 缓存。LRU 缓存,容量 512
QueryExecModeCacheDescribe // 只缓存描述(参数/结果类型),不在服务端创建 prepared statement
QueryExecModeDescribeExec // 每次都 Describe,不缓存。安全但每次 2 RTT
QueryExecModeExec // 不 Describe,客户端推断参数类型。1 RTT,text 格式
QueryExecModeSimpleProtocol // Simple Query Protocol。参数客户端插值。1 RTT
)
各模式对比
| 模式 | RTT(首次/后续) | Prepared Statement | 二进制格式 | PgBouncer 兼容 | 适用场景 |
|---|---|---|---|---|---|
CacheStatement | 2/1 | 服务端创建+缓存 | 是 | 仅 session 模式 | 默认,直连 PG |
CacheDescribe | 2/1 | 不创建,只缓存描述 | 是 | transaction 模式可用 | PgBouncer transaction 模式 |
DescribeExec | 2/2 | 不创建,不缓存 | 是 | 是 | Schema 频繁变更 |
Exec | 1/1 | 不使用 | 否(text) | 是 | 极简场景 |
SimpleProtocol | 1/1 | 不使用 | 否(text) | 是 | 非 PG 代理/最大兼容 |
设置方式:
// 全局设置(连接字符串)
connStr := "postgres://...?default_query_exec_mode=cache_describe"
// 全局设置(代码)
config.DefaultQueryExecMode = pgx.QueryExecModeCacheDescribe
// 单次查询覆盖(作为第一个参数传入)
pool.QueryRow(ctx, pgx.QueryExecModeSimpleProtocol, "SELECT $1", 42)
Statement Cache 内部机制
CacheStatement 模式下:
- 首次执行 SQL → 计算 SQL 的 SHA-256,生成
stmtcache_<hash>作为 prepared statement 名 - 发送
Parse(创建 prepared statement)+Describe(获取参数/结果类型)到服务端 - 缓存
StatementDescription(含参数 OID、结果字段描述)到 LRU cache(默认容量 512) - 后续执行同一 SQL → 命中缓存 → 直接
Bind+Execute,只需 1 RTT
CacheDescribe 模式下:
- 首次执行 SQL → 发送 unnamed prepared statement 的
Parse+Describe - 只缓存
StatementDescription,不在服务端保留 prepared statement - 后续执行 → 使用缓存的描述做
ExecParams(extended protocol,但不引用 named statement)
为什么 PgBouncer transaction 模式与 CacheStatement 冲突:PgBouncer transaction 模式在每次事务结束后可能把连接换给其他客户端。而 prepared statement 是绑定到特定后端连接的——客户端认为 statement 已经 prepare 了,但实际后端连接可能已经换了一个没有这个 statement 的连接,导致 prepared statement does not exist 错误。
5. 查询执行模式
5.1 Query + CollectRows(推荐模式)
// pgx/v5 推荐:Query + CollectRows 泛型函数
type User struct {
ID int64 `db:"id"`
Name string `db:"name"`
Email string `db:"email"`
}
// RowToStructByName — 按列名匹配 struct 字段(case-insensitive)
users, err := pgx.CollectRows(
pool.Query(ctx, "SELECT id, name, email FROM users WHERE active = $1", true),
pgx.RowToStructByName[User],
)
// users 是 []User,rows 已自动关闭
// RowTo — 扫描单列到基础类型
ids, err := pgx.CollectRows(
pool.Query(ctx, "SELECT id FROM users WHERE active = $1", true),
pgx.RowTo[int64],
)
// ids 是 []int64
// CollectExactlyOneRow — 期望恰好一行,多行返回 ErrTooManyRows
user, err := pgx.CollectExactlyOneRow(
pool.Query(ctx, "SELECT id, name, email FROM users WHERE id = $1", userID),
pgx.RowToStructByName[User],
)
// RowToMap — 扫描为 map[string]any
m, err := pgx.CollectRows(
pool.Query(ctx, "SELECT * FROM users LIMIT 10"),
pgx.RowToMap,
)
RowToStructByName 的匹配规则:
- case-insensitive 匹配
- 没有
dbtag 时,Go 字段名和 DB 列名的下划线会被忽略(UserName匹配user_name) - 有
db:"col_name"tag 时,精确匹配 tag 值 db:"-"忽略该字段- Strict 模式:struct 的每个公开字段都必须有对应列,否则报错
- Lax 模式(
RowToStructByNameLax):允许 struct 有多余字段
5.2 QueryRow + Scan
var name string
var age int32
err := pool.QueryRow(ctx,
"SELECT name, age FROM users WHERE id = $1", userID,
).Scan(&name, &age)
if errors.Is(err, pgx.ErrNoRows) {
// 没找到行
}
5.3 Exec(写操作)
tag, err := pool.Exec(ctx,
"UPDATE users SET email = $1 WHERE id = $2",
"new@example.com", userID,
)
fmt.Println(tag.RowsAffected()) // 1
fmt.Println(tag.String()) // "UPDATE 1"
5.4 NamedArgs
// 用 @name 替代 $1, $2...,由 pgx 自动重写为 $N 占位符
rows, err := pool.Query(ctx,
"SELECT * FROM users WHERE name = @name AND age > @minAge",
pgx.NamedArgs{"name": "Alice", "minAge": 18},
)
// StrictNamedArgs — SQL 中的每个 @param 都必须在 map 中存在,且 map 中没有多余 key
rows, err := pool.Query(ctx,
"SELECT * FROM users WHERE name = @name",
pgx.StrictNamedArgs{"name": "Alice"},
)
5.5 ForEachRow(回调式遍历)
var total int64
var name string
var amount int64
rows, _ := pool.Query(ctx, "SELECT name, amount FROM transactions")
tag, err := pgx.ForEachRow(rows, []any{&name, &amount}, func() error {
total += amount
fmt.Printf("%s: %d\n", name, amount)
return nil
})
6. Batch 批量操作
Batch 把多个查询打包成一次网络往返发送给服务端,显著减少 RTT 延迟。
6.1 基本用法
batch := &pgx.Batch{}
// 队列多个查询
batch.Queue("INSERT INTO events (name) VALUES ($1)", "event1")
batch.Queue("INSERT INTO events (name) VALUES ($1)", "event2")
batch.Queue("SELECT count(*) FROM events")
// 一次发送
br := pool.SendBatch(ctx, batch)
defer br.Close() // 必须关闭!
// 按入队顺序读取结果
tag1, err := br.Exec() // INSERT 1
tag2, err := br.Exec() // INSERT 2
var count int64
err = br.QueryRow().Scan(&count) // SELECT
6.2 回调模式(推荐)
batch := &pgx.Batch{}
var totalInserted int64
var names []string
// Exec 回调
batch.Queue("INSERT INTO events (name) VALUES ($1)", "e1").
Exec(func(ct pgconn.CommandTag) error {
totalInserted += ct.RowsAffected()
return nil
})
// Query 回调
batch.Queue("SELECT name FROM events ORDER BY id").
Query(func(rows pgx.Rows) error {
var err error
names, err = pgx.CollectRows(rows, pgx.RowTo[string])
return err
})
// QueryRow 回调
var count int64
batch.Queue("SELECT count(*) FROM events").
QueryRow(func(row pgx.Row) error {
return row.Scan(&count)
})
br := pool.SendBatch(ctx, batch)
err := br.Close() // 触发所有回调
6.3 Batch 内部实现
在 CacheStatement/CacheDescribe 模式下,Batch 使用 pipeline 模式(pgconn.Pipeline):所有查询的 Parse/Bind/Execute 消息连续发送,最后一次性 Sync,服务端按顺序返回所有结果。这比逐条查询少了 N-1 次 RTT。
错误处理:Batch 中某条查询出错后,后续查询的结果不可用。br.Close() 会返回第一个遇到的错误。回调模式下,出错后后续回调不会被执行。
7. COPY 协议
PostgreSQL COPY 是批量数据导入最快的方式。pgx 用 CopyFrom 方法封装了 COPY 的 binary 格式。
7.1 基本用法
// 从内存数据批量插入
rows := [][]any{
{"Alice", "alice@example.com", int32(30)},
{"Bob", "bob@example.com", int32(25)},
}
count, err := pool.CopyFrom(
ctx,
pgx.Identifier{"users"}, // 表名(自动引号转义)
[]string{"name", "email", "age"}, // 列名
pgx.CopyFromRows(rows), // 数据源
)
fmt.Printf("copied %d rows\n", count)
7.2 三种数据源
// 1. CopyFromRows — 从 [][]any 切片
pgx.CopyFromRows([][]any{{"a", 1}, {"b", 2}})
// 2. CopyFromSlice — 从已有 typed slice,通过函数转换
type User struct { Name string; Age int32 }
users := []User{{"Alice", 30}, {"Bob", 25}}
pgx.CopyFromSlice(len(users), func(i int) ([]any, error) {
return []any{users[i].Name, users[i].Age}, nil
})
// 3. CopyFromFunc — 流式生成,适合大数据集(不需要全部加载到内存)
pgx.CopyFromFunc(func() ([]any, error) {
row, err := csvReader.Read()
if err == io.EOF {
return nil, nil // 返回 (nil, nil) 表示结束
}
if err != nil {
return nil, err // 返回 error 中止 COPY
}
return []any{row[0], row[1]}, nil
})
7.3 性能对比
| 方法 | 1000 行 | 10000 行 | 100000 行 |
|---|---|---|---|
| 逐条 INSERT | ~500ms | ~5s | ~50s |
| Batch INSERT | ~50ms | ~500ms | ~5s |
| COPY Protocol | ~10ms | ~50ms | ~200ms |
COPY 在 5 行以上就比单条 INSERT 快(来自 pgx 官方文档)。数据量越大优势越明显,因为 COPY 使用二进制格式且是流式传输。
注意:COPY 要求所有列类型都支持二进制编码。自定义 enum 类型必须先用 conn.LoadType + RegisterType 注册,否则会报错。
8. LISTEN/NOTIFY
PostgreSQL 内置的 pub/sub 机制,适合轻量级事件通知。
// LISTEN/NOTIFY 需要一个专用连接——不能用 pool 的短连接
conn, err := pgx.Connect(ctx, os.Getenv("DATABASE_URL"))
if err != nil {
log.Fatal(err)
}
defer conn.Close(ctx)
// 订阅频道
_, err = conn.Exec(ctx, "LISTEN order_events")
if err != nil {
log.Fatal(err)
}
// 阻塞等待通知
for {
notification, err := conn.WaitForNotification(ctx)
if err != nil {
log.Printf("wait error: %v", err)
break
}
fmt.Printf("channel: %s, payload: %s\n",
notification.Channel, notification.Payload)
}
// 在另一个连接/事务中发送通知
pool.Exec(ctx, "NOTIFY order_events, $1",
`{"order_id": 123, "status": "paid"}`)
// 或在 SQL 中:SELECT pg_notify('order_events', '...')
与 pgxpool 集成的注意事项:
WaitForNotification会阻塞连接直到收到通知或 context 取消- 不能在 pool 连接上做 LISTEN — pool 的连接会被释放回池,此时另一个 goroutine 可能拿到它,LISTEN 状态就丢失了
- 正确做法:
pool.Acquire()获取一个连接后Hijack()拿走底层*pgx.Conn,自己管理生命周期 - 或者直接用
pgx.Connect创建独立连接
// 从 pool 获取专用连接的正确方式
c, err := pool.Acquire(ctx)
if err != nil {
log.Fatal(err)
}
conn := c.Hijack() // 从池中取走,不会被回收
defer conn.Close(ctx)
// 现在可以安全地 LISTEN
_, err = conn.Exec(ctx, "LISTEN my_channel")
// ...
9. 事务处理
9.1 基本事务
tx, err := pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx) // Commit 成功后 Rollback 是安全的 no-op
_, err = tx.Exec(ctx, "INSERT INTO orders (user_id) VALUES ($1)", userID)
if err != nil {
return err
}
err = tx.Commit(ctx)
return err
9.2 BeginTxFunc(推荐模式)
// 自动 commit/rollback,避免忘记 defer Rollback
err := pgx.BeginTxFunc(ctx, pool, pgx.TxOptions{
IsoLevel: pgx.Serializable,
AccessMode: pgx.ReadWrite,
}, func(tx pgx.Tx) error {
_, err := tx.Exec(ctx, "UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, fromID)
if err != nil {
return err // 自动 Rollback
}
_, err = tx.Exec(ctx, "UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, toID)
return err // nil → 自动 Commit; error → 自动 Rollback
})
9.3 TxOptions
type TxOptions struct {
IsoLevel TxIsoLevel // Serializable, RepeatableRead, ReadCommitted, ReadUncommitted
AccessMode TxAccessMode // ReadWrite, ReadOnly
DeferrableMode TxDeferrableMode // Deferrable, NotDeferrable
BeginQuery string // 自定义 BEGIN SQL(如 CockroachDB 的 "BEGIN PRIORITY HIGH")
CommitQuery string // 自定义 COMMIT SQL
}
9.4 Savepoints(伪嵌套事务)
tx, err := pool.Begin(ctx)
defer tx.Rollback(ctx)
// tx.Begin() 创建 savepoint(不是真的嵌套事务)
sp, err := tx.Begin(ctx)
if err != nil {
return err
}
_, err = sp.Exec(ctx, "INSERT INTO audit_log ...")
if err != nil {
sp.Rollback(ctx) // 只回滚到 savepoint,外层事务不受影响
} else {
sp.Commit(ctx) // 释放 savepoint
}
// 外层事务继续
_, err = tx.Exec(ctx, "UPDATE ...")
return tx.Commit(ctx)
内部实现:tx.Begin() 执行 SAVEPOINT sp_N,sp.Commit() 执行 RELEASE SAVEPOINT sp_N,sp.Rollback() 执行 ROLLBACK TO SAVEPOINT sp_N。
事务与连接池:pool.Begin() 会 Acquire 一个连接,这个连接在 tx.Commit() 或 tx.Rollback() 之前不会归还。长事务 = 长期占用连接 = 池可用连接减少。
10. 类型系统(pgtype)
10.1 架构
// Map 是类型注册表——关联 PostgreSQL OID 与 Go Codec
type Map struct { /* ... */ }
// Type 表示一个已注册的 PostgreSQL 类型
type Type struct {
Name string // 类型名称
OID uint32 // PostgreSQL OID
Codec Codec // 编解码器
}
// Codec 接口——负责一种 PostgreSQL 类型的编解码
type Codec interface {
FormatSupported(format int16) bool
PreferredFormat() int16
PlanEncode(m *Map, oid uint32, format int16, value any) EncodePlan
PlanScan(m *Map, oid uint32, format int16, target any) ScanPlan
DecodeDatabaseSQLValue(m *Map, oid uint32, format int16, src []byte) (driver.Value, error)
DecodeValue(m *Map, oid uint32, format int16, src []byte) (any, error)
}
每个 pgx.Conn 创建时会初始化一个 pgtype.NewMap(),默认包含 70+ 内置类型。
10.2 内置类型映射
| PostgreSQL 类型 | pgtype 类型 | Go 原生类型 |
|---|---|---|
int2/int4/int8 | pgtype.Int2/Int4/Int8 | int16/int32/int64 |
float4/float8 | pgtype.Float4/Float8 | float32/float64 |
text/varchar | pgtype.Text | string |
bool | pgtype.Bool | bool |
bytea | pgtype.Bytea | []byte |
uuid | pgtype.UUID | [16]byte |
timestamptz | pgtype.Timestamptz | time.Time |
timestamp | pgtype.Timestamp | time.Time |
date | pgtype.Date | time.Time |
interval | pgtype.Interval | — |
json/jsonb | pgtype.JSONB | []byte / 任意 struct |
inet/cidr | — | netip.Addr / netip.Prefix |
int4[] | pgtype.Array[pgtype.Int4] | []int32 |
text[] | pgtype.Array[pgtype.Text] | []string |
numeric | pgtype.Numeric | — |
hstore | pgtype.Hstore | map[string]*string |
10.3 Null 处理
pgx v5 提供三种 null 处理方式:
// 方式 1: pgtype.T 的 Valid 字段(推荐,零值安全)
var name pgtype.Text
err := pool.QueryRow(ctx, "SELECT name FROM users WHERE id = $1", id).Scan(&name)
if name.Valid {
fmt.Println(name.String)
} else {
fmt.Println("NULL")
}
// 方式 2: Go 指针(简洁,但需要解引用检查)
var name *string
err := pool.QueryRow(ctx, "SELECT name FROM users WHERE id = $1", id).Scan(&name)
if name != nil {
fmt.Println(*name)
}
// 方式 3: database/sql 的 NullString(兼容旧代码)
var name sql.NullString
err := pool.QueryRow(ctx, "SELECT name FROM users WHERE id = $1", id).Scan(&name)
选择建议:在纯 pgx 项目中用 *string / *int64(最简洁)。需要区分「零值」和「NULL」时用 pgtype.Text。sql.NullString 只在 database/sql 兼容场景使用。
10.4 自定义类型注册
// 在 AfterConnect 中注册自定义 enum
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
// LoadType 从数据库查询类型的 OID 和定义
dt, err := conn.LoadType(ctx, "order_status")
if err != nil {
return fmt.Errorf("load type order_status: %w", err)
}
conn.TypeMap().RegisterType(dt)
// 注册数组类型
dtArray, err := conn.LoadType(ctx, "_order_status") // 数组类型名前缀 _
if err != nil {
return fmt.Errorf("load type _order_status: %w", err)
}
conn.TypeMap().RegisterType(dtArray)
return nil
}
11. Tracing 与可观测性
11.1 Tracer 接口
pgx 通过 interface 定义了 5 种 tracer,设置在 ConnConfig.Tracer 上。一个 tracer 可以实现多个接口。
// 查询追踪(Query/QueryRow/Exec)
type QueryTracer interface {
TraceQueryStart(ctx context.Context, conn *Conn, data TraceQueryStartData) context.Context
TraceQueryEnd(ctx context.Context, conn *Conn, data TraceQueryEndData)
}
// Batch 追踪
type BatchTracer interface {
TraceBatchStart(ctx context.Context, conn *Conn, data TraceBatchStartData) context.Context
TraceBatchQuery(ctx context.Context, conn *Conn, data TraceBatchQueryData)
TraceBatchEnd(ctx context.Context, conn *Conn, data TraceBatchEndData)
}
// COPY 追踪
type CopyFromTracer interface {
TraceCopyFromStart(ctx context.Context, conn *Conn, data TraceCopyFromStartData) context.Context
TraceCopyFromEnd(ctx context.Context, conn *Conn, data TraceCopyFromEndData)
}
// Prepare 追踪
type PrepareTracer interface {
TracePrepareStart(ctx context.Context, conn *Conn, data TracePrepareStartData) context.Context
TracePrepareEnd(ctx context.Context, conn *Conn, data TracePrepareEndData)
}
// 连接追踪
type ConnectTracer interface {
TraceConnectStart(ctx context.Context, data TraceConnectStartData) context.Context
TraceConnectEnd(ctx context.Context, data TraceConnectEndData)
}
pgxpool 额外定义了两个:
type AcquireTracer interface {
TraceAcquireStart(ctx context.Context, pool *Pool, data TraceAcquireStartData) context.Context
TraceAcquireEnd(ctx context.Context, pool *Pool, data TraceAcquireEndData)
}
type ReleaseTracer interface {
TraceRelease(pool *Pool, data TraceReleaseData)
}
11.2 OpenTelemetry 集成
使用 github.com/exaring/otelpgx:
import "github.com/exaring/otelpgx"
config, _ := pgxpool.ParseConfig(connStr)
config.ConnConfig.Tracer = otelpgx.NewTracer()
pool, _ := pgxpool.NewWithConfig(ctx, config)
11.3 日志集成
使用 pgx/tracelog + 日志适配器(支持 slog、zap、zerolog 等):
import (
"github.com/jackc/pgx/v5/tracelog"
"github.com/mcosta74/pgx-slog"
)
config.ConnConfig.Tracer = &tracelog.TraceLog{
Logger: pgxslog.NewLogger(slog.Default()),
LogLevel: tracelog.LogLevelInfo,
}
11.4 组合多个 Tracer
import "github.com/jackc/pgx/v5/multitracer"
config.ConnConfig.Tracer = &multitracer.Tracer{
Tracers: []pgx.QueryTracer{
otelpgx.NewTracer(),
&tracelog.TraceLog{Logger: myLogger, LogLevel: tracelog.LogLevelDebug},
},
}
12. pgx vs database/sql 对比
原生 pgx API 的优势
| 能力 | pgx 原生 | database/sql + pgx/stdlib |
|---|---|---|
| COPY 协议 | pool.CopyFrom() | 不支持(需 Raw() 降级) |
| Batch 批量 | pool.SendBatch() | 不支持 |
| LISTEN/NOTIFY | conn.WaitForNotification() | 不支持 |
| 二进制格式 | 默认启用 | 部分类型 |
| 自定义类型(enum/composite) | RegisterType | 需要 SQLScanner 适配 |
| 泛型扫描 | CollectRows[T] | 不支持 |
| NamedArgs | pgx.NamedArgs | 不支持 |
| 连接池统计 | pool.Stat() 详细指标 | db.Stats() 基础指标 |
| 性能 | 更高(减少反射、支持二进制) | 额外一层适配开销 |
| 查询模式控制 | QueryExecMode | 有限 |
database/sql 的优势
- 驱动可替换 — 理论上可以换成 MySQL/SQLite(实际项目很少这样做)
- 生态兼容 — 某些库只支持
*sql.DB(如一些 migration 工具) - 团队熟悉度 —
database/sql是 Go 标准库
stdlib 混合用法
// 从 database/sql 连接获取底层 pgx.Conn
db, _ := sql.Open("pgx", connStr)
conn, _ := db.Conn(ctx)
err := conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn() // *pgx.Conn
// 使用 pgx 原生功能
_, err := pgxConn.CopyFrom(ctx, pgx.Identifier{"users"}, cols, src)
return err
})
// 或从 pgxpool 创建 database/sql 连接
pool, _ := pgxpool.New(ctx, connStr)
db := stdlib.OpenDBFromPool(pool)
13. 完整代码示例
13.1 pgxpool 初始化与 CRUD
// pgx/v5
package main
import (
"context"
"fmt"
"log"
"os"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
func main() {
ctx := context.Background()
config, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
if err != nil {
log.Fatal(err)
}
config.MaxConns = 20
config.MinConns = 5
config.MaxConnLifetime = 30 * time.Minute
config.MaxConnLifetimeJitter = 3 * time.Minute
config.MaxConnIdleTime = 10 * time.Minute
config.HealthCheckPeriod = 30 * time.Second
pool, err := pgxpool.NewWithConfig(ctx, config)
if err != nil {
log.Fatal(err)
}
defer pool.Close()
// Ping 验证连接
if err := pool.Ping(ctx); err != nil {
log.Fatal("cannot reach database:", err)
}
// CREATE
var id int64
err = pool.QueryRow(ctx,
"INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
"Alice", "alice@example.com",
).Scan(&id)
if err != nil {
log.Fatal(err)
}
// READ — 单行
type User struct {
ID int64 `db:"id"`
Name string `db:"name"`
Email string `db:"email"`
}
user, err := pgx.CollectExactlyOneRow(
pool.Query(ctx, "SELECT id, name, email FROM users WHERE id = $1", id),
pgx.RowToStructByName[User],
)
if err != nil {
log.Fatal(err)
}
fmt.Printf("User: %+v\n", user)
// READ — 多行
users, err := pgx.CollectRows(
pool.Query(ctx, "SELECT id, name, email FROM users ORDER BY id LIMIT 100"),
pgx.RowToStructByName[User],
)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Found %d users\n", len(users))
// UPDATE
tag, err := pool.Exec(ctx,
"UPDATE users SET email = $1 WHERE id = $2",
"alice-new@example.com", id,
)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Updated %d rows\n", tag.RowsAffected())
// DELETE
tag, err = pool.Exec(ctx, "DELETE FROM users WHERE id = $1", id)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Deleted %d rows\n", tag.RowsAffected())
}
13.2 Batch 操作
func batchExample(ctx context.Context, pool *pgxpool.Pool) error {
batch := &pgx.Batch{}
var counts [3]int64
for i := range 3 {
name := fmt.Sprintf("user_%d", i)
batch.Queue(
"INSERT INTO users (name, email) VALUES ($1, $2)",
name, name+"@example.com",
).Exec(func(ct pgconn.CommandTag) error {
counts[i] = ct.RowsAffected()
return nil
})
}
var total int64
batch.Queue("SELECT count(*) FROM users").QueryRow(func(row pgx.Row) error {
return row.Scan(&total)
})
br := pool.SendBatch(ctx, batch)
if err := br.Close(); err != nil {
return fmt.Errorf("batch failed: %w", err)
}
fmt.Printf("Inserted: %v, Total: %d\n", counts, total)
return nil
}
13.3 COPY 批量插入
func copyExample(ctx context.Context, pool *pgxpool.Pool) error {
// 生成测试数据
const n = 10000
rows := make([][]any, n)
for i := range n {
rows[i] = []any{
fmt.Sprintf("user_%d", i),
fmt.Sprintf("user_%d@example.com", i),
int32(20 + i%50),
}
}
count, err := pool.CopyFrom(
ctx,
pgx.Identifier{"users"},
[]string{"name", "email", "age"},
pgx.CopyFromRows(rows),
)
if err != nil {
return fmt.Errorf("copy failed: %w", err)
}
fmt.Printf("Copied %d rows\n", count)
return nil
}
13.4 事务与 Savepoints
func transferWithAudit(ctx context.Context, pool *pgxpool.Pool,
fromID, toID int64, amount int64) error {
return pgx.BeginTxFunc(ctx, pool, pgx.TxOptions{
IsoLevel: pgx.ReadCommitted,
}, func(tx pgx.Tx) error {
// 主事务:转账
_, err := tx.Exec(ctx,
"UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, fromID)
if err != nil {
return err
}
_, err = tx.Exec(ctx,
"UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, toID)
if err != nil {
return err
}
// Savepoint:审计日志(失败不影响转账)
sp, err := tx.Begin(ctx)
if err != nil {
return err
}
_, err = sp.Exec(ctx,
"INSERT INTO audit_log (action, details) VALUES ($1, $2)",
"transfer", fmt.Sprintf("%d->%d: %d", fromID, toID, amount))
if err != nil {
sp.Rollback(ctx) // 只回滚审计日志
} else {
sp.Commit(ctx)
}
return nil // 外层事务提交
})
}
13.5 LISTEN/NOTIFY
func listenExample(ctx context.Context, pool *pgxpool.Pool) error {
// 获取专用连接
c, err := pool.Acquire(ctx)
if err != nil {
return err
}
conn := c.Hijack() // 从池中取走
defer conn.Close(context.Background())
_, err = conn.Exec(ctx, "LISTEN order_events")
if err != nil {
return err
}
log.Println("Listening for order_events...")
for {
notification, err := conn.WaitForNotification(ctx)
if err != nil {
return fmt.Errorf("wait: %w", err)
}
log.Printf("Received on [%s]: %s\n",
notification.Channel, notification.Payload)
}
}
// 发送通知(在任何连接上)
func notifyExample(ctx context.Context, pool *pgxpool.Pool) error {
_, err := pool.Exec(ctx,
"SELECT pg_notify($1, $2)",
"order_events",
`{"order_id": 42, "status": "shipped"}`,
)
return err
}
13.6 RowToStructByName 泛型扫描
type Product struct {
ID int64 `db:"id"`
Name string `db:"name"`
Price int64 `db:"price_cents"`
Description *string `db:"description"` // nullable
CreatedAt time.Time `db:"created_at"`
Tags []string `db:"tags"` // PostgreSQL text[]
}
func getProducts(ctx context.Context, pool *pgxpool.Pool, minPrice int64) ([]Product, error) {
return pgx.CollectRows(
pool.Query(ctx,
`SELECT id, name, price_cents, description, created_at, tags
FROM products
WHERE price_cents >= $1
ORDER BY created_at DESC`, minPrice),
pgx.RowToStructByName[Product],
)
}
// Lax 模式:允许 struct 字段比查询列多(部分查询场景)
type ProductSummary struct {
ID int64 `db:"id"`
Name string `db:"name"`
// Description 等字段不在查询中,Lax 模式不会报错
Description *string `db:"description"`
}
func getProductNames(ctx context.Context, pool *pgxpool.Pool) ([]ProductSummary, error) {
return pgx.CollectRows(
pool.Query(ctx, "SELECT id, name FROM products"),
pgx.RowToStructByNameLax[ProductSummary],
)
}
14. Go PostgreSQL 驱动/工具对比
| pgx 原生 | database/sql + pgx/stdlib | lib/pq | GORM | sqlc | |
|---|---|---|---|---|---|
| 定位 | 底层驱动 | 标准接口适配 | 旧标准驱动 | ORM | 代码生成器 |
| 底层 | 直接 wire protocol | pgx 适配 | 独立实现 | pgx via gorm.io/driver/postgres | 生成 pgx 代码 |
| COPY | 原生支持 | Raw() 降级 | 不支持 | 不支持 | 不支持 |
| Batch | 原生支持 | 不支持 | 不支持 | 不支持 | 可用 |
| LISTEN/NOTIFY | 原生支持 | 不支持 | 有限支持 | 不支持 | 不支持 |
| 类型安全 | 运行时 | 运行时 | 运行时 | 运行时(反射) | 编译时 |
| 性能 | 最高 | 稍低 | 低 | 较低(反射开销) | 接近 pgx 原生 |
| 学习成本 | 中 | 低 | 低 | 高(ORM 概念) | 中(写 SQL + 配置) |
| 适用场景 | 性能敏感、PG 深度功能 | 需要生态兼容 | 遗留项目 | 快速 CRUD | SQL-first 项目 |
| 维护状态 | 活跃 | 活跃 | 维护模式 | 活跃 | 活跃 |
推荐组合:
- 性能敏感 / PG 深度使用 → pgx 原生 API
- 业务 CRUD 为主 → sqlc + pgx(编译时类型安全 + pgx 性能)
- 快速原型 → GORM(底层也是 pgx)
- 遗留项目迁移 → pgx/stdlib 替换 lib/pq(改一行 import 即可)
15. 陷阱(Pitfalls)
1. 连接池耗尽:长事务和 LISTEN 是元凶
pool.Begin() 持有一个连接直到 Commit/Rollback。如果事务中有慢查询、HTTP 调用、或忘记 Commit,连接不会归还。当 AcquiredConns == MaxConns 时,所有新请求都在等待。
// 错误:在事务中调用外部 API
tx, _ := pool.Begin(ctx)
resp, _ := http.Get("https://slow-api.com/validate") // 可能 hang 30s
tx.Exec(ctx, "INSERT ...")
tx.Commit(ctx)
// 正确:先完成外部调用,再开事务
resp, _ := http.Get("https://slow-api.com/validate")
tx, _ := pool.Begin(ctx)
tx.Exec(ctx, "INSERT ...")
tx.Commit(ctx)
LISTEN 更危险——WaitForNotification 会无限期持有连接。必须用 Hijack() 或独立 pgx.Connect。
2. PgBouncer transaction 模式与 prepared statement 冲突
// 错误:使用默认 CacheStatement 模式连接 PgBouncer transaction pool
pool, _ := pgxpool.New(ctx, "postgres://pgbouncer:6432/db")
// → "prepared statement stmtcache_xxx does not exist"
// 正确:切换到 CacheDescribe 或更低模式
config, _ := pgxpool.ParseConfig("postgres://pgbouncer:6432/db")
config.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeCacheDescribe
// 或在连接字符串中:?default_query_exec_mode=cache_describe
3. 不使用 CollectRows 导致资源泄漏
// 错误:Query 返回的 Rows 没有关闭
rows, err := pool.Query(ctx, "SELECT * FROM users")
if err != nil {
return err
}
// 忘记 rows.Close() → 连接不会归还到池!
// 正确:用 CollectRows(自动关闭)
users, err := pgx.CollectRows(
pool.Query(ctx, "SELECT * FROM users"),
pgx.RowToStructByName[User],
)
// 或手动 defer rows.Close()
4. Context 取消与查询取消的行为
context.Cancel() 会向 PostgreSQL 发送 cancel 请求(通过 BackendKeyData 中的 PID 和 secret key)。但 cancel 是异步的——PostgreSQL 可能在取消请求到达前已经完成了查询。
对于写操作:context cancel 可能导致部分写入(PostgreSQL 已执行但 Go 侧认为超时)。关键写操作应该用独立 context 或不设超时。
5. pgtype null 处理混淆
// 错误:用 string 接收可能为 NULL 的列
var name string
pool.QueryRow(ctx, "SELECT name FROM users WHERE id = $1", id).Scan(&name)
// name 为 NULL 时 → Scan 报错 "cannot scan NULL into *string"
// 正确:用 *string 或 pgtype.Text
var name *string
pool.QueryRow(ctx, "SELECT name FROM users WHERE id = $1", id).Scan(&name)
6. COPY 的类型注册要求
// 错误:COPY 自定义 enum 列但未注册类型
pool.CopyFrom(ctx, pgx.Identifier{"orders"}, []string{"status"}, rows)
// → "unable to encode ... for OID ..." 因为 COPY 强制使用 binary 格式
// 正确:在 AfterConnect 中注册
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
dt, _ := conn.LoadType(ctx, "order_status")
conn.TypeMap().RegisterType(dt)
return nil
}
7. 连接字符串 sslmode 默认行为
pgx 默认 sslmode=prefer(尝试 TLS,失败则明文)。生产环境应该明确设置 sslmode=require 或 sslmode=verify-full。
# 生产环境
postgres://user:pass@host:5432/db?sslmode=verify-full&sslrootcert=/path/to/ca.pem
8. RowToStructByName 的字段匹配陷阱
type User struct {
ID int64 `db:"id"`
Name string `db:"name"`
Age int32 // 没有 db tag
}
// 查询 SELECT id, name, user_age FROM users
// → "struct doesn't have corresponding row field user_age"
// 因为没有 db tag 的 Age 字段匹配 "Age"(忽略下划线后是 "age"),不匹配 "user_age"
// 修复:加上 db tag
type User struct {
ID int64 `db:"id"`
Name string `db:"name"`
Age int32 `db:"user_age"`
}
9. Scan 到错误类型不一定报错
pgx 使用 text 格式时,某些类型转换是静默的。例如数据库返回 int8(bigint),但你 Scan 到 int32,如果值在 int32 范围内不会报错,超范围才报错。binary 格式下类型匹配更严格。
10. Pool.Reset() 不等于重新创建池
pool.Reset() 会关闭所有空闲连接,但被持有的连接只有在归还时才会被关闭。在网络中断后调用 Reset() 可以快速恢复,但不要指望它立即终止正在使用的连接。
16. 生产 Checklist
连接池配置
-
MaxConns按实际 PGmax_connections和实例数计算,不要用默认值 - 设置
MinConns或MinIdleConns避免冷启动 - 设置
MaxConnLifetimeJitter避免连接同时过期
PgBouncer 兼容
- 使用
QueryExecModeCacheDescribe或QueryExecModeExec - 确认 PgBouncer 的
pool_mode = transaction - 不在事务外使用
SET语句(PgBouncer 会把连接换走)
可观测性
- 设置
Tracer(至少 QueryTracer),集成 OpenTelemetry 或结构化日志 - 定期采集
pool.Stat()暴露为 Prometheus metrics - 关注
EmptyAcquireCount和CanceledAcquireCount
类型安全
- 自定义 enum/composite/domain 在
AfterConnect中注册 - nullable 列使用
*T或pgtype.T接收 - 使用
RowToStructByName而非手动 Scan(减少列顺序耦合)
优雅关闭
// 优雅关闭:等待所有连接归还后关闭
// pool.Close() 会阻塞直到所有 Acquired 连接 Release
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 先停止接受新请求,然后:
pool.Close() // 阻塞等待
错误处理
// 判断 PostgreSQL 错误码
import "github.com/jackc/pgerrcode"
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
switch pgErr.Code {
case pgerrcode.UniqueViolation:
// 唯一约束冲突
case pgerrcode.SerializationFailure:
// 序列化冲突,应该重试
case pgerrcode.DeadlockDetected:
// 死锁,应该重试
}
}
测试
- 使用
github.com/pashagolub/pgxmock做单元测试(mock pgx 接口) - 使用 testcontainers-go 做集成测试(真实 PostgreSQL 容器)
- 不要在测试中硬编码连接字符串
延伸方向
- PostgreSQL wire protocol 详细规范:Frontend/Backend Protocol
- pgx 架构演讲:PGX Top to Bottom(Golang Estonia)
- pgx GitHub:github.com/jackc/pgx
- puddle(pgxpool 底层资源池):github.com/jackc/puddle
- 逻辑复制客户端:github.com/jackc/pglogrepl