跳转到正文
zeno's blog
返回

tokio(一):运行时-从 mio 到异步运行时的完整设计

专题: tokio

Table of contents

Open Table of contents

TL;DR

tokio 是 Rust 的异步运行时,提供工作窃取调度器、I/O 驱动、定时器、同步原语和任务管理。它在 Rust 异步生态中的角色等同于 Asio 的 io_context + strand + timer_queue + 线程池的总和,但 Rust 选择了分层架构(mio → Future trait → tokio)而非 Asio 的单库大一统模式。理解 tokio 需要先理解它为什么不能只用 mio,以及 Rust async/await 的设计如何塑造了运行时的形态。


为什么 tokio 存在:mio 不够

mio 的定位

mio(Metal I/O)是 Rust 的跨平台 I/O 事件通知库,直接封装 Linux epoll / macOS kqueue / Windows IOCP。mio 是一个 极薄的抽象层,它的 API 几乎就是 epoll_wait 的跨平台翻译:

// mio 的核心 API —— 这就是全部
let mut poll = mio::Poll::new()?;
let mut events = mio::Events::with_capacity(1024);
poll.registry().register(&mut socket, Token(0), Interest::READABLE)?;

loop {
    poll.poll(&mut events, Some(Duration::from_millis(100)))?;
    for event in events.iter() {
        // 手动处理每个事件,手动管理状态机
    }
}

mio 不提供

能力miotokio
任务调度器工作窃取多线程调度
Future / async 集成原生 async fn + spawn
定时器分层时间轮
同步原语Mutex / channel / Semaphore
缓冲 I/OAsyncBufRead / BufReader
协作式抢占coop budget 机制

mio 相当于 Asio 中 epoll_reactor 那一层——只管「哪个 fd 就绪了」,不管「怎么调度处理它的代码」。用纯 mio 写服务器,你必须自己手写状态机、管理回调、实现定时器、安排线程——这正是 tokio 解决的问题。

与 C++ Asio 的架构对比

Asio 是 单库大一统架构

C++ Asio(一个库包含一切)
├── io_context        (事件循环 + 调度)
├── strand            (并发序列化)
├── steady_timer      (定时器)
├── ip::tcp::socket   (网络 I/O)
├── post/dispatch     (任务投递)
└── thread_pool       (多线程执行器)

Rust 异步生态是 分层解耦架构

Rust 异步栈(每层可独立替换)
├── 语言层: async/await 语法 + std::future::Future trait
├── 运行时层: tokio(或 async-std / smol / glommio)
│   ├── 调度器 (work-stealing / current-thread)
│   ├── I/O 驱动 (封装 mio)
│   ├── 定时器 (分层时间轮)
│   └── 同步原语 (channel / Mutex / ...)
├── 平台层: mio (epoll / kqueue / IOCP 抽象)
└── 操作系统: epoll / kqueue / IOCP

这种分层带来的关键差异:

维度C++ AsioRust tokio 生态
运行时可替换否,Asio 是唯一选择是,tokio / async-std / smol 可替换
Future 定义Asio 自定义 completion tokenstd::future::Future 语言标准 trait
零成本抽象模板元编程实现编译器原生支持 async 状态机
线程安全运行时检查(strand)编译期强制(Send + 'static
取消语义手动 cancel()drop Future 即取消

历史:从 tokio-core 到 tokio 1.0

tokio 的演进与 Rust async 生态的成熟同步:

2016-08:tokio-core 0.1 Carl Lerche 发布 tokio-core,基于 futures 0.1 crate 的自定义 Future trait(非标准库)。灵感来源于 Twitter 的 Finagle(Scala RPC 框架)。此时写异步代码需要手动 .and_then().map() 链式组合,极其痛苦:

// tokio-core 时代的异步代码(2016-2018)—— 回调地狱
let server = listener.incoming().for_each(move |socket| {
    let connection = io::read_to_end(socket, Vec::new())
        .and_then(|(socket, data)| {
            io::write_all(socket, data)
        })
        .then(|_| Ok(()));
    tokio::spawn(connection);
    Ok(())
});

2018-08:tokio 0.1 从 tokio-core 重组为 tokio crate,但仍基于 futures 0.1 的自定义 Future trait。

2019-11:tokio 0.2 —— 转向 std::future::Future Rust 1.39(2019-11)稳定了 async/await 语法和 std::future::Future trait。tokio 0.2 是一次 彻底重写

2020-12:tokio 1.0 —— 稳定性承诺

至今(2026):tokio 1.x 持续演进,保持 1.x 兼容。新增 JoinSetTaskTracker、cooperative scheduling 等特性。

async/await 与 Future trait 的关系

Rust 的 async/await编译器语法糖,将 async fn 编译为实现 Future trait 的状态机:

// 你写的
async fn fetch_data(url: &str) -> Result<Vec<u8>> {
    let conn = connect(url).await?;  // 挂起点 1
    let data = conn.read().await?;   // 挂起点 2
    Ok(data)
}

// 编译器生成的(概念性)
enum FetchDataFuture {
    State0 { url: String },               // 初始状态
    State1 { conn_future: ConnectFuture }, // 等待连接
    State2 { read_future: ReadFuture },    // 等待读取
    Done,
}

impl Future for FetchDataFuture {
    type Output = Result<Vec<u8>>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 根据当前状态推进状态机
    }
}

Future::poll 的核心语义:

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),   // 完成,返回结果
    Pending,    // 未完成,已注册 Waker,等通知后再 poll
}

关键设计:惰性求值(lazy evaluation)。Rust 的 Future 在被 poll 之前不会执行任何代码——创建一个 Future 只是构造状态机,不启动任何操作。这与 JavaScript 的 Promise(创建即执行)和 C++ Asio 的 async_read(调用即提交到 io_context)不同。

tokio 的角色:作为运行时,tokio 负责反复调用 Future::poll——当 I/O 就绪或定时器触发时,通过 Waker 机制通知调度器重新 poll 对应的 Future。没有运行时,async fn 只是一个不会自己跑的状态机。


运行时架构

多线程运行时:工作窃取调度器

tokio 的默认运行时使用工作窃取(work-stealing)调度器,设计借鉴了 Go 的 goroutine 调度器。

核心数据结构

┌─────────────────────────────────────────────────────┐
│                   tokio Runtime                      │
│                                                      │
│  ┌──────────┐  ┌──────────┐       ┌──────────┐     │
│  │ Worker 0 │  │ Worker 1 │  ...  │ Worker N │     │
│  │          │  │          │       │          │     │
│  │ LIFO slot│  │ LIFO slot│       │ LIFO slot│     │
│  │ [1 task] │  │ [1 task] │       │ [1 task] │     │
│  │          │  │          │       │          │     │
│  │ Local Q  │  │ Local Q  │       │ Local Q  │     │
│  │ (256)    │  │ (256)    │       │ (256)    │     │
│  └──────────┘  └──────────┘       └──────────┘     │
│                                                      │
│  ┌────────────────────────────────────────────┐     │
│  │          Global (Inject) Queue              │     │
│  │          (无界,Mutex 保护)                  │     │
│  └────────────────────────────────────────────┘     │
│                                                      │
│  ┌────────────────────────────────────────────┐     │
│  │          I/O Driver (mio::Poll)             │     │
│  └────────────────────────────────────────────┘     │
│                                                      │
│  ┌────────────────────────────────────────────┐     │
│  │          Timer Driver (时间轮)               │     │
│  └────────────────────────────────────────────┘     │
└─────────────────────────────────────────────────────┘

每个 Worker 线程拥有

全局共享

调度循环:Worker 线程的任务选取顺序

每个 worker 线程按以下优先级获取任务:

1. LIFO slot(最高优先级,缓存友好)
       ↓ 空
2. Local queue(本地 FIFO 队列)
       ↓ 空
3. Global queue(每 61 tick 强制检查一次)
       ↓ 空
4. Steal from another worker(随机选择目标,窃取一半任务)
       ↓ 全部为空
5. Park(挂起线程,等待 I/O 事件或新任务通知)

LIFO slot 优化

LIFO slot 是 tokio 调度器的核心优化。当一个任务唤醒另一个任务时:

  1. 被唤醒的任务放入当前 worker 的 LIFO slot
  2. 如果 LIFO slot 已有任务,旧任务被挤到 local queue 尾部
  3. Worker 下次取任务时 优先从 LIFO slot 取

为什么有效:在典型的请求-响应模式中,任务 A 发起 I/O → I/O 完成唤醒任务 A → A 被放入 LIFO slot → 立即被同一 worker 执行。任务 A 的栈数据大概率还在 CPU L1/L2 cache 中,减少缓存未命中。

防饥饿保护:如果 LIFO slot 连续被使用 3 次,临时禁用 LIFO slot,直到 worker 调度了一个非 LIFO 来源的任务。此外,LIFO slot 中的任务 不能被其他 worker 窃取,防止跨核缓存污染。

全局队列公平性:61 tick 检查

Worker 线程维护一个 tick 计数器。每执行 GLOBAL_POLL_INTERVAL = 61 个任务后,强制优先从全局队列取任务。这个值(61,质数)借鉴自 Go runtime 的设计,确保全局队列中的任务不会被本地队列饿死。

同时,每 61 tick 还会执行一次零超时的 mio::Poll::poll(),检查 I/O 事件,防止 I/O 饥饿。

工作窃取(Work Stealing)

当 worker 的本地队列和 LIFO slot 均为空时:

  1. 随机选择一个目标 worker
  2. 从目标的 local queue 中窃取 一半 的任务到自己的 local queue
  3. LIFO slot 中的任务不会被窃取
  4. 如果随机选中的目标也为空,继续尝试下一个(遍历所有 worker)

与 Go 调度器和 Asio 线程池的对比

维度tokio (work-stealing)Go (GMP 模型)Asio (thread pool)
调度单元tokio::task (~Future)goroutinehandler (回调/协程)
队列结构LIFO slot + local(256) + globallocal(256) + global单一全局队列 或 strand 序列化
窃取策略随机选目标,偷一半随机选目标,偷一半无窃取(或 strand 内序列化)
全局队列检查每 61 tick每 61 tick无(始终竞争全局队列)
LIFO 优化LIFO slot (1 task)runnext slot (1 goroutine)
抢占机制coop budget (128 ops)基于信号的抢占(sysmon)无自动抢占
阻塞处理spawn_blocking 专用线程池自动扩展 M(系统线程)post() 到线程池

单线程运行时(current_thread)

let rt = tokio::runtime::Builder::new_current_thread()
    .enable_all()
    .build()
    .unwrap();

所有任务在调用 block_on 的线程上执行,没有工作窃取。适用于:

Runtime 创建方式

// 方式 1: #[tokio::main] 宏 —— 最常用
#[tokio::main]  // 展开为 multi_thread runtime + block_on
async fn main() {
    // ...
}

// 等价于:
fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            // ...
        })
}

// 方式 2: 指定单线程
#[tokio::main(flavor = "current_thread")]
async fn main() { /* ... */ }

// 方式 3: 指定 worker 数量
#[tokio::main(worker_threads = 4)]
async fn main() { /* ... */ }

// 方式 4: #[tokio::test] —— 默认单线程
#[tokio::test]
async fn my_test() { /* ... */ }

// 方式 5: #[tokio::test] 多线程
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn my_test() { /* ... */ }

// 方式 6: 手动 Builder —— 完全控制
let rt = tokio::runtime::Builder::new_multi_thread()
    .worker_threads(4)
    .max_blocking_threads(512)    // 默认 512
    .thread_name("my-worker")
    .thread_stack_size(3 * 1024 * 1024)  // 3 MB
    .enable_all()                 // 启用 I/O 和 timer driver
    .on_thread_start(|| { /* 线程启动时 */ })
    .on_thread_stop(|| { /* 线程停止时 */ })
    .on_thread_park(|| { /* 线程挂起时 */ })
    .on_thread_unpark(|| { /* 线程唤醒时 */ })
    .build()
    .unwrap();

Builder 关键配置项

配置项默认值说明
worker_threadsCPU 核心数worker 线程数。也可通过环境变量 TOKIO_WORKER_THREADS 设置
max_blocking_threads512spawn_blocking 线程池上限
thread_name"tokio-worker"worker 线程名。也可通过 TOKIO_THREAD_NAME 设置
thread_stack_size系统默认(通常 8 MB)worker 线程栈大小
enable_io()需手动启用启用 I/O driver (mio)
enable_time()需手动启用启用 timer driver
enable_all()同时启用 I/O 和 timer
global_queue_interval61检查全局队列的 tick 间隔(需验证:此 API 可能为 unstable)

注意enable_all() 是最常用的配置。如果忘记启用 I/O 或 timer,调用 TcpStream::connecttokio::time::sleep 会 panic。


分享这篇文章:

上一篇
tokio(二):任务系统-spawn、取消与协作式调度
下一篇
DDD(二):领域事件