Table of contents
Open Table of contents
TL;DR
tokio::spawn 将一个 Future 包装为运行时调度的任务(task),任务是非抢占式的——它只在 .await 点主动让出执行权。tokio 通过 coop budget(128 次操作预算)实现软抢占,通过 JoinSet / TaskTracker 提供结构化并发,通过 drop JoinHandle 实现取消。理解任务生命周期是避免 tokio 大部分 pitfall 的关键。
tokio::spawn —— 内部发生了什么
签名与约束
pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
三个约束的含义:
| 约束 | 原因 | 对比 Asio |
|---|---|---|
Future | 必须是可 poll 的状态机 | Asio 的 completion handler 也是可调用对象 |
Send | 任务可能被工作窃取到其他线程执行 | Asio 的 strand 在运行时保证序列化,不需要编译期 Send |
'static | 任务的生命周期独立于 spawner,不能引用栈上数据 | Asio 的 handler 通过 shared_ptr 延长生命周期 |
spawn 的内部流程
tokio::spawn(my_future)
│
▼
1. 将 Future 包装为 Task(alloc::Box<Task<F>>)
Task 包含:Future 状态机 + 调度元数据 + Waker 引用
│
▼
2. 获取当前 worker context
│
├─ 在 worker 线程上调用 ──→ 放入当前 worker 的 LIFO slot
│ (若 LIFO slot 已满,旧任务挤到 local queue)
│
└─ 在非 worker 线程调用 ──→ 放入 global (inject) queue
│
▼
3. 返回 JoinHandle<F::Output>
(JoinHandle 可以 .await 获取结果,也可以 drop 或 .abort())
关键保证:spawn 不会同步 poll 被 spawn 的任务。这意味着在持有锁时调用 spawn 不会与被 spawn 的任务形成死锁。
不保证完成:spawned task 不保证执行完成。runtime 关闭时,所有未完成任务会被 drop。
不使用 spawn 的替代方案
并非所有并发都需要 spawn。对比:
// 方式 1: spawn —— 独立任务,可被调度到任意 worker
let handle = tokio::spawn(async { do_work().await });
let result = handle.await?;
// 方式 2: join! —— 同一任务内并发,无需 Send + 'static
let (a, b) = tokio::join!(
fetch_from_db(), // 在同一个 task 内交替 poll
fetch_from_api()
);
// 方式 3: select! —— 竞争,第一个完成的获胜
tokio::select! {
result = fetch_data() => handle_data(result),
_ = tokio::time::sleep(Duration::from_secs(5)) => handle_timeout(),
}
| 方式 | 需要 Send | 需要 ‘static | 跨线程 | 取消语义 |
|---|---|---|---|---|
spawn | 是 | 是 | 是 | drop JoinHandle 或 abort |
join! | 否 | 否 | 否 | 无独立取消 |
select! | 否 | 否 | 否 | 未完成分支被 drop |
JoinHandle 与任务取消
JoinHandle
let handle: JoinHandle<String> = tokio::spawn(async {
"hello".to_string()
});
// .await 获取结果
let result: Result<String, JoinError> = handle.await;
// JoinError 在任务 panic 或被 abort 时返回
取消语义:drop = cancel
在 Rust 中,drop 一个 Future 就是取消它。 这是与几乎所有其他语言最大的区别。
let handle = tokio::spawn(async {
println!("start");
some_io().await; // <-- 如果在这个 .await 点被取消
println!("after io"); // <-- 这行永远不会执行
cleanup(); // <-- 这行也不会执行
});
// 方式 1: drop handle —— 任务继续在后台运行!
drop(handle); // 注意:drop JoinHandle 不会取消任务
// 方式 2: abort —— 主动取消任务
handle.abort(); // 下次任务被 poll 时,会在 .await 点返回 Cancelled
// 方式 3: JoinSet::abort_all —— 批量取消
重要区别:
drop(JoinHandle)—— 不取消任务,任务继续运行,只是你拿不到结果了(即 “detach”)JoinHandle::abort()—— 取消任务,任务在下次.await点被 dropdrop(JoinSet)—— 取消 JoinSet 中所有任务
取消发生在 .await 点——任务不会在执行同步代码的过程中被强制终止。这意味着 .await 点之间的代码是「取消安全的原子单元」。
协作式调度:coop budget
问题
async 任务是非抢占式的。如果一个任务在被 poll 时做了大量工作而不让出 CPU,其他任务会被饿死:
// 坏:这个任务可能在一次 poll 中处理数万个 channel 消息
loop {
let msg = receiver.recv().await; // 如果 channel 中有 50000 条消息
process(msg); // 每次 recv 立即返回 Ready,不会让出
}
解决方案:coop 模块
tokio 0.2.14 引入了协作式任务让出(cooperative task yielding)机制,位于 tokio::task::coop(内部模块,非公开 API)。
工作原理:
- 每个任务被调度执行时,获得 128 次操作预算
- 每次
.await一个 tokio 资源(socket read/write、channel recv/send、timer tick 等)消耗 1 次预算 - 预算耗尽后,所有 tokio 资源的 poll 方法返回
Poll::Pending,即使资源实际已就绪 - 任务因
Pending让出执行权,调度器切换到下一个任务 - 任务下次被调度时,预算重置为 128
Task 被调度 ──→ budget = 128
│
├── socket.read().await ──→ budget = 127, Ready(data)
├── socket.read().await ──→ budget = 126, Ready(data)
├── ... (重复 126 次)
├── socket.read().await ──→ budget = 0
│ 即使 socket 有数据,也返回 Pending
│ 任务让出
▼
调度器切换到其他任务
128 这个数字 是经验值——“picked mostly because it felt good and seemed to work well”。太小会导致不必要的上下文切换,太大会导致尾部延迟增加。
效果:tokio 官方博客报告,某些场景下尾部延迟降低了约 3x。
coop 的局限
coop 只在任务 .await tokio 自身的资源时生效。以下场景 不受 coop 保护:
- CPU 密集计算(没有
.await点) - 调用阻塞 API(
std::fs::read、std::sync::Mutex::lock) .await第三方库的 Future(除非该库也集成了 coop)
结构化并发:JoinSet 与 TaskTracker
JoinSet —— 任务集合管理
use tokio::task::JoinSet;
let mut set = JoinSet::new();
for i in 0..10 {
set.spawn(async move {
tokio::time::sleep(Duration::from_secs(i)).await;
i * 2
});
}
// 按完成顺序获取结果(不是 spawn 顺序)
while let Some(result) = set.join_next().await {
println!("got: {:?}", result?);
}
JoinSet 的关键行为:
| 行为 | 说明 |
|---|---|
drop(JoinSet) | 立即 abort 所有任务 |
join_next().await | 等待任意一个任务完成,返回结果 |
abort_all() | 取消所有任务 |
| 内存管理 | 已完成但未被 join_next 消费的任务结果会堆积在内存中 |
内存陷阱:如果持续 spawn 但从不调用 join_next,已完成任务的返回值会无限堆积,最终 OOM。
TaskTracker —— 轻量级任务追踪
tokio_util::task::TaskTracker(注意:在 tokio-util crate 中,不在 tokio 本体):
use tokio_util::task::TaskTracker;
use tokio_util::sync::CancellationToken;
let tracker = TaskTracker::new();
let token = CancellationToken::new();
for i in 0..10 {
let token = token.clone();
tracker.spawn(async move {
tokio::select! {
_ = token.cancelled() => { println!("task {} cancelled", i); }
_ = do_work(i) => { println!("task {} done", i); }
}
});
}
// 触发取消
token.cancel();
// 关闭 tracker(不再接受新任务)并等待所有任务完成
tracker.close();
tracker.wait().await; // 阻塞直到所有任务退出
JoinSet vs TaskTracker:
| 维度 | JoinSet | TaskTracker |
|---|---|---|
| 所在 crate | tokio | tokio-util |
| 收集返回值 | 是(join_next) | 否 |
| drop 行为 | abort 所有任务 | 不 abort 任务 |
| 内存风险 | 不消费结果会 OOM | 无(不存储结果) |
| 主要用途 | 需要结果的并发任务集合 | 优雅关闭时等待所有后台任务 |
spawn_blocking —— 阻塞任务专用线程池
use tokio::task;
// 在专用的阻塞线程池中执行同步代码
let result = task::spawn_blocking(|| {
// 这里可以做 CPU 密集或调用阻塞 API
std::fs::read_to_string("/etc/hosts").unwrap()
}).await?;
spawn_blocking 使用一个独立的线程池(默认上限 512 线程),与 worker 线程完全隔离。当线程池满时,新的 spawn_blocking 调用会阻塞直到有线程可用。
何时使用:
| 场景 | 使用 spawn | 使用 spawn_blocking |
|---|---|---|
| 网络 I/O | ✅ | ❌ |
| 文件 I/O(无 async API) | ❌ | ✅ |
| CPU 密集计算(>10μs) | ❌ | ✅ |
| 调用 C FFI 阻塞函数 | ❌ | ✅ |
使用 std::sync::Mutex | 短临界区 ✅ | 长临界区 ✅ |
| 数据库查询(同步驱动) | ❌ | ✅ |
task::yield_now()
tokio::task::yield_now().await;
主动让出当前任务的执行权,将自身重新放入待处理队列的 尾部。与 Go 的 runtime.Gosched() 类似。
用途:
- 在长循环中手动插入让出点(coop budget 已覆盖大部分场景,一般不需要手动调用)
- 确保刚 spawn 的任务有机会执行
非保证:
- 不保证其他任务一定会在
yield_now返回前执行 - runtime 可能选择立即重新调度当前任务
- 如果
yield_now在select!的某个分支中,且另一个分支同时完成,yield 不会传播到调度器