跳转到正文
zeno's blog
返回

tokio(四):陷阱与生产最佳实践

专题: tokio

Table of contents

Open Table of contents

TL;DR

tokio 最常见的陷阱分为两类:阻塞运行时(在 async 上下文中做同步操作)和取消安全(Future 在 .await 点被 drop 导致状态丢失)。本文列举 10 个高频陷阱和对应的生产级修复方案,以及 runtime 配置、优雅关闭、可观测性和错误处理的最佳实践。


陷阱(Pitfalls)

陷阱 1:在 async 任务中使用 std::sync::Mutex 跨 .await 持有锁

症状:性能退化、worker 线程被阻塞、极端情况下死锁。

// 错误 ❌
let data = Arc::new(std::sync::Mutex::new(HashMap::new()));
tokio::spawn({
    let data = data.clone();
    async move {
        let mut guard = data.lock().unwrap();  // 阻塞 worker 线程
        let value = fetch_from_db().await;     // 持有锁跨 .await!
        guard.insert("key", value);            // 其他 worker 等锁时被阻塞
    }
});

根因std::sync::Mutex::lock() 是阻塞调用。如果锁被另一个任务持有,当前 worker 线程 被阻塞——不是任务被阻塞,是 整个线程。其他本该在这个线程上运行的任务全部停滞。

修复

// 方案 A: 临界区内没有 .await —— 用 std Mutex,快速释放
let data = Arc::new(std::sync::Mutex::new(HashMap::new()));
tokio::spawn({
    let data = data.clone();
    async move {
        let value = fetch_from_db().await;  // 先完成 I/O
        let mut guard = data.lock().unwrap();
        guard.insert("key", value);  // 快速操作,无 .await
        // guard 在此 drop
    }
});

// 方案 B: 必须跨 .await 持有锁 —— 用 tokio Mutex
let data = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
tokio::spawn({
    let data = data.clone();
    async move {
        let mut guard = data.lock().await;  // 不阻塞线程
        let value = fetch_from_db().await;
        guard.insert("key", value);
    }
});

陷阱 2:CPU 密集计算阻塞 async 运行时

症状:所有 I/O 操作延迟飙升,定时器不准,任务饥饿。

// 错误 ❌
tokio::spawn(async {
    // 在 worker 线程上做 CPU 密集计算
    let result = (0..10_000_000).map(|x| x * x).sum::<u64>();
    // 这段代码没有任何 .await 点,worker 线程完全被占用
});

根因:tokio 的 coop budget 只在 .await tokio 资源时递减。纯 CPU 计算没有 .await 点,coop 无法介入。

修复

// 方案 A: spawn_blocking
let result = tokio::task::spawn_blocking(|| {
    (0..10_000_000).map(|x| x * x).sum::<u64>()
}).await.unwrap();

// 方案 B: 用 rayon(专业并行计算库)
let (tx, rx) = tokio::sync::oneshot::channel();
rayon::spawn(move || {
    let result: u64 = (0..10_000_000u64).into_par_iter().map(|x| x * x).sum();
    let _ = tx.send(result);
});
let result = rx.await.unwrap();

陷阱 3:任务取消在 .await 点丢弃状态

症状:数据写入不完整、资源泄漏、业务逻辑不一致。

// 危险 ❌
async fn transfer_money(from: &Account, to: &Account, amount: u64) {
    from.debit(amount).await;   // 步骤 1: 扣款
    // <-- 如果在这里被取消(如 select! 中另一个分支先完成)
    to.credit(amount).await;    // 步骤 2: 入账 —— 可能永远不执行
}

// 在 select! 中使用
tokio::select! {
    _ = transfer_money(&alice, &bob, 100) => {}
    _ = tokio::time::sleep(Duration::from_secs(5)) => {
        println!("timeout");
        // transfer_money 在 debit 之后、credit 之前被 drop
        // 钱扣了但没到账!
    }
}

修复

// 方案 A: spawn 独立任务(不受 select! 取消影响)
let handle = tokio::spawn(transfer_money(alice.clone(), bob.clone(), 100));
tokio::select! {
    result = handle => { /* 完成 */ }
    _ = tokio::time::sleep(Duration::from_secs(5)) => {
        // handle 被 drop,但 task 继续运行直到完成
        println!("timeout, but transfer continues in background");
    }
}

// 方案 B: 使用数据库事务确保原子性(根本解决方案)
async fn transfer_money(db: &Pool, from_id: i64, to_id: i64, amount: u64) {
    let mut tx = db.begin().await.unwrap();
    sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE id = $2")
        .bind(amount as i64).bind(from_id).execute(&mut *tx).await.unwrap();
    sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE id = $2")
        .bind(amount as i64).bind(to_id).execute(&mut *tx).await.unwrap();
    tx.commit().await.unwrap();
    // 即使被取消,未 commit 的事务会被数据库回滚
}

陷阱 4:select! 循环中丢失 Future 状态

症状:定时器永远不触发、消息被跳过。

// 错误 ❌ —— sleep 每次循环都重新创建
loop {
    tokio::select! {
        msg = rx.recv() => { process(msg).await; }
        _ = tokio::time::sleep(Duration::from_secs(30)) => {
            // 永远不会触发:每次 msg 到达时 sleep 被 drop 并重建
            break;
        }
    }
}

// 正确 ✅ —— pin sleep,跨迭代复用
let sleep = tokio::time::sleep(Duration::from_secs(30));
tokio::pin!(sleep);

loop {
    tokio::select! {
        msg = rx.recv() => {
            process(msg).await;
            // 可选:收到消息时重置超时
            // sleep.as_mut().reset(Instant::now() + Duration::from_secs(30));
        }
        _ = &mut sleep => {
            println!("30s idle, shutting down");
            break;
        }
    }
}

陷阱 5:spawn 的任务引用非 ‘static 数据

症状:编译错误(Rust 编译器阻止这个错误,不会变成运行时 bug)。

// 编译错误 ❌
async fn process(data: &[u8]) {
    tokio::spawn(async {
        println!("{:?}", data);  // `data` 的生命周期不是 'static
        // error: borrowed data escapes outside of function
    });
}

// 修复 ✅ —— 将数据 move 进 task
async fn process(data: &[u8]) {
    let owned = data.to_vec();  // 拷贝为 owned 数据
    tokio::spawn(async move {
        println!("{:?}", owned);
    });
}

为什么有 'static 要求:spawned task 可能比 spawner 活得更久。如果 task 持有 spawner 栈上数据的引用,spawner 返回后引用会悬垂。Rust 在编译期阻止了这个问题(C++ Asio 中这是运行时 bug)。

陷阱 6:忘记 .await 一个 Future

症状:编译器 warning(Rust 1.x),代码看起来应该执行但实际什么都没发生。

// 错误 ❌
async fn handler() {
    tokio::time::sleep(Duration::from_secs(1));  // 缺少 .await!
    // 创建了 Sleep future 但立即 drop,不会等待 1 秒
    println!("this prints immediately");
}

// 正确 ✅
async fn handler() {
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("this prints after 1 second");
}

根因:Rust Future 是惰性的——创建 Future 不执行任何操作。编译器会发出 #[must_use] warning,但容易在嘈杂的编译输出中被忽略。

陷阱 7:嵌套 Runtime::block_on

症状:panic(“Cannot start a runtime from within a runtime”)。

// 错误 ❌
#[tokio::main]
async fn main() {
    // 已经在 tokio runtime 内
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {  // panic!
        // ...
    });
}

// 修复 ✅ —— 如果需要在 sync 代码中调用 async
fn sync_function() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        // 这是合法的,因为 sync_function 不在 runtime 内
    });
}

// 在已有 runtime 中:直接 .await 或 spawn
#[tokio::main]
async fn main() {
    some_async_fn().await;  // 直接 await
    tokio::spawn(some_other_fn());  // 或 spawn
}

陷阱 8:mpsc channel 容量不当导致反压失控

症状:内存无限增长(无界 channel)或生产者被意外阻塞(有界 channel 容量太小)。

// 危险 ❌ —— 无界 channel,生产者永远不会被阻塞
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
// 如果 consumer 处理不过来,消息无限堆积 → OOM

// 正确 ✅ —— 有界 channel + 处理 send 失败
let (tx, mut rx) = tokio::sync::mpsc::channel::<Event>(1024);
if tx.send(event).await.is_err() {
    // receiver 已 drop,处理错误
    tracing::error!("receiver dropped, event lost");
}

// 或使用 try_send 非阻塞发送
match tx.try_send(event) {
    Ok(()) => {}
    Err(TrySendError::Full(event)) => {
        // channel 满,决定:丢弃 / 等待 / 降级
        tracing::warn!("channel full, dropping event");
    }
    Err(TrySendError::Closed(event)) => {
        tracing::error!("receiver gone");
    }
}

陷阱 9:任务粒度不当

太多小任务:每个任务都有分配开销(~300-500 bytes)、调度开销。百万级微任务会导致调度器压力过大。

// 错误 ❌ —— 为每个字节处理 spawn 一个任务
for byte in data.iter() {
    tokio::spawn(async move { process_byte(*byte).await });
}

// 正确 ✅ —— 批量处理
for chunk in data.chunks(1024) {
    let chunk = chunk.to_vec();
    tokio::spawn(async move {
        for byte in chunk { process_byte(byte).await; }
    });
}

太少大任务:一个任务做太多事,无法被并行处理,浪费多核。

// 错误 ❌ —— 一个任务串行处理所有连接
async fn handle_all(listener: TcpListener) {
    loop {
        let (socket, _) = listener.accept().await.unwrap();
        process(socket).await;  // 串行!一个连接处理完才处理下一个
    }
}

// 正确 ✅ —— 每个连接一个任务
async fn handle_all(listener: TcpListener) {
    loop {
        let (socket, _) = listener.accept().await.unwrap();
        tokio::spawn(process(socket));  // 并发处理
    }
}

陷阱 10:在 async 代码中调用阻塞 I/O

症状:与陷阱 2 相同——worker 线程被阻塞。

// 错误 ❌
tokio::spawn(async {
    let content = std::fs::read_to_string("large_file.txt").unwrap();  // 阻塞!
    process(content).await;
});

// 正确 ✅
tokio::spawn(async {
    let content = tokio::task::spawn_blocking(|| {
        std::fs::read_to_string("large_file.txt").unwrap()
    }).await.unwrap();
    process(content).await;
});

// 或使用 tokio::fs(内部也是 spawn_blocking,但 API 更 ergonomic)
tokio::spawn(async {
    let content = tokio::fs::read_to_string("large_file.txt").await.unwrap();
    process(content).await;
});

注意tokio::fs 内部就是对 spawn_blocking + std::fs 的封装。目前 tokio 没有基于 io_uring 的真正异步文件 I/O(尽管 tokio 1.0 路线图提到了这一计划,tokio-uring 作为独立 crate 存在但不在 tokio 主 crate 中)。


生产最佳实践

Runtime 配置指南

工作负载类型推荐配置原因
I/O 密集型(Web 服务器、代理)worker_threads = CPU 核心数(默认值)worker 大部分时间在等 I/O
CPU + I/O 混合worker_threads = CPU 核心数,配合 spawn_blocking / rayon分离 CPU 和 I/O 工作
高连接数(10K+)默认 worker + 增大 max_blocking_threads阻塞线程池可能成为瓶颈
嵌入式 / 资源受限current_thread最小开销
每核一个 runtime(thread-per-core)多个 current_thread runtime,每个绑定到一个 CPU 核心避免跨核通信,最大化缓存局部性
// 生产级 runtime 配置示例
let runtime = tokio::runtime::Builder::new_multi_thread()
    .worker_threads(num_cpus::get())       // 或从配置读取
    .max_blocking_threads(512)             // 根据阻塞操作量调整
    .thread_name_fn(|| {                   // 线程名包含编号,方便调试
        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
        format!("my-app-worker-{}", id)
    })
    .thread_stack_size(4 * 1024 * 1024)    // 4MB,深递归场景可调大
    .enable_all()
    .build()
    .expect("failed to build tokio runtime");

优雅关闭模式

推荐的三阶段关闭:

use tokio::signal;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let tracker = TaskTracker::new();

    // 阶段 1: 启动工作任务
    let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap();
    loop {
        tokio::select! {
            // 接受新连接
            Ok((socket, _)) = listener.accept() => {
                let token = token.clone();
                tracker.spawn(async move {
                    tokio::select! {
                        _ = handle_connection(socket) => {}
                        _ = token.cancelled() => {
                            // 可在此执行清理:flush buffer、发 FIN 等
                            tracing::info!("connection handler shutting down");
                        }
                    }
                });
            }
            // 阶段 2: 收到关闭信号
            _ = signal::ctrl_c() => {
                tracing::info!("shutdown signal received");
                break;
            }
        }
    }

    // 阶段 3: 通知所有任务停止,等待它们完成
    token.cancel();      // 通知所有任务
    tracker.close();     // 不再接受新任务
    tracker.wait().await; // 等待所有任务退出
    tracing::info!("graceful shutdown complete");
}

关闭超时:防止某些任务永远不退出:

// 带超时的等待
match tokio::time::timeout(Duration::from_secs(30), tracker.wait()).await {
    Ok(()) => tracing::info!("all tasks completed"),
    Err(_) => tracing::warn!("shutdown timed out, {} tasks still running",
                              tracker.len()),
}

Tracing 集成与 tokio-console

基础 tracing 设置

# Cargo.toml
[dependencies]
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
use tracing_subscriber::{fmt, EnvFilter};

fn init_tracing() {
    tracing_subscriber::fmt()
        .with_env_filter(
            EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| EnvFilter::new("info,my_app=debug"))
        )
        .with_target(true)
        .with_thread_ids(true)
        .with_file(true)
        .with_line_number(true)
        .init();
}

tokio-console —— 异步任务的 htop

tokio-console 提供实时 TUI 界面,显示每个 task 的:

设置步骤

# Cargo.toml
[dependencies]
console-subscriber = "0.4"
tokio = { version = "1", features = ["full", "tracing"] }
// main.rs —— 替代标准 tracing 初始化
fn main() {
    console_subscriber::init();  // 自动设置 tracing subscriber + gRPC 服务
    // ...
}
# 编译时必须启用 tokio_unstable cfg
RUSTFLAGS="--cfg tokio_unstable" cargo build

# 在另一个终端启动 console
cargo install tokio-console
tokio-console  # 默认连接 http://127.0.0.1:6669

生产环境注意:tokio-console 有性能开销(tracing instrumentation),建议只在开发和 staging 环境启用。生产环境使用标准 tracing subscriber + 日志聚合。

异步代码的错误处理

模式 1:Result 传播(推荐)

use anyhow::Result;  // 或 thiserror 自定义错误类型

async fn fetch_and_process(url: &str) -> Result<ProcessedData> {
    let response = reqwest::get(url).await?;             // ? 传播网络错误
    let body = response.text().await?;                    // ? 传播读取错误
    let data: RawData = serde_json::from_str(&body)?;     // ? 传播解析错误
    let processed = process(data).await?;
    Ok(processed)
}

模式 2:JoinHandle 的双层错误

let handle = tokio::spawn(async {
    might_fail().await  // 返回 Result<T, MyError>
});

match handle.await {
    Ok(Ok(value)) => { /* 任务成功完成 */ }
    Ok(Err(app_err)) => { /* 任务内部返回错误 */ }
    Err(join_err) => {
        // 任务 panic 或被 abort
        if join_err.is_panic() {
            // 不要 unwrap panic payload,log 然后继续
            tracing::error!("task panicked: {:?}", join_err);
        } else {
            tracing::info!("task was cancelled");
        }
    }
}

模式 3:panic 处理策略

// 全局 panic hook —— 在 panic 时记录堆栈
std::panic::set_hook(Box::new(|info| {
    tracing::error!("panic occurred: {}", info);
}));

// spawn 时捕获 panic
let handle = tokio::spawn(async {
    do_work().await
});

if let Err(e) = handle.await {
    if e.is_panic() {
        // 决定策略:重启任务 / 关闭服务 / 忽略
        tracing::error!("critical task panicked, initiating shutdown");
        std::process::exit(1);
    }
}

生产 Checklist


分享这篇文章:

上一篇
微服务(二):完整微服务集群的分层系统
下一篇
微服务(一):拆分-按运维特征和组织边界画线