Table of contents
Open Table of contents
TL;DR
tokio 最常见的陷阱分为两类:阻塞运行时(在 async 上下文中做同步操作)和取消安全(Future 在 .await 点被 drop 导致状态丢失)。本文列举 10 个高频陷阱和对应的生产级修复方案,以及 runtime 配置、优雅关闭、可观测性和错误处理的最佳实践。
陷阱(Pitfalls)
陷阱 1:在 async 任务中使用 std::sync::Mutex 跨 .await 持有锁
症状:性能退化、worker 线程被阻塞、极端情况下死锁。
// 错误 ❌
let data = Arc::new(std::sync::Mutex::new(HashMap::new()));
tokio::spawn({
let data = data.clone();
async move {
let mut guard = data.lock().unwrap(); // 阻塞 worker 线程
let value = fetch_from_db().await; // 持有锁跨 .await!
guard.insert("key", value); // 其他 worker 等锁时被阻塞
}
});
根因:std::sync::Mutex::lock() 是阻塞调用。如果锁被另一个任务持有,当前 worker 线程 被阻塞——不是任务被阻塞,是 整个线程。其他本该在这个线程上运行的任务全部停滞。
修复:
// 方案 A: 临界区内没有 .await —— 用 std Mutex,快速释放
let data = Arc::new(std::sync::Mutex::new(HashMap::new()));
tokio::spawn({
let data = data.clone();
async move {
let value = fetch_from_db().await; // 先完成 I/O
let mut guard = data.lock().unwrap();
guard.insert("key", value); // 快速操作,无 .await
// guard 在此 drop
}
});
// 方案 B: 必须跨 .await 持有锁 —— 用 tokio Mutex
let data = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
tokio::spawn({
let data = data.clone();
async move {
let mut guard = data.lock().await; // 不阻塞线程
let value = fetch_from_db().await;
guard.insert("key", value);
}
});
陷阱 2:CPU 密集计算阻塞 async 运行时
症状:所有 I/O 操作延迟飙升,定时器不准,任务饥饿。
// 错误 ❌
tokio::spawn(async {
// 在 worker 线程上做 CPU 密集计算
let result = (0..10_000_000).map(|x| x * x).sum::<u64>();
// 这段代码没有任何 .await 点,worker 线程完全被占用
});
根因:tokio 的 coop budget 只在 .await tokio 资源时递减。纯 CPU 计算没有 .await 点,coop 无法介入。
修复:
// 方案 A: spawn_blocking
let result = tokio::task::spawn_blocking(|| {
(0..10_000_000).map(|x| x * x).sum::<u64>()
}).await.unwrap();
// 方案 B: 用 rayon(专业并行计算库)
let (tx, rx) = tokio::sync::oneshot::channel();
rayon::spawn(move || {
let result: u64 = (0..10_000_000u64).into_par_iter().map(|x| x * x).sum();
let _ = tx.send(result);
});
let result = rx.await.unwrap();
陷阱 3:任务取消在 .await 点丢弃状态
症状:数据写入不完整、资源泄漏、业务逻辑不一致。
// 危险 ❌
async fn transfer_money(from: &Account, to: &Account, amount: u64) {
from.debit(amount).await; // 步骤 1: 扣款
// <-- 如果在这里被取消(如 select! 中另一个分支先完成)
to.credit(amount).await; // 步骤 2: 入账 —— 可能永远不执行
}
// 在 select! 中使用
tokio::select! {
_ = transfer_money(&alice, &bob, 100) => {}
_ = tokio::time::sleep(Duration::from_secs(5)) => {
println!("timeout");
// transfer_money 在 debit 之后、credit 之前被 drop
// 钱扣了但没到账!
}
}
修复:
// 方案 A: spawn 独立任务(不受 select! 取消影响)
let handle = tokio::spawn(transfer_money(alice.clone(), bob.clone(), 100));
tokio::select! {
result = handle => { /* 完成 */ }
_ = tokio::time::sleep(Duration::from_secs(5)) => {
// handle 被 drop,但 task 继续运行直到完成
println!("timeout, but transfer continues in background");
}
}
// 方案 B: 使用数据库事务确保原子性(根本解决方案)
async fn transfer_money(db: &Pool, from_id: i64, to_id: i64, amount: u64) {
let mut tx = db.begin().await.unwrap();
sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE id = $2")
.bind(amount as i64).bind(from_id).execute(&mut *tx).await.unwrap();
sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE id = $2")
.bind(amount as i64).bind(to_id).execute(&mut *tx).await.unwrap();
tx.commit().await.unwrap();
// 即使被取消,未 commit 的事务会被数据库回滚
}
陷阱 4:select! 循环中丢失 Future 状态
症状:定时器永远不触发、消息被跳过。
// 错误 ❌ —— sleep 每次循环都重新创建
loop {
tokio::select! {
msg = rx.recv() => { process(msg).await; }
_ = tokio::time::sleep(Duration::from_secs(30)) => {
// 永远不会触发:每次 msg 到达时 sleep 被 drop 并重建
break;
}
}
}
// 正确 ✅ —— pin sleep,跨迭代复用
let sleep = tokio::time::sleep(Duration::from_secs(30));
tokio::pin!(sleep);
loop {
tokio::select! {
msg = rx.recv() => {
process(msg).await;
// 可选:收到消息时重置超时
// sleep.as_mut().reset(Instant::now() + Duration::from_secs(30));
}
_ = &mut sleep => {
println!("30s idle, shutting down");
break;
}
}
}
陷阱 5:spawn 的任务引用非 ‘static 数据
症状:编译错误(Rust 编译器阻止这个错误,不会变成运行时 bug)。
// 编译错误 ❌
async fn process(data: &[u8]) {
tokio::spawn(async {
println!("{:?}", data); // `data` 的生命周期不是 'static
// error: borrowed data escapes outside of function
});
}
// 修复 ✅ —— 将数据 move 进 task
async fn process(data: &[u8]) {
let owned = data.to_vec(); // 拷贝为 owned 数据
tokio::spawn(async move {
println!("{:?}", owned);
});
}
为什么有 'static 要求:spawned task 可能比 spawner 活得更久。如果 task 持有 spawner 栈上数据的引用,spawner 返回后引用会悬垂。Rust 在编译期阻止了这个问题(C++ Asio 中这是运行时 bug)。
陷阱 6:忘记 .await 一个 Future
症状:编译器 warning(Rust 1.x),代码看起来应该执行但实际什么都没发生。
// 错误 ❌
async fn handler() {
tokio::time::sleep(Duration::from_secs(1)); // 缺少 .await!
// 创建了 Sleep future 但立即 drop,不会等待 1 秒
println!("this prints immediately");
}
// 正确 ✅
async fn handler() {
tokio::time::sleep(Duration::from_secs(1)).await;
println!("this prints after 1 second");
}
根因:Rust Future 是惰性的——创建 Future 不执行任何操作。编译器会发出 #[must_use] warning,但容易在嘈杂的编译输出中被忽略。
陷阱 7:嵌套 Runtime::block_on
症状:panic(“Cannot start a runtime from within a runtime”)。
// 错误 ❌
#[tokio::main]
async fn main() {
// 已经在 tokio runtime 内
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async { // panic!
// ...
});
}
// 修复 ✅ —— 如果需要在 sync 代码中调用 async
fn sync_function() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
// 这是合法的,因为 sync_function 不在 runtime 内
});
}
// 在已有 runtime 中:直接 .await 或 spawn
#[tokio::main]
async fn main() {
some_async_fn().await; // 直接 await
tokio::spawn(some_other_fn()); // 或 spawn
}
陷阱 8:mpsc channel 容量不当导致反压失控
症状:内存无限增长(无界 channel)或生产者被意外阻塞(有界 channel 容量太小)。
// 危险 ❌ —— 无界 channel,生产者永远不会被阻塞
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
// 如果 consumer 处理不过来,消息无限堆积 → OOM
// 正确 ✅ —— 有界 channel + 处理 send 失败
let (tx, mut rx) = tokio::sync::mpsc::channel::<Event>(1024);
if tx.send(event).await.is_err() {
// receiver 已 drop,处理错误
tracing::error!("receiver dropped, event lost");
}
// 或使用 try_send 非阻塞发送
match tx.try_send(event) {
Ok(()) => {}
Err(TrySendError::Full(event)) => {
// channel 满,决定:丢弃 / 等待 / 降级
tracing::warn!("channel full, dropping event");
}
Err(TrySendError::Closed(event)) => {
tracing::error!("receiver gone");
}
}
陷阱 9:任务粒度不当
太多小任务:每个任务都有分配开销(~300-500 bytes)、调度开销。百万级微任务会导致调度器压力过大。
// 错误 ❌ —— 为每个字节处理 spawn 一个任务
for byte in data.iter() {
tokio::spawn(async move { process_byte(*byte).await });
}
// 正确 ✅ —— 批量处理
for chunk in data.chunks(1024) {
let chunk = chunk.to_vec();
tokio::spawn(async move {
for byte in chunk { process_byte(byte).await; }
});
}
太少大任务:一个任务做太多事,无法被并行处理,浪费多核。
// 错误 ❌ —— 一个任务串行处理所有连接
async fn handle_all(listener: TcpListener) {
loop {
let (socket, _) = listener.accept().await.unwrap();
process(socket).await; // 串行!一个连接处理完才处理下一个
}
}
// 正确 ✅ —— 每个连接一个任务
async fn handle_all(listener: TcpListener) {
loop {
let (socket, _) = listener.accept().await.unwrap();
tokio::spawn(process(socket)); // 并发处理
}
}
陷阱 10:在 async 代码中调用阻塞 I/O
症状:与陷阱 2 相同——worker 线程被阻塞。
// 错误 ❌
tokio::spawn(async {
let content = std::fs::read_to_string("large_file.txt").unwrap(); // 阻塞!
process(content).await;
});
// 正确 ✅
tokio::spawn(async {
let content = tokio::task::spawn_blocking(|| {
std::fs::read_to_string("large_file.txt").unwrap()
}).await.unwrap();
process(content).await;
});
// 或使用 tokio::fs(内部也是 spawn_blocking,但 API 更 ergonomic)
tokio::spawn(async {
let content = tokio::fs::read_to_string("large_file.txt").await.unwrap();
process(content).await;
});
注意:
tokio::fs内部就是对spawn_blocking+std::fs的封装。目前 tokio 没有基于io_uring的真正异步文件 I/O(尽管 tokio 1.0 路线图提到了这一计划,tokio-uring作为独立 crate 存在但不在 tokio 主 crate 中)。
生产最佳实践
Runtime 配置指南
| 工作负载类型 | 推荐配置 | 原因 |
|---|---|---|
| I/O 密集型(Web 服务器、代理) | worker_threads = CPU 核心数(默认值) | worker 大部分时间在等 I/O |
| CPU + I/O 混合 | worker_threads = CPU 核心数,配合 spawn_blocking / rayon | 分离 CPU 和 I/O 工作 |
| 高连接数(10K+) | 默认 worker + 增大 max_blocking_threads | 阻塞线程池可能成为瓶颈 |
| 嵌入式 / 资源受限 | current_thread | 最小开销 |
| 每核一个 runtime(thread-per-core) | 多个 current_thread runtime,每个绑定到一个 CPU 核心 | 避免跨核通信,最大化缓存局部性 |
// 生产级 runtime 配置示例
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(num_cpus::get()) // 或从配置读取
.max_blocking_threads(512) // 根据阻塞操作量调整
.thread_name_fn(|| { // 线程名包含编号,方便调试
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("my-app-worker-{}", id)
})
.thread_stack_size(4 * 1024 * 1024) // 4MB,深递归场景可调大
.enable_all()
.build()
.expect("failed to build tokio runtime");
优雅关闭模式
推荐的三阶段关闭:
use tokio::signal;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
#[tokio::main]
async fn main() {
let token = CancellationToken::new();
let tracker = TaskTracker::new();
// 阶段 1: 启动工作任务
let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap();
loop {
tokio::select! {
// 接受新连接
Ok((socket, _)) = listener.accept() => {
let token = token.clone();
tracker.spawn(async move {
tokio::select! {
_ = handle_connection(socket) => {}
_ = token.cancelled() => {
// 可在此执行清理:flush buffer、发 FIN 等
tracing::info!("connection handler shutting down");
}
}
});
}
// 阶段 2: 收到关闭信号
_ = signal::ctrl_c() => {
tracing::info!("shutdown signal received");
break;
}
}
}
// 阶段 3: 通知所有任务停止,等待它们完成
token.cancel(); // 通知所有任务
tracker.close(); // 不再接受新任务
tracker.wait().await; // 等待所有任务退出
tracing::info!("graceful shutdown complete");
}
关闭超时:防止某些任务永远不退出:
// 带超时的等待
match tokio::time::timeout(Duration::from_secs(30), tracker.wait()).await {
Ok(()) => tracing::info!("all tasks completed"),
Err(_) => tracing::warn!("shutdown timed out, {} tasks still running",
tracker.len()),
}
Tracing 集成与 tokio-console
基础 tracing 设置
# Cargo.toml
[dependencies]
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
use tracing_subscriber::{fmt, EnvFilter};
fn init_tracing() {
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info,my_app=debug"))
)
.with_target(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.init();
}
tokio-console —— 异步任务的 htop
tokio-console 提供实时 TUI 界面,显示每个 task 的:
- 忙碌时间(busy time)
- 调度等待时间(scheduled time)
- 空闲时间(idle time)
- poll 次数
- Waker 调用次数
设置步骤:
# Cargo.toml
[dependencies]
console-subscriber = "0.4"
tokio = { version = "1", features = ["full", "tracing"] }
// main.rs —— 替代标准 tracing 初始化
fn main() {
console_subscriber::init(); // 自动设置 tracing subscriber + gRPC 服务
// ...
}
# 编译时必须启用 tokio_unstable cfg
RUSTFLAGS="--cfg tokio_unstable" cargo build
# 在另一个终端启动 console
cargo install tokio-console
tokio-console # 默认连接 http://127.0.0.1:6669
生产环境注意:tokio-console 有性能开销(tracing instrumentation),建议只在开发和 staging 环境启用。生产环境使用标准 tracing subscriber + 日志聚合。
异步代码的错误处理
模式 1:Result 传播(推荐)
use anyhow::Result; // 或 thiserror 自定义错误类型
async fn fetch_and_process(url: &str) -> Result<ProcessedData> {
let response = reqwest::get(url).await?; // ? 传播网络错误
let body = response.text().await?; // ? 传播读取错误
let data: RawData = serde_json::from_str(&body)?; // ? 传播解析错误
let processed = process(data).await?;
Ok(processed)
}
模式 2:JoinHandle 的双层错误
let handle = tokio::spawn(async {
might_fail().await // 返回 Result<T, MyError>
});
match handle.await {
Ok(Ok(value)) => { /* 任务成功完成 */ }
Ok(Err(app_err)) => { /* 任务内部返回错误 */ }
Err(join_err) => {
// 任务 panic 或被 abort
if join_err.is_panic() {
// 不要 unwrap panic payload,log 然后继续
tracing::error!("task panicked: {:?}", join_err);
} else {
tracing::info!("task was cancelled");
}
}
}
模式 3:panic 处理策略
// 全局 panic hook —— 在 panic 时记录堆栈
std::panic::set_hook(Box::new(|info| {
tracing::error!("panic occurred: {}", info);
}));
// spawn 时捕获 panic
let handle = tokio::spawn(async {
do_work().await
});
if let Err(e) = handle.await {
if e.is_panic() {
// 决定策略:重启任务 / 关闭服务 / 忽略
tracing::error!("critical task panicked, initiating shutdown");
std::process::exit(1);
}
}
生产 Checklist
- 所有阻塞操作(文件 I/O、CPU 密集、同步 FFI)都在
spawn_blocking或 rayon 中 -
std::sync::Mutex的临界区内没有.await -
select!循环中的长期 Future 用pin!固定 - 所有
mpscchannel 使用有界容量,有背压处理策略 - 使用
CancellationToken+TaskTracker实现优雅关闭 - 优雅关闭有超时兜底(防止任务永不退出)
- tracing 正确配置,关键路径有 span/event
- runtime 的
worker_threads和max_blocking_threads根据负载调整 - 没有嵌套
Runtime::block_on - 每个
tokio::spawn的JoinHandle要么被.await,要么明确知道 detach 的后果 - CI 中启用
cargo clippy的 async 相关 lint -
select!中使用的 async 方法都标注了取消安全性