跳转到正文
zeno's blog
返回

tokio(二):任务系统-spawn、取消与协作式调度

专题: tokio

Table of contents

Open Table of contents

TL;DR

tokio::spawn 将一个 Future 包装为运行时调度的任务(task),任务是非抢占式的——它只在 .await 点主动让出执行权。tokio 通过 coop budget(128 次操作预算)实现软抢占,通过 JoinSet / TaskTracker 提供结构化并发,通过 drop JoinHandle 实现取消。理解任务生命周期是避免 tokio 大部分 pitfall 的关键。


tokio::spawn —— 内部发生了什么

签名与约束

pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,

三个约束的含义:

约束原因对比 Asio
Future必须是可 poll 的状态机Asio 的 completion handler 也是可调用对象
Send任务可能被工作窃取到其他线程执行Asio 的 strand 在运行时保证序列化,不需要编译期 Send
'static任务的生命周期独立于 spawner,不能引用栈上数据Asio 的 handler 通过 shared_ptr 延长生命周期

spawn 的内部流程

tokio::spawn(my_future)


1. 将 Future 包装为 Task(alloc::Box<Task<F>>)
   Task 包含:Future 状态机 + 调度元数据 + Waker 引用


2. 获取当前 worker context

    ├─ 在 worker 线程上调用 ──→ 放入当前 worker 的 LIFO slot
    │                           (若 LIFO slot 已满,旧任务挤到 local queue)

    └─ 在非 worker 线程调用 ──→ 放入 global (inject) queue


3. 返回 JoinHandle<F::Output>
   (JoinHandle 可以 .await 获取结果,也可以 drop 或 .abort())

关键保证spawn 不会同步 poll 被 spawn 的任务。这意味着在持有锁时调用 spawn 不会与被 spawn 的任务形成死锁。

不保证完成:spawned task 不保证执行完成。runtime 关闭时,所有未完成任务会被 drop。

不使用 spawn 的替代方案

并非所有并发都需要 spawn。对比:

// 方式 1: spawn —— 独立任务,可被调度到任意 worker
let handle = tokio::spawn(async { do_work().await });
let result = handle.await?;

// 方式 2: join! —— 同一任务内并发,无需 Send + 'static
let (a, b) = tokio::join!(
    fetch_from_db(),    // 在同一个 task 内交替 poll
    fetch_from_api()
);

// 方式 3: select! —— 竞争,第一个完成的获胜
tokio::select! {
    result = fetch_data() => handle_data(result),
    _ = tokio::time::sleep(Duration::from_secs(5)) => handle_timeout(),
}
方式需要 Send需要 ‘static跨线程取消语义
spawndrop JoinHandle 或 abort
join!无独立取消
select!未完成分支被 drop

JoinHandle 与任务取消

JoinHandle

let handle: JoinHandle<String> = tokio::spawn(async {
    "hello".to_string()
});

// .await 获取结果
let result: Result<String, JoinError> = handle.await;
// JoinError 在任务 panic 或被 abort 时返回

取消语义:drop = cancel

在 Rust 中,drop 一个 Future 就是取消它。 这是与几乎所有其他语言最大的区别。

let handle = tokio::spawn(async {
    println!("start");
    some_io().await;          // <-- 如果在这个 .await 点被取消
    println!("after io");     // <-- 这行永远不会执行
    cleanup();                // <-- 这行也不会执行
});

// 方式 1: drop handle —— 任务继续在后台运行!
drop(handle);  // 注意:drop JoinHandle 不会取消任务

// 方式 2: abort —— 主动取消任务
handle.abort();  // 下次任务被 poll 时,会在 .await 点返回 Cancelled

// 方式 3: JoinSet::abort_all —— 批量取消

重要区别

取消发生在 .await 点——任务不会在执行同步代码的过程中被强制终止。这意味着 .await 点之间的代码是「取消安全的原子单元」。


协作式调度:coop budget

问题

async 任务是非抢占式的。如果一个任务在被 poll 时做了大量工作而不让出 CPU,其他任务会被饿死:

// 坏:这个任务可能在一次 poll 中处理数万个 channel 消息
loop {
    let msg = receiver.recv().await;  // 如果 channel 中有 50000 条消息
    process(msg);                      // 每次 recv 立即返回 Ready,不会让出
}

解决方案:coop 模块

tokio 0.2.14 引入了协作式任务让出(cooperative task yielding)机制,位于 tokio::task::coop(内部模块,非公开 API)。

工作原理

  1. 每个任务被调度执行时,获得 128 次操作预算
  2. 每次 .await 一个 tokio 资源(socket read/write、channel recv/send、timer tick 等)消耗 1 次预算
  3. 预算耗尽后,所有 tokio 资源的 poll 方法返回 Poll::Pending,即使资源实际已就绪
  4. 任务因 Pending 让出执行权,调度器切换到下一个任务
  5. 任务下次被调度时,预算重置为 128
Task 被调度 ──→ budget = 128

    ├── socket.read().await ──→ budget = 127, Ready(data)
    ├── socket.read().await ──→ budget = 126, Ready(data)
    ├── ... (重复 126 次)
    ├── socket.read().await ──→ budget = 0
    │                           即使 socket 有数据,也返回 Pending
    │                           任务让出

调度器切换到其他任务

128 这个数字 是经验值——“picked mostly because it felt good and seemed to work well”。太小会导致不必要的上下文切换,太大会导致尾部延迟增加。

效果:tokio 官方博客报告,某些场景下尾部延迟降低了约 3x。

coop 的局限

coop 只在任务 .await tokio 自身的资源时生效。以下场景 不受 coop 保护


结构化并发:JoinSet 与 TaskTracker

JoinSet —— 任务集合管理

use tokio::task::JoinSet;

let mut set = JoinSet::new();

for i in 0..10 {
    set.spawn(async move {
        tokio::time::sleep(Duration::from_secs(i)).await;
        i * 2
    });
}

// 按完成顺序获取结果(不是 spawn 顺序)
while let Some(result) = set.join_next().await {
    println!("got: {:?}", result?);
}

JoinSet 的关键行为:

行为说明
drop(JoinSet)立即 abort 所有任务
join_next().await等待任意一个任务完成,返回结果
abort_all()取消所有任务
内存管理已完成但未被 join_next 消费的任务结果会堆积在内存中

内存陷阱:如果持续 spawn 但从不调用 join_next,已完成任务的返回值会无限堆积,最终 OOM。

TaskTracker —— 轻量级任务追踪

tokio_util::task::TaskTracker(注意:在 tokio-util crate 中,不在 tokio 本体):

use tokio_util::task::TaskTracker;
use tokio_util::sync::CancellationToken;

let tracker = TaskTracker::new();
let token = CancellationToken::new();

for i in 0..10 {
    let token = token.clone();
    tracker.spawn(async move {
        tokio::select! {
            _ = token.cancelled() => { println!("task {} cancelled", i); }
            _ = do_work(i) => { println!("task {} done", i); }
        }
    });
}

// 触发取消
token.cancel();

// 关闭 tracker(不再接受新任务)并等待所有任务完成
tracker.close();
tracker.wait().await;  // 阻塞直到所有任务退出

JoinSet vs TaskTracker:

维度JoinSetTaskTracker
所在 cratetokiotokio-util
收集返回值是(join_next
drop 行为abort 所有任务不 abort 任务
内存风险不消费结果会 OOM无(不存储结果)
主要用途需要结果的并发任务集合优雅关闭时等待所有后台任务

spawn_blocking —— 阻塞任务专用线程池

use tokio::task;

// 在专用的阻塞线程池中执行同步代码
let result = task::spawn_blocking(|| {
    // 这里可以做 CPU 密集或调用阻塞 API
    std::fs::read_to_string("/etc/hosts").unwrap()
}).await?;

spawn_blocking 使用一个独立的线程池(默认上限 512 线程),与 worker 线程完全隔离。当线程池满时,新的 spawn_blocking 调用会阻塞直到有线程可用。

何时使用

场景使用 spawn使用 spawn_blocking
网络 I/O
文件 I/O(无 async API)
CPU 密集计算(>10μs)
调用 C FFI 阻塞函数
使用 std::sync::Mutex短临界区 ✅长临界区 ✅
数据库查询(同步驱动)

task::yield_now()

tokio::task::yield_now().await;

主动让出当前任务的执行权,将自身重新放入待处理队列的 尾部。与 Go 的 runtime.Gosched() 类似。

用途

非保证


分享这篇文章:

上一篇
tokio(三):I/O、定时器、同步原语与 select!
下一篇
tokio(一):运行时-从 mio 到异步运行时的完整设计