Table of contents
Open Table of contents
TL;DR
mio 的核心由 6 个类型组成:Poll(事件循环)、Registry(注册表)、Token(事件标识)、Interest(监听意图)、Events(事件集合)、Source trait(可轮询的 I/O 源)。整个 API 没有一个回调函数、没有一个 closure——全靠整数 Token 做事件分发。这是有意为之的设计决策,为了与 Rust 的 ownership 模型兼容。
架构全景
┌────────────────────┐
│ Event Loop │
│ (用户代码) │
└─────────┬──────────┘
│ poll.poll(&mut events, timeout)
▼
┌────────────────┐
│ Poll │ ← 包装 OS selector
│ ┌──────────┐ │
│ │ Registry │ │ ← 管理 I/O 源注册
│ └──────────┘ │
└────────┬───────┘
│ epoll_wait / kevent / GetQueuedCompletionStatusEx
▼
┌────────────────┐
│ OS Kernel │
└────────────────┘
用户的事件循环是一个 loop:调 poll.poll() → 遍历 events → 根据 event.token() 分发 → 执行 I/O → 回到 poll.poll()。
Poll:事件循环的核心引擎
// mio::Poll
pub struct Poll {
registry: Registry,
}
Poll 是 mio 中最重要的类型。它封装了操作系统的 I/O 多路复用机制(Linux epoll, macOS kqueue, Windows IOCP)。
创建
let mut poll = Poll::new()?;
Poll::new() 在底层调用:
- Linux:
epoll_create1(EPOLL_CLOEXEC)— 创建 epoll 实例,设置 close-on-exec 防止文件描述符泄漏到子进程 - macOS:
kqueue()+ 手动设置 close-on-exec - Windows: 创建 IOCP CompletionPort + 初始化 AFD 组
轮询
pub fn poll(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>
这是唯一的阻塞点。语义:
- 清空
events集合 - 阻塞等待已注册的 I/O 源产生就绪事件,或者超时
- 将就绪事件填充到
events中 - 返回
关键行为:
&mut self:只有一个线程能同时调用poll()。这不是限制,是设计——避免多线程竞争事件。timeout: Option<Duration>:None表示无限等待;Some(Duration::ZERO)表示立即返回(非阻塞轮询)。- events 被复用:
Events在每次poll()调用前自动清空,用户应该在循环外创建一次,反复传入。
获取 Registry
pub fn registry(&self) -> &Registry
Poll 拥有唯一的 Registry。通过 registry() 获取引用来注册/注销 I/O 源。Registry 可以通过 try_clone() 复制,复制出来的 Registry 共享同一个底层 selector——这是为了让注册操作可以在其他线程进行。
线程安全
Poll 实现了 Send + Sync。但 poll() 要求 &mut self,所以实际上同一时刻只有一个线程能轮询。Registry 也是 Send + Sync,可以跨线程注册 I/O 源。
生命周期
当 Poll 被 drop 时,它可能取消已注册事件源的进行中操作。文档原文:
When the
Pollinstance is dropped it may cancel in-flight operations for the registered event sources.
这意味着你必须保证 Poll 的生命周期覆盖所有注册的事件源。
Registry:注册表
// mio::Registry(内部定义)
pub struct Registry {
selector: sys::Selector,
#[cfg(debug_assertions)]
has_waker: Arc<AtomicBool>, // debug 模式下检测 Waker 唯一性
}
Registry 负责管理 I/O 源与 Poll 之间的关联关系。三个核心方法:
register()
pub fn register<S: Source + ?Sized>(
&self,
source: &mut S,
token: Token,
interests: Interest,
) -> io::Result<()>
将一个 I/O 源注册到 Poll,指定 Token 和 Interest。
- 一个源只能注册到一个 Poll:重复注册到同一个 Poll 而不先 deregister 是 未定义行为
- Token 必须唯一:两个源用相同 Token 注册,你将无法区分事件来源
- 底层调用:Linux
epoll_ctl(EPOLL_CTL_ADD),macOSkevent(EV_ADD)
reregister()
pub fn reregister<S: Source + ?Sized>(
&self,
source: &mut S,
token: Token,
interests: Interest,
) -> io::Result<()>
修改已注册源的 Token 和 Interest。注意:参数完全覆盖之前的值。如果之前注册了 READABLE | WRITABLE,reregister 只传 READABLE,则 writable 事件不再被监听。
底层调用:Linux epoll_ctl(EPOLL_CTL_MOD),macOS kevent(EV_ADD) 重新添加。
deregister()
pub fn deregister<S: Source + ?Sized>(
&self,
source: &mut S,
) -> io::Result<()>
取消注册。调用后保证不会再收到该源的事件。源可以在 deregister 后重新 register 到同一个 Poll。
底层调用:Linux epoll_ctl(EPOLL_CTL_DEL)。
try_clone()
pub fn try_clone(&self) -> io::Result<Registry>
复制 Registry,复制出来的和原始的共享同一个底层 selector。用于跨线程注册场景:一个线程 poll,另一个线程通过 cloned Registry 注册新的 I/O 源。
Token:整数标识的事件分发
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Token(pub usize);
Token 就是一个 usize 的 newtype wrapper。没有生成器、没有分配器——你自己决定编号方案。
为什么是整数而不是回调
这是 mio 和 Asio 最核心的设计差异之一。
C++ Asio 的方式:每个异步操作绑定一个 completion handler(回调函数/函数对象)。
socket.async_read_some(buffer, [](error_code ec, size_t n) {
// 回调:操作完成时调用
});
mio 的方式:注册时给一个整数 Token,事件到达时返回这个 Token,由用户代码做 match 分发。
registry.register(&mut socket, Token(42), Interest::READABLE)?;
// 事件循环中
for event in events.iter() {
match event.token() {
Token(42) => { /* 处理 socket */ }
_ => {}
}
}
为什么 mio 不用回调?三个原因:
-
Rust 的 closure 有生命周期:如果把 closure 存起来等事件到达时调用,这个 closure 捕获的所有引用都必须活得足够久。这在 Rust 中极其难处理——你要么用
'static约束(强制 move 或 Arc),要么用 unsafe。Token 是Copy的,没有生命周期问题。 -
内存安全不需要堆分配:C++ Asio 的 handler 通常需要堆分配(allocate handler memory)。mio 的 Token 是
usize,0 字节额外开销。 -
与 Rust 的 match 语法契合:Rust 的
match表达式天然适合基于整数的分发。编译器还能检查你是否遗漏了分支。
Token 的编号策略
mio 不替你管 Token 分配。常见做法:
// 方法 1:手动计数器
const SERVER: Token = Token(0);
let mut next_token = 1;
fn alloc_token(counter: &mut usize) -> Token {
let t = Token(*counter);
*counter += 1;
t
}
// 方法 2:用 slab crate(mio 文档推荐)
// slab 是一个基于数组的 O(1) 分配器,正好适合 Token 场景
let mut slab = slab::Slab::new();
let entry = slab.vacant_entry();
let token = Token(entry.key());
entry.insert(my_connection);
mio 文档明确推荐用 slab 而不是 HashMap——因为 slab 基于数组,Token 就是数组索引,O(1) 查找。
Interest:监听意图
pub struct Interest(NonZeroU8);
Interest 表示你对一个 I/O 源关心的事件类型。它用一个 NonZeroU8 的位标志实现。
核心常量
| 常量 | 位值 | 含义 | 平台可用性 |
|---|---|---|---|
Interest::READABLE | 0b0_0001 | 可读 | 全平台 |
Interest::WRITABLE | 0b0_0010 | 可写 | 全平台 |
Interest::AIO | 0b0_0100 | 异步 I/O 完成 | DragonFly BSD, FreeBSD, macOS |
Interest::LIO | 0b0_1000 | 列表 I/O 完成 | FreeBSD |
Interest::PRIORITY | 0b1_0000 | 优先级事件 | Linux, Android |
跨平台保证:只有 READABLE 和 WRITABLE 在所有平台上保证可用。AIO、LIO、PRIORITY 是平台特定的。
组合操作
// const 上下文可用
const INTERESTS: Interest = Interest::READABLE.add(Interest::WRITABLE);
// 运行时也可以用 BitOr
let interests = Interest::READABLE | Interest::WRITABLE;
// 移除某个 interest
let read_only = interests.remove(Interest::WRITABLE); // Some(Interest::READABLE)
let nothing = Interest::READABLE.remove(Interest::READABLE); // None
add() 是 const fn,可以在编译期组合。remove() 返回 Option<Interest> 因为如果移除后为空,NonZeroU8 不允许为 0。
Events:事件集合
pub struct Events {
inner: sys::Events,
}
Events 是 poll() 的输出容器,内部是平台特定的事件数组。
创建
let mut events = Events::with_capacity(1024);
with_capacity 指定最大事件数。这个值影响每次 poll() 返回的最大事件量——OS 可能有更多就绪事件,但 mio 只返回 capacity 个。实践中 128-1024 是常见值。
使用模式
// Events 应该在循环外创建,反复复用
let mut events = Events::with_capacity(128);
loop {
poll.poll(&mut events, None)?;
// poll() 内部会先 clear events,再填充
for event in events.iter() {
let token = event.token();
// 分发...
}
}
方法
| 方法 | 签名 | 说明 |
|---|---|---|
with_capacity | fn with_capacity(capacity: usize) -> Events | 创建指定容量的事件集合 |
capacity | fn capacity(&self) -> usize | 返回容量 |
is_empty | fn is_empty(&self) -> bool | 是否无事件 |
iter | fn iter(&self) -> Iter<'_> | 事件迭代器 |
clear | fn clear(&mut self) | 清空(poll() 内部自动调用) |
Events 实现了 IntoIterator(引用),可以直接 for event in &events。
Event:单个事件
#[repr(transparent)]
pub struct Event {
inner: sys::Event, // 平台特定的事件类型
}
Event 是 poll() 返回的单个就绪事件,包含一个 Token 和就绪状态标志。
方法
| 方法 | 返回 | 含义 | 跨平台保证 |
|---|---|---|---|
token() | Token | 事件对应的 Token | 全平台 |
is_readable() | bool | 可读就绪 | 全平台 |
is_writable() | bool | 可写就绪 | 全平台 |
is_error() | bool | 错误状态 | 提示性,可能漏报 |
is_read_closed() | bool | 读端关闭(对端 FIN / shutdown) | 尽力而为,不会误报 |
is_write_closed() | bool | 写端关闭 | 尽力而为,不会误报 |
is_priority() | bool | 优先级事件 | 仅 epoll (EPOLLPRI) |
is_aio() | bool | AIO 完成 | 仅 kqueue BSD/macOS |
is_lio() | bool | LIO 完成 | 仅 FreeBSD |
关于 is_read_closed / is_write_closed 的「尽力而为」
文档原文:
This is a best-effort implementation — false positives are guaranteed not to occur.
意思是:如果 is_read_closed() 返回 true,读端一定关闭了。但如果返回 false,读端可能已经关闭但 mio 没检测到。所以你不能仅依赖 is_read_closed() 来判断连接是否断开——还要检查 read() 返回 0。
OOB 数据与 is_readable
文档警告:Out-of-band (OOB) 数据也会触发 is_readable() == true。如果你的应用不处理 OOB 数据,一个恶意客户端可以持续发送 OOB 数据造成事件风暴。但因为 mio 使用边缘触发(edge-triggered),不会导致无限循环——只会触发一次。
平台特定的 is_error 实现
| 平台 | 检测标志 |
|---|---|
| epoll (Linux) | EPOLLERR |
| kqueue (macOS/BSD) | EV_ERROR 或 (EV_EOF + fflags != 0) |
Source trait:可轮询的 I/O 源
pub trait Source {
fn register(
&mut self,
registry: &Registry,
token: Token,
interests: Interest,
) -> io::Result<()>;
fn reregister(
&mut self,
registry: &Registry,
token: Token,
interests: Interest,
) -> io::Result<()>;
fn deregister(&mut self, registry: &Registry) -> io::Result<()>;
}
任何实现了 Source 的类型都可以注册到 Poll。
内置实现者
mio 自己提供的 Source 实现:
| 类型 | 模块路径 | 功能 |
|---|---|---|
TcpListener | mio::net::TcpListener | TCP 监听器 |
TcpStream | mio::net::TcpStream | TCP 连接 |
UdpSocket | mio::net::UdpSocket | UDP 套接字 |
UnixListener | mio::net::UnixListener | Unix 域套接字监听器 |
UnixStream | mio::net::UnixStream | Unix 域套接字连接 |
UnixDatagram | mio::net::UnixDatagram | Unix 域数据报 |
SourceFd | mio::unix::SourceFd | 任意文件描述符包装器 |
Sender | mio::unix::pipe::Sender | Unix 管道发送端 |
Receiver | mio::unix::pipe::Receiver | Unix 管道接收端 |
自定义 Source
如果你有一个自己的类型底层是 fd/socket,可以这样实现 Source:
use mio::{event::Source, Interest, Registry, Token};
use mio::unix::SourceFd;
use std::os::unix::io::AsRawFd;
struct MyDevice {
fd: RawFd,
}
impl Source for MyDevice {
fn register(
&mut self,
registry: &Registry,
token: Token,
interests: Interest,
) -> io::Result<()> {
SourceFd(&self.fd).register(registry, token, interests)
}
fn reregister(
&mut self,
registry: &Registry,
token: Token,
interests: Interest,
) -> io::Result<()> {
SourceFd(&self.fd).reregister(registry, token, interests)
}
fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
SourceFd(&self.fd).deregister(registry)
}
}
关键:SourceFd 不拥有 fd 的所有权——它接受 &RawFd,不会在 drop 时关闭 fd。
资源泄漏警告
Source 文档明确指出:
All event::Sources, unless otherwise specified, need to be deregistered before being dropped for them to not leak resources.
这和 Rust 通常的 RAII 模式不同。为什么不能在 drop() 中自动 deregister?因为 deregister() 需要 &Registry,而 Source 不持有 Registry 的引用。这是刻意的设计——避免 Source 和 Poll 之间的循环引用。
blanket implementation
impl<T: Source + ?Sized> Source for Box<T> {
// 代理到内部类型
}
这意味着 Box<dyn Source> 也是一个 Source,支持动态分发。
Waker:跨线程唤醒
pub struct Waker {
inner: sys::Waker,
}
Waker 解决一个具体问题:线程 A 在 poll.poll() 中阻塞等待 I/O 事件,线程 B 想通知线程 A「有新任务要处理」,但没有 I/O 事件可以触发。
使用方式
let waker = Waker::new(poll.registry(), Token(999))?;
// 在另一个线程中
let waker = Arc::new(waker);
let waker_clone = waker.clone();
thread::spawn(move || {
// 做一些工作...
waker_clone.wake().unwrap(); // 唤醒 poll 线程
});
// poll 线程会收到 Token(999) 的 readable 事件
平台实现
| 平台 | 机制 |
|---|---|
| Linux | eventfd — 向 eventfd 写入 1u64,epoll 检测到 fd 可读 |
| macOS/BSD | EVFILT_USER — kqueue 的用户空间事件过滤器 |
| Windows | 通过 IOCP 的 PostQueuedCompletionStatus |
约束
- 每个 Poll 只能有一个 Waker:多个 Waker 注册到同一个 Poll 是未定义行为。debug 模式下 mio 会用
AtomicBool检测并 panic。 - 需要用 Arc 共享:
Waker是Send + Sync的,包在Arc里跨线程使用。 - wake() 可能合并:多次
wake()可能只产生一个事件。这是安全的——调用者看到 waker 事件后应该检查所有待处理的工作,不能假设 wake 和工作是 1:1 的。 - Waker 必须保持存活:Waker 被 drop 后不再保证事件投递。
完整的类型关系图
Poll ──拥有──→ Registry ──持有──→ sys::Selector (epoll fd / kqueue fd / IOCP handle)
│ │
│ poll() │ register/reregister/deregister
▼ ▼
Events Source trait
│ │
│ iter() │ 实现者: TcpListener, TcpStream, UdpSocket, ...
▼ │
Event │ 注册时提供 Token + Interest
│ │
│ token() │
▼ ▼
Token ◄────── Interest
(usize) (NonZeroU8 bitflags)