跳转到正文
zeno's blog
返回

mio(二):核心架构-Poll、Token、Interest、Events

专题: mio

Table of contents

Open Table of contents

TL;DR

mio 的核心由 6 个类型组成:Poll(事件循环)、Registry(注册表)、Token(事件标识)、Interest(监听意图)、Events(事件集合)、Source trait(可轮询的 I/O 源)。整个 API 没有一个回调函数、没有一个 closure——全靠整数 Token 做事件分发。这是有意为之的设计决策,为了与 Rust 的 ownership 模型兼容。


架构全景

             ┌────────────────────┐
             │    Event Loop      │
             │    (用户代码)       │
             └─────────┬──────────┘
                       │ poll.poll(&mut events, timeout)

              ┌────────────────┐
              │     Poll       │  ← 包装 OS selector
              │  ┌──────────┐  │
              │  │ Registry │  │  ← 管理 I/O 源注册
              │  └──────────┘  │
              └────────┬───────┘
                       │ epoll_wait / kevent / GetQueuedCompletionStatusEx

              ┌────────────────┐
              │   OS Kernel    │
              └────────────────┘

用户的事件循环是一个 loop:调 poll.poll() → 遍历 events → 根据 event.token() 分发 → 执行 I/O → 回到 poll.poll()

Poll:事件循环的核心引擎

// mio::Poll
pub struct Poll {
    registry: Registry,
}

Poll 是 mio 中最重要的类型。它封装了操作系统的 I/O 多路复用机制(Linux epoll, macOS kqueue, Windows IOCP)。

创建

let mut poll = Poll::new()?;

Poll::new() 在底层调用:

轮询

pub fn poll(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>

这是唯一的阻塞点。语义:

  1. 清空 events 集合
  2. 阻塞等待已注册的 I/O 源产生就绪事件,或者超时
  3. 将就绪事件填充到 events
  4. 返回

关键行为:

获取 Registry

pub fn registry(&self) -> &Registry

Poll 拥有唯一的 Registry。通过 registry() 获取引用来注册/注销 I/O 源。Registry 可以通过 try_clone() 复制,复制出来的 Registry 共享同一个底层 selector——这是为了让注册操作可以在其他线程进行。

线程安全

Poll 实现了 Send + Sync。但 poll() 要求 &mut self,所以实际上同一时刻只有一个线程能轮询。Registry 也是 Send + Sync,可以跨线程注册 I/O 源。

生命周期

Poll 被 drop 时,它可能取消已注册事件源的进行中操作。文档原文:

When the Poll instance is dropped it may cancel in-flight operations for the registered event sources.

这意味着你必须保证 Poll 的生命周期覆盖所有注册的事件源。

Registry:注册表

// mio::Registry(内部定义)
pub struct Registry {
    selector: sys::Selector,
    #[cfg(debug_assertions)]
    has_waker: Arc<AtomicBool>,  // debug 模式下检测 Waker 唯一性
}

Registry 负责管理 I/O 源与 Poll 之间的关联关系。三个核心方法:

register()

pub fn register<S: Source + ?Sized>(
    &self,
    source: &mut S,
    token: Token,
    interests: Interest,
) -> io::Result<()>

将一个 I/O 源注册到 Poll,指定 Token 和 Interest。

reregister()

pub fn reregister<S: Source + ?Sized>(
    &self,
    source: &mut S,
    token: Token,
    interests: Interest,
) -> io::Result<()>

修改已注册源的 Token 和 Interest。注意:参数完全覆盖之前的值。如果之前注册了 READABLE | WRITABLE,reregister 只传 READABLE,则 writable 事件不再被监听。

底层调用:Linux epoll_ctl(EPOLL_CTL_MOD),macOS kevent(EV_ADD) 重新添加。

deregister()

pub fn deregister<S: Source + ?Sized>(
    &self,
    source: &mut S,
) -> io::Result<()>

取消注册。调用后保证不会再收到该源的事件。源可以在 deregister 后重新 register 到同一个 Poll。

底层调用:Linux epoll_ctl(EPOLL_CTL_DEL)

try_clone()

pub fn try_clone(&self) -> io::Result<Registry>

复制 Registry,复制出来的和原始的共享同一个底层 selector。用于跨线程注册场景:一个线程 poll,另一个线程通过 cloned Registry 注册新的 I/O 源。

Token:整数标识的事件分发

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Token(pub usize);

Token 就是一个 usize 的 newtype wrapper。没有生成器、没有分配器——你自己决定编号方案。

为什么是整数而不是回调

这是 mio 和 Asio 最核心的设计差异之一。

C++ Asio 的方式:每个异步操作绑定一个 completion handler(回调函数/函数对象)。

socket.async_read_some(buffer, [](error_code ec, size_t n) {
    // 回调:操作完成时调用
});

mio 的方式:注册时给一个整数 Token,事件到达时返回这个 Token,由用户代码做 match 分发。

registry.register(&mut socket, Token(42), Interest::READABLE)?;

// 事件循环中
for event in events.iter() {
    match event.token() {
        Token(42) => { /* 处理 socket */ }
        _ => {}
    }
}

为什么 mio 不用回调?三个原因:

  1. Rust 的 closure 有生命周期:如果把 closure 存起来等事件到达时调用,这个 closure 捕获的所有引用都必须活得足够久。这在 Rust 中极其难处理——你要么用 'static 约束(强制 move 或 Arc),要么用 unsafe。Token 是 Copy 的,没有生命周期问题。

  2. 内存安全不需要堆分配:C++ Asio 的 handler 通常需要堆分配(allocate handler memory)。mio 的 Token 是 usize,0 字节额外开销。

  3. 与 Rust 的 match 语法契合:Rust 的 match 表达式天然适合基于整数的分发。编译器还能检查你是否遗漏了分支。

Token 的编号策略

mio 不替你管 Token 分配。常见做法:

// 方法 1:手动计数器
const SERVER: Token = Token(0);
let mut next_token = 1;
fn alloc_token(counter: &mut usize) -> Token {
    let t = Token(*counter);
    *counter += 1;
    t
}

// 方法 2:用 slab crate(mio 文档推荐)
// slab 是一个基于数组的 O(1) 分配器,正好适合 Token 场景
let mut slab = slab::Slab::new();
let entry = slab.vacant_entry();
let token = Token(entry.key());
entry.insert(my_connection);

mio 文档明确推荐用 slab 而不是 HashMap——因为 slab 基于数组,Token 就是数组索引,O(1) 查找。

Interest:监听意图

pub struct Interest(NonZeroU8);

Interest 表示你对一个 I/O 源关心的事件类型。它用一个 NonZeroU8 的位标志实现。

核心常量

常量位值含义平台可用性
Interest::READABLE0b0_0001可读全平台
Interest::WRITABLE0b0_0010可写全平台
Interest::AIO0b0_0100异步 I/O 完成DragonFly BSD, FreeBSD, macOS
Interest::LIO0b0_1000列表 I/O 完成FreeBSD
Interest::PRIORITY0b1_0000优先级事件Linux, Android

跨平台保证:只有 READABLEWRITABLE 在所有平台上保证可用。AIOLIOPRIORITY 是平台特定的。

组合操作

// const 上下文可用
const INTERESTS: Interest = Interest::READABLE.add(Interest::WRITABLE);

// 运行时也可以用 BitOr
let interests = Interest::READABLE | Interest::WRITABLE;

// 移除某个 interest
let read_only = interests.remove(Interest::WRITABLE); // Some(Interest::READABLE)
let nothing = Interest::READABLE.remove(Interest::READABLE); // None

add()const fn,可以在编译期组合。remove() 返回 Option<Interest> 因为如果移除后为空,NonZeroU8 不允许为 0。

Events:事件集合

pub struct Events {
    inner: sys::Events,
}

Eventspoll() 的输出容器,内部是平台特定的事件数组。

创建

let mut events = Events::with_capacity(1024);

with_capacity 指定最大事件数。这个值影响每次 poll() 返回的最大事件量——OS 可能有更多就绪事件,但 mio 只返回 capacity 个。实践中 128-1024 是常见值。

使用模式

// Events 应该在循环外创建,反复复用
let mut events = Events::with_capacity(128);

loop {
    poll.poll(&mut events, None)?;
    // poll() 内部会先 clear events,再填充
    for event in events.iter() {
        let token = event.token();
        // 分发...
    }
}

方法

方法签名说明
with_capacityfn with_capacity(capacity: usize) -> Events创建指定容量的事件集合
capacityfn capacity(&self) -> usize返回容量
is_emptyfn is_empty(&self) -> bool是否无事件
iterfn iter(&self) -> Iter<'_>事件迭代器
clearfn clear(&mut self)清空(poll() 内部自动调用)

Events 实现了 IntoIterator(引用),可以直接 for event in &events

Event:单个事件

#[repr(transparent)]
pub struct Event {
    inner: sys::Event,  // 平台特定的事件类型
}

Eventpoll() 返回的单个就绪事件,包含一个 Token 和就绪状态标志。

方法

方法返回含义跨平台保证
token()Token事件对应的 Token全平台
is_readable()bool可读就绪全平台
is_writable()bool可写就绪全平台
is_error()bool错误状态提示性,可能漏报
is_read_closed()bool读端关闭(对端 FIN / shutdown)尽力而为,不会误报
is_write_closed()bool写端关闭尽力而为,不会误报
is_priority()bool优先级事件仅 epoll (EPOLLPRI)
is_aio()boolAIO 完成仅 kqueue BSD/macOS
is_lio()boolLIO 完成仅 FreeBSD

关于 is_read_closed / is_write_closed 的「尽力而为」

文档原文:

This is a best-effort implementation — false positives are guaranteed not to occur.

意思是:如果 is_read_closed() 返回 true,读端一定关闭了。但如果返回 false,读端可能已经关闭但 mio 没检测到。所以你不能仅依赖 is_read_closed() 来判断连接是否断开——还要检查 read() 返回 0。

OOB 数据与 is_readable

文档警告:Out-of-band (OOB) 数据也会触发 is_readable() == true。如果你的应用不处理 OOB 数据,一个恶意客户端可以持续发送 OOB 数据造成事件风暴。但因为 mio 使用边缘触发(edge-triggered),不会导致无限循环——只会触发一次。

平台特定的 is_error 实现

平台检测标志
epoll (Linux)EPOLLERR
kqueue (macOS/BSD)EV_ERROR 或 (EV_EOF + fflags != 0)

Source trait:可轮询的 I/O 源

pub trait Source {
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()>;

    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()>;

    fn deregister(&mut self, registry: &Registry) -> io::Result<()>;
}

任何实现了 Source 的类型都可以注册到 Poll

内置实现者

mio 自己提供的 Source 实现:

类型模块路径功能
TcpListenermio::net::TcpListenerTCP 监听器
TcpStreammio::net::TcpStreamTCP 连接
UdpSocketmio::net::UdpSocketUDP 套接字
UnixListenermio::net::UnixListenerUnix 域套接字监听器
UnixStreammio::net::UnixStreamUnix 域套接字连接
UnixDatagrammio::net::UnixDatagramUnix 域数据报
SourceFdmio::unix::SourceFd任意文件描述符包装器
Sendermio::unix::pipe::SenderUnix 管道发送端
Receivermio::unix::pipe::ReceiverUnix 管道接收端

自定义 Source

如果你有一个自己的类型底层是 fd/socket,可以这样实现 Source:

use mio::{event::Source, Interest, Registry, Token};
use mio::unix::SourceFd;
use std::os::unix::io::AsRawFd;

struct MyDevice {
    fd: RawFd,
}

impl Source for MyDevice {
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        SourceFd(&self.fd).register(registry, token, interests)
    }

    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        SourceFd(&self.fd).reregister(registry, token, interests)
    }

    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        SourceFd(&self.fd).deregister(registry)
    }
}

关键:SourceFd 不拥有 fd 的所有权——它接受 &RawFd,不会在 drop 时关闭 fd。

资源泄漏警告

Source 文档明确指出:

All event::Sources, unless otherwise specified, need to be deregistered before being dropped for them to not leak resources.

这和 Rust 通常的 RAII 模式不同。为什么不能在 drop() 中自动 deregister?因为 deregister() 需要 &Registry,而 Source 不持有 Registry 的引用。这是刻意的设计——避免 Source 和 Poll 之间的循环引用。

blanket implementation

impl<T: Source + ?Sized> Source for Box<T> {
    // 代理到内部类型
}

这意味着 Box<dyn Source> 也是一个 Source,支持动态分发。

Waker:跨线程唤醒

pub struct Waker {
    inner: sys::Waker,
}

Waker 解决一个具体问题:线程 A 在 poll.poll() 中阻塞等待 I/O 事件,线程 B 想通知线程 A「有新任务要处理」,但没有 I/O 事件可以触发。

使用方式

let waker = Waker::new(poll.registry(), Token(999))?;

// 在另一个线程中
let waker = Arc::new(waker);
let waker_clone = waker.clone();
thread::spawn(move || {
    // 做一些工作...
    waker_clone.wake().unwrap(); // 唤醒 poll 线程
});

// poll 线程会收到 Token(999) 的 readable 事件

平台实现

平台机制
Linuxeventfd — 向 eventfd 写入 1u64,epoll 检测到 fd 可读
macOS/BSDEVFILT_USER — kqueue 的用户空间事件过滤器
Windows通过 IOCP 的 PostQueuedCompletionStatus

约束

完整的类型关系图

Poll ──拥有──→ Registry ──持有──→ sys::Selector (epoll fd / kqueue fd / IOCP handle)
  │                │
  │ poll()         │ register/reregister/deregister
  ▼                ▼
Events         Source trait
  │              │
  │ iter()       │ 实现者: TcpListener, TcpStream, UdpSocket, ...
  ▼              │
Event            │ 注册时提供 Token + Interest
  │              │
  │ token()      │
  ▼              ▼
Token ◄────── Interest
(usize)       (NonZeroU8 bitflags)

分享这篇文章:

上一篇
整洁架构(一):概览-依赖方向与层级职责
下一篇
mio(一):Rust 异步生态的 Reactor 基石