跳转到正文
zeno's blog
返回

tokio(三):I/O、定时器、同步原语与 select!

专题: tokio

Table of contents

Open Table of contents

TL;DR

tokio 的 I/O 驱动将 mio 的 Poll 包装为与调度器集成的 Reactor,通过 Waker 机制在 fd 就绪时唤醒对应 task。定时器使用六级分层时间轮(每级 64 slot),所有操作 O(1)。同步原语(Mutex / channel / Semaphore)专为 async 设计,核心区别是等待时不阻塞线程。select! 宏是 tokio 最强大也最危险的组合器,理解其取消语义是正确使用的前提。


I/O 集成

I/O 驱动的角色

tokio 的 I/O 驱动是 mio 与调度器之间的桥梁:

应用代码: socket.read(&mut buf).await


tokio::net::TcpStream::poll_read()
    │ 检查 fd 是否就绪
    ├─ 就绪 ──→ 执行 syscall read(),返回 Poll::Ready(data)

    └─ 未就绪 ──→ 向 I/O driver 注册 interest (READABLE)
                  存储当前 task 的 Waker
                  返回 Poll::Pending


                  I/O driver (mio::Poll) 在后台运行
                  epoll_wait 检测到 fd 可读


                  调用 Waker::wake()
                  task 被重新放入调度队列


                  调度器再次 poll 该 task
                  socket.read() 这次返回 Ready(data)

mio 在底层做什么

mio 将操作系统的 I/O 事件通知抽象为统一接口:

操作系统底层机制mio 抽象
Linuxepollmio::Pollepoll_create + epoll_ctl + epoll_wait
macOSkqueuemio::Pollkqueue + kevent
WindowsIOCPmio::PollCreateIoCompletionPort + GetQueuedCompletionStatus

tokio 不直接调用 epoll_wait——它通过 mio 间接交互。I/O driver 内部维护一个 mio::Poll 实例和一个 slab(基于索引的分配器),将每个注册的 fd 映射到对应的 Waker

I/O driver 与调度器的集成

I/O driver 不在独立线程运行。在多线程 runtime 中,当一个 worker 线程 park(无任务可执行)时,它会尝试成为 I/O driver:

  1. Worker 线程尝试获取 I/O driver 的锁
  2. 获取成功 → 调用 mio::Poll::poll() 等待 I/O 事件,同时也等待新任务通知
  3. 获取失败(另一个 worker 已在驱动 I/O)→ 使用 condvar 等待
  4. I/O 事件到达 → 唤醒对应 task 的 Waker → task 进入调度队列

此外,每 61 tick,活跃的 worker 会执行一次零超时的 poll()(非阻塞),检查是否有 I/O 事件,防止 I/O 被计算密集型任务饿死。

AsyncFd —— 将任意 fd 接入 tokio reactor

对于 tokio 不直接支持的 fd(如 Linux 的 eventfd、timerfd、inotify),可以通过 tokio::io::unix::AsyncFd 手动注册:

use tokio::io::unix::AsyncFd;
use std::os::unix::io::AsRawFd;

// 假设 my_fd 是一个实现了 AsRawFd 的类型
let async_fd = AsyncFd::new(my_fd)?;

// 等待可读
let mut guard = async_fd.readable().await?;

// 尝试读取
match guard.try_io(|inner| {
    // inner.get_ref() 获取原始 fd 引用
    // 执行非阻塞 read
    let mut buf = [0u8; 1024];
    let n = unsafe { libc::read(inner.get_ref().as_raw_fd(), buf.as_mut_ptr() as _, buf.len()) };
    if n >= 0 { Ok(n as usize) } else { Err(std::io::Error::last_os_error()) }
}) {
    Ok(Ok(n)) => println!("read {} bytes", n),
    Ok(Err(e)) => eprintln!("io error: {}", e),
    Err(_would_block) => { /* fd 还没就绪,guard 自动重新注册 */ }
}

核心网络类型

tokio 类型std 对应关键 async 方法
tokio::net::TcpListenerstd::net::TcpListenerbind(), accept().await
tokio::net::TcpStreamstd::net::TcpStreamconnect().await, read().await, write().await
tokio::net::UdpSocketstd::net::UdpSocketbind(), recv_from().await, send_to().await
tokio::net::UnixListenerstd::os::unix::net::UnixListenerbind(), accept().await
tokio::net::UnixStreamstd::os::unix::net::UnixStreamconnect().await

AsyncRead / AsyncWrite vs std Read / Write

// std::io(同步,阻塞线程)
pub trait Read {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
}

// tokio::io(异步,不阻塞线程)
pub trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>>;
}

关键差异:

维度std::io::Readtokio::io::AsyncRead
阻塞行为阻塞当前线程返回 Poll::Pending,不阻塞
参数&mut [u8]ReadBuf(可跟踪已初始化部分,避免 UB)
自引用&mut selfPin<&mut Self>(支持自引用 Future)
使用方式直接调用通常通过 AsyncReadExt::read().await 使用

缓冲 I/O

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};

// BufReader —— 减少 syscall 次数
let reader = BufReader::new(tcp_stream);
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await? {
    println!("{}", line);
}

// BufWriter —— 批量写入
let mut writer = BufWriter::new(tcp_stream);
writer.write_all(b"hello\n").await?;
writer.flush().await?;  // 必须 flush,否则数据留在缓冲区

定时器系统

核心 API

use tokio::time::{self, Duration, Instant};

// sleep —— 等待指定时间
time::sleep(Duration::from_secs(1)).await;

// sleep_until —— 等待到指定时刻
time::sleep_until(Instant::now() + Duration::from_secs(1)).await;

// interval —— 周期性触发
let mut interval = time::interval(Duration::from_millis(100));
loop {
    interval.tick().await;  // 第一次立即返回
    do_work().await;
}

// timeout —— 给 Future 加超时
match time::timeout(Duration::from_secs(5), long_operation()).await {
    Ok(result) => println!("completed: {:?}", result),
    Err(_) => println!("timed out"),
}

interval 的 MissedTickBehavior

当 tick 处理时间超过间隔时间时,interval 提供三种策略:

use tokio::time::MissedTickBehavior;

let mut interval = time::interval(Duration::from_millis(100));
interval.set_missed_tick_behavior(MissedTickBehavior::Burst);  // 默认
策略行为适用场景
Burst(默认)尽快补发所有错过的 tick需要精确计数的场景
Delay从当前时间重新计算下次 tick最常用,避免突发
Skip跳过错过的 tick,对齐到下一个整数倍时刻cron-like 定时

分层时间轮内部实现

tokio 的定时器 不用最小堆(min-heap)——使用六级分层哈希时间轮(hierarchical hashed timing wheel),所有操作(创建、取消、触发)均为 O(1)。

结构

Level 0:  64 slots × 1ms    = 覆盖 64ms
Level 1:  64 slots × 64ms   = 覆盖 ~4 秒
Level 2:  64 slots × 4s     = 覆盖 ~4 分钟
Level 3:  64 slots × 4min   = 覆盖 ~4 小时
Level 4:  64 slots × 4hr    = 覆盖 ~11 天
Level 5:  64 slots × 11day  = 覆盖 ~2 年

共 6 × 64 = 384 个 slot,覆盖约 2 年的时间范围。

工作流程

  1. 注册定时器:根据超时时间距当前时刻的距离,放入对应级别的 slot

    • 64ms 内 → Level 0
    • 64ms ~ 4s → Level 1
    • 4s ~ 4min → Level 2
  2. 时间推进:当 Level 0 的某个 slot 到期,触发该 slot 中的所有定时器

  3. 级联(cascade):当 Level 0 完整转完一圈(64ms),从 Level 1 的当前 slot 中取出所有定时器,重新分配到 Level 0 的 64 个 slot 中

为什么不用最小堆

操作最小堆 (std BinaryHeap)时间轮
插入O(log n)O(1)
取消O(log n) 或 O(n)O(1)
触发最近O(log n)O(1)(摊还)
内存布局连续数组固定 384 slot

当定时器数量很大时(网络服务器中常见数万个连接各有超时),O(1) 的优势非常明显。

Timer 精度与限制

测试中的时间控制

#[tokio::test]
async fn test_timeout() {
    // 暂停时间 —— sleep 不会真正等待,而是虚拟推进
    tokio::time::pause();

    let start = tokio::time::Instant::now();
    tokio::time::sleep(Duration::from_secs(3600)).await;  // 不会真等 1 小时
    assert!(start.elapsed() >= Duration::from_secs(3600));
    // 实际测试执行时间: ~0ms
}

tokio::time::pause() 冻结真实时间,使 sleep / interval / timeout.await 时立即推进虚拟时钟。这对测试超时逻辑极其有用。注意:#[tokio::test] 使用 current_thread runtime,默认自动 pause 时间。


同步原语

为什么 tokio 要自己实现 Mutex

std::sync::Mutex::lock() 是同步操作——如果锁被占用,阻塞当前线程。在异步代码中:

// 危险:std Mutex 阻塞 worker 线程
let data = std::sync::Mutex::new(vec![]);
tokio::spawn(async move {
    let mut guard = data.lock().unwrap();  // 阻塞 worker 线程!
    some_io().await;  // 持有锁跨 .await —— 其他 worker 可能也需要这个锁
    guard.push(1);
});

tokio 的 Mutex::lock() 返回一个 Future——如果锁被占用,任务让出执行权而不阻塞线程:

let data = tokio::sync::Mutex::new(vec![]);
let data = Arc::new(data);

tokio::spawn({
    let data = data.clone();
    async move {
        let mut guard = data.lock().await;  // 不阻塞线程,让出调度
        some_io().await;  // 可以安全地持有锁跨 .await
        guard.push(1);
    }
});

何时用 std Mutex vs tokio Mutex

场景推荐原因
临界区内没有 .await,锁持有时间极短std::sync::Mutex性能更好(无 Future 开销)
临界区内有 .awaittokio::sync::Mutex必须,否则阻塞 worker 线程
保护纯数据(HashMap 等),只做快速读写std::sync::Mutexparking_lot::Mutex微秒级临界区不值得 async 开销
保护 I/O 资源(数据库连接等)tokio::sync::MutexI/O 操作耗时不确定

关于公平性:tokio Mutex 保证 FIFO 公平——等待获取锁的任务按调用 lock() 的顺序获得锁。std::sync::Mutex 不保证任何公平性。

全部同步原语一览

原语模块路径用途对比 Asio
Mutextokio::sync::Mutex异步互斥锁,可跨 .await 持有Asio 无直接对应,strand 解决并发序列化
RwLocktokio::sync::RwLock异步读写锁,多读单写无对应
Semaphoretokio::sync::Semaphore异步信号量,控制并发数无对应
Notifytokio::sync::Notify异步通知,类似条件变量类似 io_context::post 的通知语义
mpsctokio::sync::mpsc多生产者单消费者有界/无界 channel无直接对应
broadcasttokio::sync::broadcast多生产者多消费者广播 channel无对应
oneshottokio::sync::oneshot单次值传递 channel类似 std::promise/future
watchtokio::sync::watch单生产者多消费者,只保留最新值无对应

Channel 类型选择指南

// mpsc —— 最常用,任务间通信
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(32);  // 有界,容量 32
tokio::spawn(async move {
    tx.send("hello".into()).await.unwrap();  // 满时 .await 阻塞(不阻塞线程)
});
let msg = rx.recv().await;  // None 表示所有 sender 已 drop

// oneshot —— 单次请求-响应
let (tx, rx) = tokio::sync::oneshot::channel::<i32>();
tx.send(42).unwrap();  // send 是同步的,不需要 .await
let value = rx.await.unwrap();

// broadcast —— 所有 receiver 都收到每条消息
let (tx, _) = tokio::sync::broadcast::channel::<String>(16);
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
tx.send("event".into()).unwrap();
// rx1 和 rx2 都会收到 "event"

// watch —— 配置/状态变更通知,只关心最新值
let (tx, mut rx) = tokio::sync::watch::channel("initial");
tx.send("updated").unwrap();
rx.changed().await.unwrap();
let current = rx.borrow().clone();  // "updated"
Channel生产者消费者容量典型用途
mpsc多个1 个有界/无界任务间消息传递,工作队列
oneshot1 个1 个1请求-响应,通知完成
broadcast多个多个有界事件广播,发布-订阅
watch1 个多个1(最新值)配置热更新,状态共享

select! 与 join!

tokio::select! —— 竞争多个 Future

use tokio::sync::mpsc;
use tokio::time::{self, Duration};

let (tx, mut rx) = mpsc::channel::<String>(32);

tokio::select! {
    Some(msg) = rx.recv() => {
        println!("received: {}", msg);
    }
    _ = time::sleep(Duration::from_secs(5)) => {
        println!("timeout");
    }
}

执行语义

  1. 所有分支的 Future 被同时创建
  2. 在同一个 task 内交替 poll(不是并行,是并发)
  3. 第一个返回 Ready 的分支获胜
  4. 其他分支的 Future 被 drop(取消)
  5. 执行获胜分支的 handler 代码

公平性

默认情况下,select! 随机选择 先 poll 哪个分支。这防止了某个分支因为总是被先 poll 而获得不公平的优势。

可以用 biased; 强制按书写顺序 poll:

tokio::select! {
    biased;  // 按顺序 poll:先 rx1,再 rx2
    msg = rx1.recv() => { /* ... */ }
    msg = rx2.recv() => { /* ... */ }
}

biased 适用场景:优先级队列——高优先级的分支写在前面。

取消语义与 Pin

select! 的核心危险:未完成的分支会被 drop

// 危险示例:每次循环都创建新的 sleep Future
loop {
    tokio::select! {
        msg = rx.recv() => { process(msg).await; }
        _ = tokio::time::sleep(Duration::from_secs(30)) => {  // 每次循环重新创建!
            println!("idle timeout");
            break;
        }
    }
}

问题:每次 rx.recv() 返回时,sleep Future 被 drop。下次循环创建新的 sleep,重新倒计时 30 秒。真正的 30 秒空闲超时永远不会触发。

修复:用 tokio::pin! 固定 Future,跨迭代复用:

let sleep = tokio::time::sleep(Duration::from_secs(30));
tokio::pin!(sleep);  // Pin 到栈上,阻止移动

loop {
    tokio::select! {
        msg = rx.recv() => { process(msg).await; }
        _ = &mut sleep => {  // 借用而非移动
            println!("idle timeout");
            break;
        }
    }
}

Pin 的作用:保证 Future 不会在内存中被移动,使其内部的自引用指针(编译器生成的状态机可能包含自引用)保持有效。在 select! 循环中,Pin 使得 Future 的状态在迭代间持久化。

Rust 1.68+ 稳定了 std::pin::pin!,比 tokio::pin! 更符合惯用写法,功能相同。

tokio::join! —— 并发等待所有 Future 完成

let (user, posts, comments) = tokio::join!(
    fetch_user(id),
    fetch_posts(id),
    fetch_comments(id),
);
// 三个请求并发执行,全部完成后返回

join!select! 的区别:

维度join!select!
完成条件所有分支完成任一分支完成
取消无取消未完成分支被 drop
返回值所有分支结果的元组获胜分支的结果
Pin 需求不需要循环中复用 Future 时需要
错误处理即使某个分支 panic,其他分支仍继续获胜后其他分支立即 drop

tokio::try_join!join! 的变体——任一分支返回 Err 时立即返回(但不取消其他分支的 Future,只是不再 poll 它们——它们会在 try_join! 返回时被 drop)。


分享这篇文章:

上一篇
微服务(一):拆分-按运维特征和组织边界画线
下一篇
tokio(二):任务系统-spawn、取消与协作式调度