Table of contents
Open Table of contents
TL;DR
tokio 的 I/O 驱动将 mio 的 Poll 包装为与调度器集成的 Reactor,通过 Waker 机制在 fd 就绪时唤醒对应 task。定时器使用六级分层时间轮(每级 64 slot),所有操作 O(1)。同步原语(Mutex / channel / Semaphore)专为 async 设计,核心区别是等待时不阻塞线程。select! 宏是 tokio 最强大也最危险的组合器,理解其取消语义是正确使用的前提。
I/O 集成
I/O 驱动的角色
tokio 的 I/O 驱动是 mio 与调度器之间的桥梁:
应用代码: socket.read(&mut buf).await
│
▼
tokio::net::TcpStream::poll_read()
│ 检查 fd 是否就绪
├─ 就绪 ──→ 执行 syscall read(),返回 Poll::Ready(data)
│
└─ 未就绪 ──→ 向 I/O driver 注册 interest (READABLE)
存储当前 task 的 Waker
返回 Poll::Pending
│
▼
I/O driver (mio::Poll) 在后台运行
epoll_wait 检测到 fd 可读
│
▼
调用 Waker::wake()
task 被重新放入调度队列
│
▼
调度器再次 poll 该 task
socket.read() 这次返回 Ready(data)
mio 在底层做什么
mio 将操作系统的 I/O 事件通知抽象为统一接口:
| 操作系统 | 底层机制 | mio 抽象 |
|---|---|---|
| Linux | epoll | mio::Poll → epoll_create + epoll_ctl + epoll_wait |
| macOS | kqueue | mio::Poll → kqueue + kevent |
| Windows | IOCP | mio::Poll → CreateIoCompletionPort + GetQueuedCompletionStatus |
tokio 不直接调用 epoll_wait——它通过 mio 间接交互。I/O driver 内部维护一个 mio::Poll 实例和一个 slab(基于索引的分配器),将每个注册的 fd 映射到对应的 Waker。
I/O driver 与调度器的集成
I/O driver 不在独立线程运行。在多线程 runtime 中,当一个 worker 线程 park(无任务可执行)时,它会尝试成为 I/O driver:
- Worker 线程尝试获取 I/O driver 的锁
- 获取成功 → 调用
mio::Poll::poll()等待 I/O 事件,同时也等待新任务通知 - 获取失败(另一个 worker 已在驱动 I/O)→ 使用 condvar 等待
- I/O 事件到达 → 唤醒对应 task 的 Waker → task 进入调度队列
此外,每 61 tick,活跃的 worker 会执行一次零超时的 poll()(非阻塞),检查是否有 I/O 事件,防止 I/O 被计算密集型任务饿死。
AsyncFd —— 将任意 fd 接入 tokio reactor
对于 tokio 不直接支持的 fd(如 Linux 的 eventfd、timerfd、inotify),可以通过 tokio::io::unix::AsyncFd 手动注册:
use tokio::io::unix::AsyncFd;
use std::os::unix::io::AsRawFd;
// 假设 my_fd 是一个实现了 AsRawFd 的类型
let async_fd = AsyncFd::new(my_fd)?;
// 等待可读
let mut guard = async_fd.readable().await?;
// 尝试读取
match guard.try_io(|inner| {
// inner.get_ref() 获取原始 fd 引用
// 执行非阻塞 read
let mut buf = [0u8; 1024];
let n = unsafe { libc::read(inner.get_ref().as_raw_fd(), buf.as_mut_ptr() as _, buf.len()) };
if n >= 0 { Ok(n as usize) } else { Err(std::io::Error::last_os_error()) }
}) {
Ok(Ok(n)) => println!("read {} bytes", n),
Ok(Err(e)) => eprintln!("io error: {}", e),
Err(_would_block) => { /* fd 还没就绪,guard 自动重新注册 */ }
}
核心网络类型
| tokio 类型 | std 对应 | 关键 async 方法 |
|---|---|---|
tokio::net::TcpListener | std::net::TcpListener | bind(), accept().await |
tokio::net::TcpStream | std::net::TcpStream | connect().await, read().await, write().await |
tokio::net::UdpSocket | std::net::UdpSocket | bind(), recv_from().await, send_to().await |
tokio::net::UnixListener | std::os::unix::net::UnixListener | bind(), accept().await |
tokio::net::UnixStream | std::os::unix::net::UnixStream | connect().await |
AsyncRead / AsyncWrite vs std Read / Write
// std::io(同步,阻塞线程)
pub trait Read {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
}
// tokio::io(异步,不阻塞线程)
pub trait AsyncRead {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>>;
}
关键差异:
| 维度 | std::io::Read | tokio::io::AsyncRead |
|---|---|---|
| 阻塞行为 | 阻塞当前线程 | 返回 Poll::Pending,不阻塞 |
| 参数 | &mut [u8] | ReadBuf(可跟踪已初始化部分,避免 UB) |
| 自引用 | &mut self | Pin<&mut Self>(支持自引用 Future) |
| 使用方式 | 直接调用 | 通常通过 AsyncReadExt::read().await 使用 |
缓冲 I/O
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
// BufReader —— 减少 syscall 次数
let reader = BufReader::new(tcp_stream);
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await? {
println!("{}", line);
}
// BufWriter —— 批量写入
let mut writer = BufWriter::new(tcp_stream);
writer.write_all(b"hello\n").await?;
writer.flush().await?; // 必须 flush,否则数据留在缓冲区
定时器系统
核心 API
use tokio::time::{self, Duration, Instant};
// sleep —— 等待指定时间
time::sleep(Duration::from_secs(1)).await;
// sleep_until —— 等待到指定时刻
time::sleep_until(Instant::now() + Duration::from_secs(1)).await;
// interval —— 周期性触发
let mut interval = time::interval(Duration::from_millis(100));
loop {
interval.tick().await; // 第一次立即返回
do_work().await;
}
// timeout —— 给 Future 加超时
match time::timeout(Duration::from_secs(5), long_operation()).await {
Ok(result) => println!("completed: {:?}", result),
Err(_) => println!("timed out"),
}
interval 的 MissedTickBehavior
当 tick 处理时间超过间隔时间时,interval 提供三种策略:
use tokio::time::MissedTickBehavior;
let mut interval = time::interval(Duration::from_millis(100));
interval.set_missed_tick_behavior(MissedTickBehavior::Burst); // 默认
| 策略 | 行为 | 适用场景 |
|---|---|---|
Burst(默认) | 尽快补发所有错过的 tick | 需要精确计数的场景 |
Delay | 从当前时间重新计算下次 tick | 最常用,避免突发 |
Skip | 跳过错过的 tick,对齐到下一个整数倍时刻 | cron-like 定时 |
分层时间轮内部实现
tokio 的定时器 不用最小堆(min-heap)——使用六级分层哈希时间轮(hierarchical hashed timing wheel),所有操作(创建、取消、触发)均为 O(1)。
结构
Level 0: 64 slots × 1ms = 覆盖 64ms
Level 1: 64 slots × 64ms = 覆盖 ~4 秒
Level 2: 64 slots × 4s = 覆盖 ~4 分钟
Level 3: 64 slots × 4min = 覆盖 ~4 小时
Level 4: 64 slots × 4hr = 覆盖 ~11 天
Level 5: 64 slots × 11day = 覆盖 ~2 年
共 6 × 64 = 384 个 slot,覆盖约 2 年的时间范围。
工作流程
-
注册定时器:根据超时时间距当前时刻的距离,放入对应级别的 slot
- 64ms 内 → Level 0
- 64ms ~ 4s → Level 1
- 4s ~ 4min → Level 2
- …
-
时间推进:当 Level 0 的某个 slot 到期,触发该 slot 中的所有定时器
-
级联(cascade):当 Level 0 完整转完一圈(64ms),从 Level 1 的当前 slot 中取出所有定时器,重新分配到 Level 0 的 64 个 slot 中
为什么不用最小堆
| 操作 | 最小堆 (std BinaryHeap) | 时间轮 |
|---|---|---|
| 插入 | O(log n) | O(1) |
| 取消 | O(log n) 或 O(n) | O(1) |
| 触发最近 | O(log n) | O(1)(摊还) |
| 内存布局 | 连续数组 | 固定 384 slot |
当定时器数量很大时(网络服务器中常见数万个连接各有超时),O(1) 的优势非常明显。
Timer 精度与限制
- 最小分辨率:1ms(Level 0 的 slot 粒度)
- 实际精度:受调度延迟和操作系统影响,通常在 1-10ms 范围
- 不适合:亚毫秒级精度需求(如音频处理、实时控制)
测试中的时间控制
#[tokio::test]
async fn test_timeout() {
// 暂停时间 —— sleep 不会真正等待,而是虚拟推进
tokio::time::pause();
let start = tokio::time::Instant::now();
tokio::time::sleep(Duration::from_secs(3600)).await; // 不会真等 1 小时
assert!(start.elapsed() >= Duration::from_secs(3600));
// 实际测试执行时间: ~0ms
}
tokio::time::pause() 冻结真实时间,使 sleep / interval / timeout 在 .await 时立即推进虚拟时钟。这对测试超时逻辑极其有用。注意:#[tokio::test] 使用 current_thread runtime,默认自动 pause 时间。
同步原语
为什么 tokio 要自己实现 Mutex
std::sync::Mutex::lock() 是同步操作——如果锁被占用,阻塞当前线程。在异步代码中:
// 危险:std Mutex 阻塞 worker 线程
let data = std::sync::Mutex::new(vec![]);
tokio::spawn(async move {
let mut guard = data.lock().unwrap(); // 阻塞 worker 线程!
some_io().await; // 持有锁跨 .await —— 其他 worker 可能也需要这个锁
guard.push(1);
});
tokio 的 Mutex::lock() 返回一个 Future——如果锁被占用,任务让出执行权而不阻塞线程:
let data = tokio::sync::Mutex::new(vec![]);
let data = Arc::new(data);
tokio::spawn({
let data = data.clone();
async move {
let mut guard = data.lock().await; // 不阻塞线程,让出调度
some_io().await; // 可以安全地持有锁跨 .await
guard.push(1);
}
});
何时用 std Mutex vs tokio Mutex
| 场景 | 推荐 | 原因 |
|---|---|---|
临界区内没有 .await,锁持有时间极短 | std::sync::Mutex | 性能更好(无 Future 开销) |
临界区内有 .await | tokio::sync::Mutex | 必须,否则阻塞 worker 线程 |
| 保护纯数据(HashMap 等),只做快速读写 | std::sync::Mutex 或 parking_lot::Mutex | 微秒级临界区不值得 async 开销 |
| 保护 I/O 资源(数据库连接等) | tokio::sync::Mutex | I/O 操作耗时不确定 |
关于公平性:tokio Mutex 保证 FIFO 公平——等待获取锁的任务按调用 lock() 的顺序获得锁。std::sync::Mutex 不保证任何公平性。
全部同步原语一览
| 原语 | 模块路径 | 用途 | 对比 Asio |
|---|---|---|---|
Mutex | tokio::sync::Mutex | 异步互斥锁,可跨 .await 持有 | Asio 无直接对应,strand 解决并发序列化 |
RwLock | tokio::sync::RwLock | 异步读写锁,多读单写 | 无对应 |
Semaphore | tokio::sync::Semaphore | 异步信号量,控制并发数 | 无对应 |
Notify | tokio::sync::Notify | 异步通知,类似条件变量 | 类似 io_context::post 的通知语义 |
mpsc | tokio::sync::mpsc | 多生产者单消费者有界/无界 channel | 无直接对应 |
broadcast | tokio::sync::broadcast | 多生产者多消费者广播 channel | 无对应 |
oneshot | tokio::sync::oneshot | 单次值传递 channel | 类似 std::promise/future |
watch | tokio::sync::watch | 单生产者多消费者,只保留最新值 | 无对应 |
Channel 类型选择指南
// mpsc —— 最常用,任务间通信
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(32); // 有界,容量 32
tokio::spawn(async move {
tx.send("hello".into()).await.unwrap(); // 满时 .await 阻塞(不阻塞线程)
});
let msg = rx.recv().await; // None 表示所有 sender 已 drop
// oneshot —— 单次请求-响应
let (tx, rx) = tokio::sync::oneshot::channel::<i32>();
tx.send(42).unwrap(); // send 是同步的,不需要 .await
let value = rx.await.unwrap();
// broadcast —— 所有 receiver 都收到每条消息
let (tx, _) = tokio::sync::broadcast::channel::<String>(16);
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
tx.send("event".into()).unwrap();
// rx1 和 rx2 都会收到 "event"
// watch —— 配置/状态变更通知,只关心最新值
let (tx, mut rx) = tokio::sync::watch::channel("initial");
tx.send("updated").unwrap();
rx.changed().await.unwrap();
let current = rx.borrow().clone(); // "updated"
| Channel | 生产者 | 消费者 | 容量 | 典型用途 |
|---|---|---|---|---|
mpsc | 多个 | 1 个 | 有界/无界 | 任务间消息传递,工作队列 |
oneshot | 1 个 | 1 个 | 1 | 请求-响应,通知完成 |
broadcast | 多个 | 多个 | 有界 | 事件广播,发布-订阅 |
watch | 1 个 | 多个 | 1(最新值) | 配置热更新,状态共享 |
select! 与 join!
tokio::select! —— 竞争多个 Future
use tokio::sync::mpsc;
use tokio::time::{self, Duration};
let (tx, mut rx) = mpsc::channel::<String>(32);
tokio::select! {
Some(msg) = rx.recv() => {
println!("received: {}", msg);
}
_ = time::sleep(Duration::from_secs(5)) => {
println!("timeout");
}
}
执行语义
- 所有分支的 Future 被同时创建
- 在同一个 task 内交替 poll(不是并行,是并发)
- 第一个返回 Ready 的分支获胜
- 其他分支的 Future 被 drop(取消)
- 执行获胜分支的 handler 代码
公平性
默认情况下,select! 随机选择 先 poll 哪个分支。这防止了某个分支因为总是被先 poll 而获得不公平的优势。
可以用 biased; 强制按书写顺序 poll:
tokio::select! {
biased; // 按顺序 poll:先 rx1,再 rx2
msg = rx1.recv() => { /* ... */ }
msg = rx2.recv() => { /* ... */ }
}
biased 适用场景:优先级队列——高优先级的分支写在前面。
取消语义与 Pin
select! 的核心危险:未完成的分支会被 drop。
// 危险示例:每次循环都创建新的 sleep Future
loop {
tokio::select! {
msg = rx.recv() => { process(msg).await; }
_ = tokio::time::sleep(Duration::from_secs(30)) => { // 每次循环重新创建!
println!("idle timeout");
break;
}
}
}
问题:每次 rx.recv() 返回时,sleep Future 被 drop。下次循环创建新的 sleep,重新倒计时 30 秒。真正的 30 秒空闲超时永远不会触发。
修复:用 tokio::pin! 固定 Future,跨迭代复用:
let sleep = tokio::time::sleep(Duration::from_secs(30));
tokio::pin!(sleep); // Pin 到栈上,阻止移动
loop {
tokio::select! {
msg = rx.recv() => { process(msg).await; }
_ = &mut sleep => { // 借用而非移动
println!("idle timeout");
break;
}
}
}
Pin 的作用:保证 Future 不会在内存中被移动,使其内部的自引用指针(编译器生成的状态机可能包含自引用)保持有效。在 select! 循环中,Pin 使得 Future 的状态在迭代间持久化。
Rust 1.68+ 稳定了
std::pin::pin!,比tokio::pin!更符合惯用写法,功能相同。
tokio::join! —— 并发等待所有 Future 完成
let (user, posts, comments) = tokio::join!(
fetch_user(id),
fetch_posts(id),
fetch_comments(id),
);
// 三个请求并发执行,全部完成后返回
join! 与 select! 的区别:
| 维度 | join! | select! |
|---|---|---|
| 完成条件 | 所有分支完成 | 任一分支完成 |
| 取消 | 无取消 | 未完成分支被 drop |
| 返回值 | 所有分支结果的元组 | 获胜分支的结果 |
| Pin 需求 | 不需要 | 循环中复用 Future 时需要 |
| 错误处理 | 即使某个分支 panic,其他分支仍继续 | 获胜后其他分支立即 drop |
tokio::try_join! 是 join! 的变体——任一分支返回 Err 时立即返回(但不取消其他分支的 Future,只是不再 poll 它们——它们会在 try_join! 返回时被 drop)。