跳转到正文
zeno's blog
返回

Go PostgreSQL:pgx 的架构设计与生产实践

Table of contents

Open Table of contents

TL;DR

github.com/jackc/pgx/v5 是纯 Go 实现的 PostgreSQL 驱动和工具集。它直接实现 PostgreSQL 线协议(不依赖 libpq),提供原生 API 和 database/sql 兼容层双模式,内置连接池(pgxpool)、自动 prepared statement 缓存、COPY 批量导入、LISTEN/NOTIFY、70+ 类型映射。相比 lib/pq,pgx 更快、功能更全、维护更活跃,是 2026 年 Go + PostgreSQL 的唯一正解。


1. pgx 是什么,解决什么问题

历史背景

Go 标准库 database/sql 定义了通用数据库接口,但它是面向「最大公约数」设计的——所有数据库共享一套 API。这导致 PostgreSQL 的高级特性(COPY、LISTEN/NOTIFY、批量操作、二进制格式、自定义类型)全部无法使用。

早期 Go 社区的 PG 驱动是 lib/pq,它实现了 database/sql/driver 接口。但 lib/pq 在 2023 年进入维护模式(README 明确推荐 pgx),且有以下问题:

Jack Christensen 从 v1 开始构建 pgx,到 v5(2022 年发布)完成了重大重构:引入泛型(CollectRows[T]RowTo[T])、重新设计类型系统(pgtype)、统一查询执行模式(QueryExecMode)。

为什么 pgx 成为事实标准

  1. 纯 Go 实现 PostgreSQL wire protocol — 不依赖 CGO 或 libpq,交叉编译友好
  2. 双模式 — 原生 pgx API(高性能、全特性)+ pgx/stdlibdatabase/sql 兼容)
  3. 生态覆盖 — sqlc、GORM、ent 的 PostgreSQL backend 都基于 pgx
  4. 积极维护 — v5 持续迭代,支持 Go 1.25+ 和 PostgreSQL 14+
  5. 性能 — 二进制格式编解码、prepared statement 缓存、流式结果读取

双模式设计

┌─────────────────────────────────────────────┐
│  你的应用代码                                 │
├──────────────────┬──────────────────────────┤
│  pgx 原生 API     │  database/sql 标准接口     │
│  pool.Query()    │  db.Query()              │
│  pool.CopyFrom() │  (无法使用 COPY)           │
│  pool.SendBatch()│  (无法使用 Batch)          │
├──────────────────┼──────────────────────────┤
│  pgxpool          │  pgx/stdlib 适配层        │
├──────────────────┴──────────────────────────┤
│  pgx.Conn — 高层查询/事务/类型映射             │
├─────────────────────────────────────────────┤
│  pgconn.PgConn — 低层协议连接                 │
├─────────────────────────────────────────────┤
│  pgproto3 — wire protocol 编解码              │
├─────────────────────────────────────────────┤
│  TCP / TLS                                  │
└─────────────────────────────────────────────┘

选择建议:如果你的项目只用 PostgreSQL,用原生 pgx API。只有需要与 GORM 等 database/sql 生态库共存时,才用 pgx/stdlib。即使使用 database/sql,也可以通过 (*sql.Conn).Raw() 拿到底层 *pgx.Conn 使用原生特性。


2. 核心架构与分层设计

2.1 分层架构(自底向上)

职责核心类型
Wire Protocolpgproto3PostgreSQL v3 协议的消息编解码FrontendMessage, BackendMessage
低层连接pgconn认证、TLS、查询执行、COPY、通知(≈libpq)PgConn, Config
高层接口pgx(根包)连接管理、查询、事务、批量、类型映射Conn, Rows, Tx, Batch
类型系统pgtypeGo ↔ PostgreSQL 类型转换(70+ 类型)Map, Type, Codec
连接池pgxpool并发安全的连接池(基于 puddle/v2Pool, Config, Stat
database/sqlstdlibdatabase/sql/driver 适配层Driver, Conn

2.2 PostgreSQL Wire Protocol

PostgreSQL 使用 Frontend/Backend 协议(v3),基于 TCP 的消息交换。pgx 通过 pgproto3 包直接编解码这些消息。

连接启动流程

Client (Frontend)                    Server (Backend)
      │                                    │
      │── StartupMessage ────────────────→ │  (发送协议版本、用户名、数据库)
      │                                    │
      │← AuthenticationRequest ────────── │  (MD5/SCRAM-SHA-256/trust)
      │── PasswordMessage ───────────────→ │
      │                                    │
      │← AuthenticationOk ─────────────── │
      │← ParameterStatus (多条) ────────── │  (server_version, encoding, etc.)
      │← BackendKeyData ──────────────── │  (PID, 用于取消查询)
      │← ReadyForQuery ──────────────── │  (事务状态: I=idle, T=in tx, E=error)

两种查询协议

Simple Query ProtocolExtended Query Protocol
消息单条 Query 消息ParseBindDescribeExecute
参数内联到 SQL 字符串(客户端转义)独立传递,服务端绑定
Prepared Statement不支持支持(Parse 创建,后续复用)
二进制格式不支持支持(结果和参数都可以二进制)
SQL 注入风险取决于客户端转义实现天然安全(参数与 SQL 分离)
适用场景无参查询、PgBouncer 兼容默认模式、高性能场景

pgx 默认使用 Extended Query Protocol 并自动缓存 prepared statement。

2.3 pgconn.PgConn — 低层协议连接

pgconn.PgConn 是对 PostgreSQL 物理连接的 Go 封装,功能大致对应 C 的 libpq。它处理:

一般不直接使用,但可以通过 conn.PgConn() 获取以访问低层功能。

2.4 pgx.Conn — 高层连接

// Conn 不是并发安全的。生产代码中不要直接使用 Conn,用 pgxpool.Pool。
type Conn struct {
    pgConn             *pgconn.PgConn           // 底层协议连接
    config             *ConnConfig              // 连接配置
    preparedStatements map[string]*pgconn.StatementDescription  // 手动 Prepare 的语句
    statementCache     stmtcache.Cache          // 自动缓存(CacheStatement 模式)
    descriptionCache   stmtcache.Cache          // 描述缓存(CacheDescribe 模式)
    typeMap            *pgtype.Map              // 类型注册表
    queryTracer        QueryTracer              // 追踪器
    notifications      []*pgconn.Notification   // 缓冲的通知
    // ...
}

关键点:pgx.Conn 不是 goroutine 安全的。这是使用 pgxpool.Pool 的根本原因。


3. pgxpool 连接池深度解析

3.1 为什么需要连接池

pgxpool 基于 github.com/jackc/puddle/v2(一个通用资源池库)实现。

3.2 Pool 配置详解

config, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
// ParseConfig 支持在连接字符串中指定池参数:
// postgres://user:pass@host:5432/db?pool_max_conns=20&pool_min_conns=5

// 也可以代码中修改:
config.MaxConns = 20                          // 最大连接数,默认 max(4, runtime.NumCPU())
config.MinConns = 5                           // 最小连接数,默认 0(纯懒创建)
config.MinIdleConns = 3                       // 最小空闲连接数(v5 新增,减少尾延迟)
config.MaxConnLifetime = 1 * time.Hour        // 连接最大存活时间,默认 1h
config.MaxConnLifetimeJitter = 5 * time.Minute // 存活时间抖动,防止同时过期
config.MaxConnIdleTime = 30 * time.Minute     // 空闲超时,默认 30min
config.HealthCheckPeriod = 1 * time.Minute    // 健康检查间隔,默认 1min
config.PingTimeout = 5 * time.Second          // Ping 超时,默认 0(无超时)

参数选择指南

参数生产建议理由
MaxConnsPG max_connections / 应用实例数,通常 10-30过多会压垮 PG;过少会池等待
MinConnsMaxConns / 4MaxConns / 2避免冷启动延迟
MinIdleConns2-5减少尾延迟,比 MinConns 更精准
MaxConnLifetime30min-1h回收长期连接,防止 PG 内存泄漏
MaxConnLifetimeJitterMaxConnLifetime * 10%避免雪崩式断连
MaxConnIdleTime5-15min空闲连接尽早释放给其他实例
HealthCheckPeriod30s-1min及时发现死连接

3.3 连接生命周期

                 ┌──────────────────────────────────────────┐
                 │              pgxpool.Pool                  │
                 │                                            │
  pool.Query()   │  Acquire() ──→ 有空闲连接? ──Y──→ ShouldPing?│
  ─────────────→ │       │                           │       │
                 │       N                          Ping     │
                 │       │                           │       │
                 │   连接数 < MaxConns?              成功?    │
                 │       │                           │       │
                 │       Y              N: Destroy,  Y       │
                 │       │              重试 Acquire   │       │
                 │   创建新连接                        │       │
                 │   BeforeConnect() ─→ pgx.Connect   │       │
                 │   ─→ AfterConnect()                │       │
                 │       │                            │       │
                 │       └──→ PrepareConn() ────────→ 返回连接  │
                 │                                    │       │
                 │       ← ← ← 执行查询 ← ← ← ← ← ← ┘       │
                 │                                            │
                 │  Release() ─→ conn 状态正常? ──Y──→ 过期?   │
                 │       │            │               │       │
                 │       N            N: Destroy      N       │
                 │       │                            │       │
                 │   Destroy()        AfterRelease() ──→ 归还池│
                 │   triggerHealthCheck()                      │
                 └──────────────────────────────────────────┘

关键设计决策

3.4 生命周期钩子

config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
    // 在建立连接前修改配置(每个连接独立副本)
    // 用例:动态密码轮换、添加连接属性
    cfg.RuntimeParams["application_name"] = "my-service-" + hostname
    return nil
}

config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
    // 连接建立后、加入池前。用于一次性初始化。
    // 用例:注册自定义类型、设置 search_path、加载扩展
    pgxUUID.Register(conn.TypeMap())

    // 注册自定义 enum
    dt, err := conn.LoadType(ctx, "my_enum_type")
    if err != nil {
        return err
    }
    conn.TypeMap().RegisterType(dt)
    return nil
}

config.PrepareConn = func(ctx context.Context, conn *pgx.Conn) (bool, error) {
    // 每次 Acquire 连接时调用(替代已废弃的 BeforeAcquire)
    // 返回 (true, nil): 连接有效,继续使用
    // 返回 (false, nil): 连接无效,销毁并重试新连接
    // 返回 (true, err): 连接有效但查询应该失败
    // 返回 (false, err): 连接无效且查询失败
    return true, nil
}

config.AfterRelease = func(conn *pgx.Conn) bool {
    // 连接归还后调用(异步执行)
    // 返回 true: 归还到池;false: 销毁
    // 用例:清理 session 状态、重置 search_path
    return true
}

config.BeforeClose = func(conn *pgx.Conn) {
    // 连接被销毁前调用
    // 用例:清理资源、记录日志
}

3.5 Pool.Stat() 监控

type Stat struct {
    AcquireCount()         int64         // 累计成功获取连接次数
    AcquireDuration()      time.Duration // 累计获取连接等待时间
    AcquiredConns()        int32         // 当前被持有的连接数
    CanceledAcquireCount() int64         // 因 context 取消的获取次数
    ConstructingConns()    int32         // 正在创建中的连接数
    EmptyAcquireCount()    int64         // 池为空时的获取次数
    EmptyAcquireWaitTime() time.Duration // 池为空时的等待总时长
    IdleConns()            int32         // 当前空闲连接数
    MaxConns()             int32         // 最大连接数
    TotalConns()           int32         // 总连接数
    NewConnsCount()        int64         // 累计新建连接数
    MaxLifetimeDestroyCount() int64      // 因超过 MaxConnLifetime 销毁的连接数
    MaxIdleDestroyCount()  int64         // 因超过 MaxConnIdleTime 销毁的连接数
}

应该暴露为 metrics 的关键指标


4. 查询执行模式(QueryExecMode)

pgx v5 引入了 5 种查询执行模式,统一控制 prepared statement 行为:

type QueryExecMode int32

const (
    QueryExecModeCacheStatement  // 默认。自动 Prepare + 缓存。LRU 缓存,容量 512
    QueryExecModeCacheDescribe   // 只缓存描述(参数/结果类型),不在服务端创建 prepared statement
    QueryExecModeDescribeExec    // 每次都 Describe,不缓存。安全但每次 2 RTT
    QueryExecModeExec            // 不 Describe,客户端推断参数类型。1 RTT,text 格式
    QueryExecModeSimpleProtocol  // Simple Query Protocol。参数客户端插值。1 RTT
)

各模式对比

模式RTT(首次/后续)Prepared Statement二进制格式PgBouncer 兼容适用场景
CacheStatement2/1服务端创建+缓存仅 session 模式默认,直连 PG
CacheDescribe2/1不创建,只缓存描述transaction 模式可用PgBouncer transaction 模式
DescribeExec2/2不创建,不缓存Schema 频繁变更
Exec1/1不使用否(text)极简场景
SimpleProtocol1/1不使用否(text)非 PG 代理/最大兼容

设置方式

// 全局设置(连接字符串)
connStr := "postgres://...?default_query_exec_mode=cache_describe"

// 全局设置(代码)
config.DefaultQueryExecMode = pgx.QueryExecModeCacheDescribe

// 单次查询覆盖(作为第一个参数传入)
pool.QueryRow(ctx, pgx.QueryExecModeSimpleProtocol, "SELECT $1", 42)

Statement Cache 内部机制

CacheStatement 模式下:

  1. 首次执行 SQL → 计算 SQL 的 SHA-256,生成 stmtcache_<hash> 作为 prepared statement 名
  2. 发送 Parse(创建 prepared statement)+ Describe(获取参数/结果类型)到服务端
  3. 缓存 StatementDescription(含参数 OID、结果字段描述)到 LRU cache(默认容量 512)
  4. 后续执行同一 SQL → 命中缓存 → 直接 Bind + Execute,只需 1 RTT

CacheDescribe 模式下:

  1. 首次执行 SQL → 发送 unnamed prepared statement 的 Parse + Describe
  2. 只缓存 StatementDescription,不在服务端保留 prepared statement
  3. 后续执行 → 使用缓存的描述做 ExecParams(extended protocol,但不引用 named statement)

为什么 PgBouncer transaction 模式与 CacheStatement 冲突:PgBouncer transaction 模式在每次事务结束后可能把连接换给其他客户端。而 prepared statement 是绑定到特定后端连接的——客户端认为 statement 已经 prepare 了,但实际后端连接可能已经换了一个没有这个 statement 的连接,导致 prepared statement does not exist 错误。


5. 查询执行模式

5.1 Query + CollectRows(推荐模式)

// pgx/v5 推荐:Query + CollectRows 泛型函数
type User struct {
    ID    int64  `db:"id"`
    Name  string `db:"name"`
    Email string `db:"email"`
}

// RowToStructByName — 按列名匹配 struct 字段(case-insensitive)
users, err := pgx.CollectRows(
    pool.Query(ctx, "SELECT id, name, email FROM users WHERE active = $1", true),
    pgx.RowToStructByName[User],
)
// users 是 []User,rows 已自动关闭

// RowTo — 扫描单列到基础类型
ids, err := pgx.CollectRows(
    pool.Query(ctx, "SELECT id FROM users WHERE active = $1", true),
    pgx.RowTo[int64],
)
// ids 是 []int64

// CollectExactlyOneRow — 期望恰好一行,多行返回 ErrTooManyRows
user, err := pgx.CollectExactlyOneRow(
    pool.Query(ctx, "SELECT id, name, email FROM users WHERE id = $1", userID),
    pgx.RowToStructByName[User],
)

// RowToMap — 扫描为 map[string]any
m, err := pgx.CollectRows(
    pool.Query(ctx, "SELECT * FROM users LIMIT 10"),
    pgx.RowToMap,
)

RowToStructByName 的匹配规则

5.2 QueryRow + Scan

var name string
var age int32
err := pool.QueryRow(ctx,
    "SELECT name, age FROM users WHERE id = $1", userID,
).Scan(&name, &age)
if errors.Is(err, pgx.ErrNoRows) {
    // 没找到行
}

5.3 Exec(写操作)

tag, err := pool.Exec(ctx,
    "UPDATE users SET email = $1 WHERE id = $2",
    "new@example.com", userID,
)
fmt.Println(tag.RowsAffected()) // 1
fmt.Println(tag.String())        // "UPDATE 1"

5.4 NamedArgs

// 用 @name 替代 $1, $2...,由 pgx 自动重写为 $N 占位符
rows, err := pool.Query(ctx,
    "SELECT * FROM users WHERE name = @name AND age > @minAge",
    pgx.NamedArgs{"name": "Alice", "minAge": 18},
)

// StrictNamedArgs — SQL 中的每个 @param 都必须在 map 中存在,且 map 中没有多余 key
rows, err := pool.Query(ctx,
    "SELECT * FROM users WHERE name = @name",
    pgx.StrictNamedArgs{"name": "Alice"},
)

5.5 ForEachRow(回调式遍历)

var total int64
var name string
var amount int64

rows, _ := pool.Query(ctx, "SELECT name, amount FROM transactions")
tag, err := pgx.ForEachRow(rows, []any{&name, &amount}, func() error {
    total += amount
    fmt.Printf("%s: %d\n", name, amount)
    return nil
})

6. Batch 批量操作

Batch 把多个查询打包成一次网络往返发送给服务端,显著减少 RTT 延迟。

6.1 基本用法

batch := &pgx.Batch{}

// 队列多个查询
batch.Queue("INSERT INTO events (name) VALUES ($1)", "event1")
batch.Queue("INSERT INTO events (name) VALUES ($1)", "event2")
batch.Queue("SELECT count(*) FROM events")

// 一次发送
br := pool.SendBatch(ctx, batch)
defer br.Close() // 必须关闭!

// 按入队顺序读取结果
tag1, err := br.Exec() // INSERT 1
tag2, err := br.Exec() // INSERT 2

var count int64
err = br.QueryRow().Scan(&count) // SELECT

6.2 回调模式(推荐)

batch := &pgx.Batch{}
var totalInserted int64
var names []string

// Exec 回调
batch.Queue("INSERT INTO events (name) VALUES ($1)", "e1").
    Exec(func(ct pgconn.CommandTag) error {
        totalInserted += ct.RowsAffected()
        return nil
    })

// Query 回调
batch.Queue("SELECT name FROM events ORDER BY id").
    Query(func(rows pgx.Rows) error {
        var err error
        names, err = pgx.CollectRows(rows, pgx.RowTo[string])
        return err
    })

// QueryRow 回调
var count int64
batch.Queue("SELECT count(*) FROM events").
    QueryRow(func(row pgx.Row) error {
        return row.Scan(&count)
    })

br := pool.SendBatch(ctx, batch)
err := br.Close() // 触发所有回调

6.3 Batch 内部实现

CacheStatement/CacheDescribe 模式下,Batch 使用 pipeline 模式(pgconn.Pipeline):所有查询的 Parse/Bind/Execute 消息连续发送,最后一次性 Sync,服务端按顺序返回所有结果。这比逐条查询少了 N-1 次 RTT。

错误处理:Batch 中某条查询出错后,后续查询的结果不可用。br.Close() 会返回第一个遇到的错误。回调模式下,出错后后续回调不会被执行。


7. COPY 协议

PostgreSQL COPY 是批量数据导入最快的方式。pgx 用 CopyFrom 方法封装了 COPY 的 binary 格式。

7.1 基本用法

// 从内存数据批量插入
rows := [][]any{
    {"Alice", "alice@example.com", int32(30)},
    {"Bob", "bob@example.com", int32(25)},
}

count, err := pool.CopyFrom(
    ctx,
    pgx.Identifier{"users"},          // 表名(自动引号转义)
    []string{"name", "email", "age"}, // 列名
    pgx.CopyFromRows(rows),           // 数据源
)
fmt.Printf("copied %d rows\n", count)

7.2 三种数据源

// 1. CopyFromRows — 从 [][]any 切片
pgx.CopyFromRows([][]any{{"a", 1}, {"b", 2}})

// 2. CopyFromSlice — 从已有 typed slice,通过函数转换
type User struct { Name string; Age int32 }
users := []User{{"Alice", 30}, {"Bob", 25}}
pgx.CopyFromSlice(len(users), func(i int) ([]any, error) {
    return []any{users[i].Name, users[i].Age}, nil
})

// 3. CopyFromFunc — 流式生成,适合大数据集(不需要全部加载到内存)
pgx.CopyFromFunc(func() ([]any, error) {
    row, err := csvReader.Read()
    if err == io.EOF {
        return nil, nil // 返回 (nil, nil) 表示结束
    }
    if err != nil {
        return nil, err // 返回 error 中止 COPY
    }
    return []any{row[0], row[1]}, nil
})

7.3 性能对比

方法1000 行10000 行100000 行
逐条 INSERT~500ms~5s~50s
Batch INSERT~50ms~500ms~5s
COPY Protocol~10ms~50ms~200ms

COPY 在 5 行以上就比单条 INSERT 快(来自 pgx 官方文档)。数据量越大优势越明显,因为 COPY 使用二进制格式且是流式传输。

注意:COPY 要求所有列类型都支持二进制编码。自定义 enum 类型必须先用 conn.LoadType + RegisterType 注册,否则会报错。


8. LISTEN/NOTIFY

PostgreSQL 内置的 pub/sub 机制,适合轻量级事件通知。

// LISTEN/NOTIFY 需要一个专用连接——不能用 pool 的短连接
conn, err := pgx.Connect(ctx, os.Getenv("DATABASE_URL"))
if err != nil {
    log.Fatal(err)
}
defer conn.Close(ctx)

// 订阅频道
_, err = conn.Exec(ctx, "LISTEN order_events")
if err != nil {
    log.Fatal(err)
}

// 阻塞等待通知
for {
    notification, err := conn.WaitForNotification(ctx)
    if err != nil {
        log.Printf("wait error: %v", err)
        break
    }
    fmt.Printf("channel: %s, payload: %s\n",
        notification.Channel, notification.Payload)
}

// 在另一个连接/事务中发送通知
pool.Exec(ctx, "NOTIFY order_events, $1",
    `{"order_id": 123, "status": "paid"}`)
// 或在 SQL 中:SELECT pg_notify('order_events', '...')

与 pgxpool 集成的注意事项

// 从 pool 获取专用连接的正确方式
c, err := pool.Acquire(ctx)
if err != nil {
    log.Fatal(err)
}
conn := c.Hijack() // 从池中取走,不会被回收
defer conn.Close(ctx)

// 现在可以安全地 LISTEN
_, err = conn.Exec(ctx, "LISTEN my_channel")
// ...

9. 事务处理

9.1 基本事务

tx, err := pool.Begin(ctx)
if err != nil {
    return err
}
defer tx.Rollback(ctx) // Commit 成功后 Rollback 是安全的 no-op

_, err = tx.Exec(ctx, "INSERT INTO orders (user_id) VALUES ($1)", userID)
if err != nil {
    return err
}

err = tx.Commit(ctx)
return err

9.2 BeginTxFunc(推荐模式)

// 自动 commit/rollback,避免忘记 defer Rollback
err := pgx.BeginTxFunc(ctx, pool, pgx.TxOptions{
    IsoLevel:   pgx.Serializable,
    AccessMode: pgx.ReadWrite,
}, func(tx pgx.Tx) error {
    _, err := tx.Exec(ctx, "UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, fromID)
    if err != nil {
        return err // 自动 Rollback
    }
    _, err = tx.Exec(ctx, "UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, toID)
    return err // nil → 自动 Commit; error → 自动 Rollback
})

9.3 TxOptions

type TxOptions struct {
    IsoLevel       TxIsoLevel       // Serializable, RepeatableRead, ReadCommitted, ReadUncommitted
    AccessMode     TxAccessMode     // ReadWrite, ReadOnly
    DeferrableMode TxDeferrableMode // Deferrable, NotDeferrable
    BeginQuery     string           // 自定义 BEGIN SQL(如 CockroachDB 的 "BEGIN PRIORITY HIGH")
    CommitQuery    string           // 自定义 COMMIT SQL
}

9.4 Savepoints(伪嵌套事务)

tx, err := pool.Begin(ctx)
defer tx.Rollback(ctx)

// tx.Begin() 创建 savepoint(不是真的嵌套事务)
sp, err := tx.Begin(ctx)
if err != nil {
    return err
}

_, err = sp.Exec(ctx, "INSERT INTO audit_log ...")
if err != nil {
    sp.Rollback(ctx) // 只回滚到 savepoint,外层事务不受影响
} else {
    sp.Commit(ctx)   // 释放 savepoint
}

// 外层事务继续
_, err = tx.Exec(ctx, "UPDATE ...")
return tx.Commit(ctx)

内部实现:tx.Begin() 执行 SAVEPOINT sp_Nsp.Commit() 执行 RELEASE SAVEPOINT sp_Nsp.Rollback() 执行 ROLLBACK TO SAVEPOINT sp_N

事务与连接池pool.Begin() 会 Acquire 一个连接,这个连接在 tx.Commit()tx.Rollback() 之前不会归还。长事务 = 长期占用连接 = 池可用连接减少。


10. 类型系统(pgtype)

10.1 架构

// Map 是类型注册表——关联 PostgreSQL OID 与 Go Codec
type Map struct { /* ... */ }

// Type 表示一个已注册的 PostgreSQL 类型
type Type struct {
    Name  string // 类型名称
    OID   uint32 // PostgreSQL OID
    Codec Codec  // 编解码器
}

// Codec 接口——负责一种 PostgreSQL 类型的编解码
type Codec interface {
    FormatSupported(format int16) bool
    PreferredFormat() int16
    PlanEncode(m *Map, oid uint32, format int16, value any) EncodePlan
    PlanScan(m *Map, oid uint32, format int16, target any) ScanPlan
    DecodeDatabaseSQLValue(m *Map, oid uint32, format int16, src []byte) (driver.Value, error)
    DecodeValue(m *Map, oid uint32, format int16, src []byte) (any, error)
}

每个 pgx.Conn 创建时会初始化一个 pgtype.NewMap(),默认包含 70+ 内置类型。

10.2 内置类型映射

PostgreSQL 类型pgtype 类型Go 原生类型
int2/int4/int8pgtype.Int2/Int4/Int8int16/int32/int64
float4/float8pgtype.Float4/Float8float32/float64
text/varcharpgtype.Textstring
boolpgtype.Boolbool
byteapgtype.Bytea[]byte
uuidpgtype.UUID[16]byte
timestamptzpgtype.Timestamptztime.Time
timestamppgtype.Timestamptime.Time
datepgtype.Datetime.Time
intervalpgtype.Interval
json/jsonbpgtype.JSONB[]byte / 任意 struct
inet/cidrnetip.Addr / netip.Prefix
int4[]pgtype.Array[pgtype.Int4][]int32
text[]pgtype.Array[pgtype.Text][]string
numericpgtype.Numeric
hstorepgtype.Hstoremap[string]*string

10.3 Null 处理

pgx v5 提供三种 null 处理方式:

// 方式 1: pgtype.T 的 Valid 字段(推荐,零值安全)
var name pgtype.Text
err := pool.QueryRow(ctx, "SELECT name FROM users WHERE id = $1", id).Scan(&name)
if name.Valid {
    fmt.Println(name.String)
} else {
    fmt.Println("NULL")
}

// 方式 2: Go 指针(简洁,但需要解引用检查)
var name *string
err := pool.QueryRow(ctx, "SELECT name FROM users WHERE id = $1", id).Scan(&name)
if name != nil {
    fmt.Println(*name)
}

// 方式 3: database/sql 的 NullString(兼容旧代码)
var name sql.NullString
err := pool.QueryRow(ctx, "SELECT name FROM users WHERE id = $1", id).Scan(&name)

选择建议:在纯 pgx 项目中用 *string / *int64(最简洁)。需要区分「零值」和「NULL」时用 pgtype.Textsql.NullString 只在 database/sql 兼容场景使用。

10.4 自定义类型注册

// 在 AfterConnect 中注册自定义 enum
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
    // LoadType 从数据库查询类型的 OID 和定义
    dt, err := conn.LoadType(ctx, "order_status")
    if err != nil {
        return fmt.Errorf("load type order_status: %w", err)
    }
    conn.TypeMap().RegisterType(dt)

    // 注册数组类型
    dtArray, err := conn.LoadType(ctx, "_order_status") // 数组类型名前缀 _
    if err != nil {
        return fmt.Errorf("load type _order_status: %w", err)
    }
    conn.TypeMap().RegisterType(dtArray)

    return nil
}

11. Tracing 与可观测性

11.1 Tracer 接口

pgx 通过 interface 定义了 5 种 tracer,设置在 ConnConfig.Tracer 上。一个 tracer 可以实现多个接口。

// 查询追踪(Query/QueryRow/Exec)
type QueryTracer interface {
    TraceQueryStart(ctx context.Context, conn *Conn, data TraceQueryStartData) context.Context
    TraceQueryEnd(ctx context.Context, conn *Conn, data TraceQueryEndData)
}

// Batch 追踪
type BatchTracer interface {
    TraceBatchStart(ctx context.Context, conn *Conn, data TraceBatchStartData) context.Context
    TraceBatchQuery(ctx context.Context, conn *Conn, data TraceBatchQueryData)
    TraceBatchEnd(ctx context.Context, conn *Conn, data TraceBatchEndData)
}

// COPY 追踪
type CopyFromTracer interface {
    TraceCopyFromStart(ctx context.Context, conn *Conn, data TraceCopyFromStartData) context.Context
    TraceCopyFromEnd(ctx context.Context, conn *Conn, data TraceCopyFromEndData)
}

// Prepare 追踪
type PrepareTracer interface {
    TracePrepareStart(ctx context.Context, conn *Conn, data TracePrepareStartData) context.Context
    TracePrepareEnd(ctx context.Context, conn *Conn, data TracePrepareEndData)
}

// 连接追踪
type ConnectTracer interface {
    TraceConnectStart(ctx context.Context, data TraceConnectStartData) context.Context
    TraceConnectEnd(ctx context.Context, data TraceConnectEndData)
}

pgxpool 额外定义了两个:

type AcquireTracer interface {
    TraceAcquireStart(ctx context.Context, pool *Pool, data TraceAcquireStartData) context.Context
    TraceAcquireEnd(ctx context.Context, pool *Pool, data TraceAcquireEndData)
}
type ReleaseTracer interface {
    TraceRelease(pool *Pool, data TraceReleaseData)
}

11.2 OpenTelemetry 集成

使用 github.com/exaring/otelpgx

import "github.com/exaring/otelpgx"

config, _ := pgxpool.ParseConfig(connStr)
config.ConnConfig.Tracer = otelpgx.NewTracer()
pool, _ := pgxpool.NewWithConfig(ctx, config)

11.3 日志集成

使用 pgx/tracelog + 日志适配器(支持 slog、zap、zerolog 等):

import (
    "github.com/jackc/pgx/v5/tracelog"
    "github.com/mcosta74/pgx-slog"
)

config.ConnConfig.Tracer = &tracelog.TraceLog{
    Logger:   pgxslog.NewLogger(slog.Default()),
    LogLevel: tracelog.LogLevelInfo,
}

11.4 组合多个 Tracer

import "github.com/jackc/pgx/v5/multitracer"

config.ConnConfig.Tracer = &multitracer.Tracer{
    Tracers: []pgx.QueryTracer{
        otelpgx.NewTracer(),
        &tracelog.TraceLog{Logger: myLogger, LogLevel: tracelog.LogLevelDebug},
    },
}

12. pgx vs database/sql 对比

原生 pgx API 的优势

能力pgx 原生database/sql + pgx/stdlib
COPY 协议pool.CopyFrom()不支持(需 Raw() 降级)
Batch 批量pool.SendBatch()不支持
LISTEN/NOTIFYconn.WaitForNotification()不支持
二进制格式默认启用部分类型
自定义类型(enum/composite)RegisterType需要 SQLScanner 适配
泛型扫描CollectRows[T]不支持
NamedArgspgx.NamedArgs不支持
连接池统计pool.Stat() 详细指标db.Stats() 基础指标
性能更高(减少反射、支持二进制)额外一层适配开销
查询模式控制QueryExecMode有限

database/sql 的优势

stdlib 混合用法

// 从 database/sql 连接获取底层 pgx.Conn
db, _ := sql.Open("pgx", connStr)

conn, _ := db.Conn(ctx)
err := conn.Raw(func(driverConn any) error {
    pgxConn := driverConn.(*stdlib.Conn).Conn() // *pgx.Conn
    // 使用 pgx 原生功能
    _, err := pgxConn.CopyFrom(ctx, pgx.Identifier{"users"}, cols, src)
    return err
})

// 或从 pgxpool 创建 database/sql 连接
pool, _ := pgxpool.New(ctx, connStr)
db := stdlib.OpenDBFromPool(pool)

13. 完整代码示例

13.1 pgxpool 初始化与 CRUD

// pgx/v5
package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
)

func main() {
    ctx := context.Background()

    config, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }

    config.MaxConns = 20
    config.MinConns = 5
    config.MaxConnLifetime = 30 * time.Minute
    config.MaxConnLifetimeJitter = 3 * time.Minute
    config.MaxConnIdleTime = 10 * time.Minute
    config.HealthCheckPeriod = 30 * time.Second

    pool, err := pgxpool.NewWithConfig(ctx, config)
    if err != nil {
        log.Fatal(err)
    }
    defer pool.Close()

    // Ping 验证连接
    if err := pool.Ping(ctx); err != nil {
        log.Fatal("cannot reach database:", err)
    }

    // CREATE
    var id int64
    err = pool.QueryRow(ctx,
        "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
        "Alice", "alice@example.com",
    ).Scan(&id)
    if err != nil {
        log.Fatal(err)
    }

    // READ — 单行
    type User struct {
        ID    int64  `db:"id"`
        Name  string `db:"name"`
        Email string `db:"email"`
    }
    user, err := pgx.CollectExactlyOneRow(
        pool.Query(ctx, "SELECT id, name, email FROM users WHERE id = $1", id),
        pgx.RowToStructByName[User],
    )
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("User: %+v\n", user)

    // READ — 多行
    users, err := pgx.CollectRows(
        pool.Query(ctx, "SELECT id, name, email FROM users ORDER BY id LIMIT 100"),
        pgx.RowToStructByName[User],
    )
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Found %d users\n", len(users))

    // UPDATE
    tag, err := pool.Exec(ctx,
        "UPDATE users SET email = $1 WHERE id = $2",
        "alice-new@example.com", id,
    )
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Updated %d rows\n", tag.RowsAffected())

    // DELETE
    tag, err = pool.Exec(ctx, "DELETE FROM users WHERE id = $1", id)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Deleted %d rows\n", tag.RowsAffected())
}

13.2 Batch 操作

func batchExample(ctx context.Context, pool *pgxpool.Pool) error {
    batch := &pgx.Batch{}
    var counts [3]int64

    for i := range 3 {
        name := fmt.Sprintf("user_%d", i)
        batch.Queue(
            "INSERT INTO users (name, email) VALUES ($1, $2)",
            name, name+"@example.com",
        ).Exec(func(ct pgconn.CommandTag) error {
            counts[i] = ct.RowsAffected()
            return nil
        })
    }

    var total int64
    batch.Queue("SELECT count(*) FROM users").QueryRow(func(row pgx.Row) error {
        return row.Scan(&total)
    })

    br := pool.SendBatch(ctx, batch)
    if err := br.Close(); err != nil {
        return fmt.Errorf("batch failed: %w", err)
    }

    fmt.Printf("Inserted: %v, Total: %d\n", counts, total)
    return nil
}

13.3 COPY 批量插入

func copyExample(ctx context.Context, pool *pgxpool.Pool) error {
    // 生成测试数据
    const n = 10000
    rows := make([][]any, n)
    for i := range n {
        rows[i] = []any{
            fmt.Sprintf("user_%d", i),
            fmt.Sprintf("user_%d@example.com", i),
            int32(20 + i%50),
        }
    }

    count, err := pool.CopyFrom(
        ctx,
        pgx.Identifier{"users"},
        []string{"name", "email", "age"},
        pgx.CopyFromRows(rows),
    )
    if err != nil {
        return fmt.Errorf("copy failed: %w", err)
    }

    fmt.Printf("Copied %d rows\n", count)
    return nil
}

13.4 事务与 Savepoints

func transferWithAudit(ctx context.Context, pool *pgxpool.Pool,
    fromID, toID int64, amount int64) error {

    return pgx.BeginTxFunc(ctx, pool, pgx.TxOptions{
        IsoLevel: pgx.ReadCommitted,
    }, func(tx pgx.Tx) error {
        // 主事务:转账
        _, err := tx.Exec(ctx,
            "UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, fromID)
        if err != nil {
            return err
        }
        _, err = tx.Exec(ctx,
            "UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, toID)
        if err != nil {
            return err
        }

        // Savepoint:审计日志(失败不影响转账)
        sp, err := tx.Begin(ctx)
        if err != nil {
            return err
        }
        _, err = sp.Exec(ctx,
            "INSERT INTO audit_log (action, details) VALUES ($1, $2)",
            "transfer", fmt.Sprintf("%d->%d: %d", fromID, toID, amount))
        if err != nil {
            sp.Rollback(ctx) // 只回滚审计日志
        } else {
            sp.Commit(ctx)
        }

        return nil // 外层事务提交
    })
}

13.5 LISTEN/NOTIFY

func listenExample(ctx context.Context, pool *pgxpool.Pool) error {
    // 获取专用连接
    c, err := pool.Acquire(ctx)
    if err != nil {
        return err
    }
    conn := c.Hijack() // 从池中取走
    defer conn.Close(context.Background())

    _, err = conn.Exec(ctx, "LISTEN order_events")
    if err != nil {
        return err
    }

    log.Println("Listening for order_events...")

    for {
        notification, err := conn.WaitForNotification(ctx)
        if err != nil {
            return fmt.Errorf("wait: %w", err)
        }
        log.Printf("Received on [%s]: %s\n",
            notification.Channel, notification.Payload)
    }
}

// 发送通知(在任何连接上)
func notifyExample(ctx context.Context, pool *pgxpool.Pool) error {
    _, err := pool.Exec(ctx,
        "SELECT pg_notify($1, $2)",
        "order_events",
        `{"order_id": 42, "status": "shipped"}`,
    )
    return err
}

13.6 RowToStructByName 泛型扫描

type Product struct {
    ID          int64      `db:"id"`
    Name        string     `db:"name"`
    Price       int64      `db:"price_cents"`
    Description *string    `db:"description"` // nullable
    CreatedAt   time.Time  `db:"created_at"`
    Tags        []string   `db:"tags"`        // PostgreSQL text[]
}

func getProducts(ctx context.Context, pool *pgxpool.Pool, minPrice int64) ([]Product, error) {
    return pgx.CollectRows(
        pool.Query(ctx,
            `SELECT id, name, price_cents, description, created_at, tags
             FROM products
             WHERE price_cents >= $1
             ORDER BY created_at DESC`, minPrice),
        pgx.RowToStructByName[Product],
    )
}

// Lax 模式:允许 struct 字段比查询列多(部分查询场景)
type ProductSummary struct {
    ID   int64  `db:"id"`
    Name string `db:"name"`
    // Description 等字段不在查询中,Lax 模式不会报错
    Description *string `db:"description"`
}

func getProductNames(ctx context.Context, pool *pgxpool.Pool) ([]ProductSummary, error) {
    return pgx.CollectRows(
        pool.Query(ctx, "SELECT id, name FROM products"),
        pgx.RowToStructByNameLax[ProductSummary],
    )
}

14. Go PostgreSQL 驱动/工具对比

pgx 原生database/sql + pgx/stdliblib/pqGORMsqlc
定位底层驱动标准接口适配旧标准驱动ORM代码生成器
底层直接 wire protocolpgx 适配独立实现pgx via gorm.io/driver/postgres生成 pgx 代码
COPY原生支持Raw() 降级不支持不支持不支持
Batch原生支持不支持不支持不支持可用
LISTEN/NOTIFY原生支持不支持有限支持不支持不支持
类型安全运行时运行时运行时运行时(反射)编译时
性能最高稍低较低(反射开销)接近 pgx 原生
学习成本高(ORM 概念)中(写 SQL + 配置)
适用场景性能敏感、PG 深度功能需要生态兼容遗留项目快速 CRUDSQL-first 项目
维护状态活跃活跃维护模式活跃活跃

推荐组合


15. 陷阱(Pitfalls)

1. 连接池耗尽:长事务和 LISTEN 是元凶

pool.Begin() 持有一个连接直到 Commit/Rollback。如果事务中有慢查询、HTTP 调用、或忘记 Commit,连接不会归还。当 AcquiredConns == MaxConns 时,所有新请求都在等待。

// 错误:在事务中调用外部 API
tx, _ := pool.Begin(ctx)
resp, _ := http.Get("https://slow-api.com/validate") // 可能 hang 30s
tx.Exec(ctx, "INSERT ...")
tx.Commit(ctx)

// 正确:先完成外部调用,再开事务
resp, _ := http.Get("https://slow-api.com/validate")
tx, _ := pool.Begin(ctx)
tx.Exec(ctx, "INSERT ...")
tx.Commit(ctx)

LISTEN 更危险——WaitForNotification 会无限期持有连接。必须用 Hijack() 或独立 pgx.Connect

2. PgBouncer transaction 模式与 prepared statement 冲突

// 错误:使用默认 CacheStatement 模式连接 PgBouncer transaction pool
pool, _ := pgxpool.New(ctx, "postgres://pgbouncer:6432/db")
// → "prepared statement stmtcache_xxx does not exist"

// 正确:切换到 CacheDescribe 或更低模式
config, _ := pgxpool.ParseConfig("postgres://pgbouncer:6432/db")
config.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeCacheDescribe
// 或在连接字符串中:?default_query_exec_mode=cache_describe

3. 不使用 CollectRows 导致资源泄漏

// 错误:Query 返回的 Rows 没有关闭
rows, err := pool.Query(ctx, "SELECT * FROM users")
if err != nil {
    return err
}
// 忘记 rows.Close() → 连接不会归还到池!

// 正确:用 CollectRows(自动关闭)
users, err := pgx.CollectRows(
    pool.Query(ctx, "SELECT * FROM users"),
    pgx.RowToStructByName[User],
)
// 或手动 defer rows.Close()

4. Context 取消与查询取消的行为

context.Cancel() 会向 PostgreSQL 发送 cancel 请求(通过 BackendKeyData 中的 PID 和 secret key)。但 cancel 是异步的——PostgreSQL 可能在取消请求到达前已经完成了查询。

对于写操作:context cancel 可能导致部分写入(PostgreSQL 已执行但 Go 侧认为超时)。关键写操作应该用独立 context 或不设超时。

5. pgtype null 处理混淆

// 错误:用 string 接收可能为 NULL 的列
var name string
pool.QueryRow(ctx, "SELECT name FROM users WHERE id = $1", id).Scan(&name)
// name 为 NULL 时 → Scan 报错 "cannot scan NULL into *string"

// 正确:用 *string 或 pgtype.Text
var name *string
pool.QueryRow(ctx, "SELECT name FROM users WHERE id = $1", id).Scan(&name)

6. COPY 的类型注册要求

// 错误:COPY 自定义 enum 列但未注册类型
pool.CopyFrom(ctx, pgx.Identifier{"orders"}, []string{"status"}, rows)
// → "unable to encode ... for OID ..." 因为 COPY 强制使用 binary 格式

// 正确:在 AfterConnect 中注册
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
    dt, _ := conn.LoadType(ctx, "order_status")
    conn.TypeMap().RegisterType(dt)
    return nil
}

7. 连接字符串 sslmode 默认行为

pgx 默认 sslmode=prefer(尝试 TLS,失败则明文)。生产环境应该明确设置 sslmode=requiresslmode=verify-full

# 生产环境
postgres://user:pass@host:5432/db?sslmode=verify-full&sslrootcert=/path/to/ca.pem

8. RowToStructByName 的字段匹配陷阱

type User struct {
    ID   int64  `db:"id"`
    Name string `db:"name"`
    Age  int32  // 没有 db tag
}

// 查询 SELECT id, name, user_age FROM users
// → "struct doesn't have corresponding row field user_age"
// 因为没有 db tag 的 Age 字段匹配 "Age"(忽略下划线后是 "age"),不匹配 "user_age"

// 修复:加上 db tag
type User struct {
    ID   int64  `db:"id"`
    Name string `db:"name"`
    Age  int32  `db:"user_age"`
}

9. Scan 到错误类型不一定报错

pgx 使用 text 格式时,某些类型转换是静默的。例如数据库返回 int8(bigint),但你 Scan 到 int32,如果值在 int32 范围内不会报错,超范围才报错。binary 格式下类型匹配更严格。

10. Pool.Reset() 不等于重新创建池

pool.Reset() 会关闭所有空闲连接,但被持有的连接只有在归还时才会被关闭。在网络中断后调用 Reset() 可以快速恢复,但不要指望它立即终止正在使用的连接。


16. 生产 Checklist

连接池配置

PgBouncer 兼容

可观测性

类型安全

优雅关闭

// 优雅关闭:等待所有连接归还后关闭
// pool.Close() 会阻塞直到所有 Acquired 连接 Release
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 先停止接受新请求,然后:
pool.Close() // 阻塞等待

错误处理

// 判断 PostgreSQL 错误码
import "github.com/jackc/pgerrcode"

var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
    switch pgErr.Code {
    case pgerrcode.UniqueViolation:
        // 唯一约束冲突
    case pgerrcode.SerializationFailure:
        // 序列化冲突,应该重试
    case pgerrcode.DeadlockDetected:
        // 死锁,应该重试
    }
}

测试


延伸方向


分享这篇文章:

上一篇
Go Redis:go-redis/v9 的连接池、Pipeline 与 Hook
下一篇
C++ 协程:语言机制、陷阱与实现边界