Table of contents
Open Table of contents
TL;DR
mio 官方提供的 TCP server 示例约 130 行,展示了手动事件循环的完整模式:Token 分发、WouldBlock 处理、edge-triggered drain、连接池管理。同样的功能用 tokio 约 15 行。这个对比不是在批评 mio——它精确地展示了 mio 的定位:它是操作系统事件通知的薄封装,不是应用框架。
mio 版本:完整的 TCP Server
这是 mio 仓库官方示例(examples/tcp_server.rs),带逐段注解。
// 运行:cargo run --example tcp_server --features="os-poll net"
use mio::event::Event;
use mio::net::{TcpListener, TcpStream};
use mio::{Events, Interest, Poll, Registry, Token};
use std::collections::HashMap;
use std::io::{self, Read, Write};
use std::str::from_utf8;
// ---- Token 分配策略 ----
// SERVER token 固定为 0,后续每个新连接递增
const SERVER: Token = Token(0);
// 服务器响应内容
const DATA: &[u8] = b"Hello world!\n";
fn main() -> io::Result<()> {
// ---- 第一步:创建 Poll 和 Events ----
let mut poll = Poll::new()?;
let mut events = Events::with_capacity(128);
// Events 在循环外创建、反复复用。128 是每次 poll 返回的最大事件数。
// ---- 第二步:创建 TcpListener 并注册 ----
let addr = "127.0.0.1:9000".parse().unwrap();
let mut server = TcpListener::bind(addr)?;
// bind() 内部已经设置了 non-blocking 和 SO_REUSEADDR (Unix)
poll.registry()
.register(&mut server, SERVER, Interest::READABLE)?;
// 只注册 READABLE —— listener 只需要知道"有新连接可以 accept"
// ---- 连接池:Token → TcpStream 的映射 ----
let mut connections = HashMap::new();
let mut unique_token = Token(SERVER.0 + 1);
// 这里用 HashMap。生产代码推荐用 slab crate 做 O(1) 查找。
// ---- 第三步:事件循环 ----
loop {
// 阻塞等待事件。None = 无限等待。
// poll() 内部会先 clear events。
if let Err(err) = poll.poll(&mut events, None) {
if interrupted(&err) {
continue; // EINTR:被信号中断,重试
}
return Err(err);
}
// 遍历所有就绪事件
for event in events.iter() {
match event.token() {
// ---- 监听器事件:有新连接到达 ----
SERVER => loop {
// 关键:edge-triggered 要求 drain 所有 pending 连接
// 不能只 accept 一次!
let (mut connection, address) = match server.accept() {
Ok((connection, address)) => (connection, address),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
// 所有 pending 连接都已 accept → 退出内层循环
break;
}
Err(e) => return Err(e),
};
println!("Accepted connection from: {address}");
// 为新连接分配 Token
let token = next(&mut unique_token);
// 注册新连接:关心读和写
poll.registry().register(
&mut connection,
token,
Interest::READABLE.add(Interest::WRITABLE),
)?;
// 存入连接池
connections.insert(token, connection);
},
// ---- 已有连接的事件 ----
token => {
let done = if let Some(connection) = connections.get_mut(&token) {
handle_connection_event(poll.registry(), connection, event)?
} else {
// Token 不在连接池中 — 可能是已经被移除的连接的迟到事件
false
};
if done {
// 连接结束:deregister 并从连接池移除
if let Some(mut connection) = connections.remove(&token) {
poll.registry().deregister(&mut connection)?;
}
}
}
}
}
}
}
// 简单的递增 Token 分配器
fn next(current: &mut Token) -> Token {
let next = current.0;
current.0 += 1;
Token(next)
}
// ---- 处理单个连接的事件 ----
fn handle_connection_event(
registry: &Registry,
connection: &mut TcpStream,
event: &Event,
) -> io::Result<bool> {
// 先处理写就绪
if event.is_writable() {
match connection.write(DATA) {
Ok(n) if n < DATA.len() => return Err(io::ErrorKind::WriteZero.into()),
Ok(_) => {
// 写完欢迎消息后,只关心读事件
registry.reregister(connection, event.token(), Interest::READABLE)?;
// 注意:reregister 完全覆盖 — WRITABLE 不再被监听
}
Err(ref err) if would_block(err) => {}
Err(ref err) if interrupted(err) => {
// EINTR:递归重试
return handle_connection_event(registry, connection, event);
}
Err(err) => return Err(err),
}
}
// 再处理读就绪
if event.is_readable() {
let mut connection_closed = false;
let mut received_data = vec![0; 4096];
let mut bytes_read = 0;
// 关键:edge-triggered drain 循环
// 必须读完所有数据,直到 WouldBlock
loop {
match connection.read(&mut received_data[bytes_read..]) {
Ok(0) => {
// read() 返回 0 = 对端关闭连接 (FIN)
connection_closed = true;
break;
}
Ok(n) => {
bytes_read += n;
if bytes_read == received_data.len() {
// buffer 满了,扩展
received_data.resize(received_data.len() + 1024, 0);
}
}
Err(ref err) if would_block(err) => break, // 数据读完了
Err(ref err) if interrupted(err) => continue, // EINTR:重试
Err(err) => return Err(err),
}
}
if bytes_read != 0 {
let received_data = &received_data[..bytes_read];
if let Ok(str_buf) = from_utf8(received_data) {
println!("Received data: {}", str_buf.trim_end());
} else {
println!("Received (none UTF-8) data: {received_data:?}");
}
}
if connection_closed {
println!("Connection closed");
return Ok(true); // 通知调用者移除连接
}
}
Ok(false)
}
fn would_block(err: &io::Error) -> bool {
err.kind() == io::ErrorKind::WouldBlock
}
fn interrupted(err: &io::Error) -> bool {
err.kind() == io::ErrorKind::Interrupted
}
这段代码展示的 mio 核心模式
| 模式 | 代码位置 | 含义 |
|---|
| Token 分发 | match event.token() | 根据整数 Token 路由事件 |
| Edge-triggered drain | SERVER => loop { ... accept ... WouldBlock => break } | 必须循环处理直到 WouldBlock |
| WouldBlock 处理 | Err(e) if e.kind() == WouldBlock => break | 非阻塞 I/O 的核心错误处理 |
| EINTR 处理 | Err(ref err) if interrupted(err) => continue | 系统调用被信号中断 |
| reregister 切换 interest | registry.reregister(..., Interest::READABLE)? | 写完数据后只关心读 |
| 手动连接池 | HashMap<Token, TcpStream> | 用户负责管理连接生命周期 |
| 手动 deregister | poll.registry().deregister(&mut connection)? | 连接关闭时显式注销 |
tokio 版本:同样功能
use tokio::net::TcpListener;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:9000").await?;
println!("Listening on 127.0.0.1:9000");
loop {
let (mut socket, addr) = listener.accept().await?;
println!("Accepted connection from: {addr}");
// 每个连接 spawn 一个轻量级 task
tokio::spawn(async move {
// 先写欢迎消息
if let Err(e) = socket.write_all(b"Hello world!\n").await {
eprintln!("Write error: {e}");
return;
}
// 持续读取并打印
let mut buf = vec![0; 4096];
loop {
match socket.read(&mut buf).await {
Ok(0) => {
println!("Connection closed");
return;
}
Ok(n) => {
let data = &buf[..n];
if let Ok(s) = std::str::from_utf8(data) {
println!("Received data: {}", s.trim_end());
}
}
Err(e) => {
eprintln!("Read error: {e}");
return;
}
}
}
});
}
}
tokio 版本隐藏了什么
| mio 中手动做的事 | tokio 怎么处理的 |
|---|
Poll::new() + Events::with_capacity() | tokio runtime 内部创建 |
registry.register() / deregister() | tokio 的 I/O driver 自动管理 |
| Token 分配 | tokio 内部用 ScheduledIo 管理 |
loop { poll.poll() ... } | tokio runtime 的 worker thread 运行 |
match event.token() | tokio 把事件路由到对应的 Waker,唤醒 async task |
| WouldBlock 处理 | AsyncRead/AsyncWrite trait 在底层处理 |
| edge-triggered drain | tokio 的 poll_read 实现处理 |
HashMap<Token, TcpStream> | tokio::spawn 每个 task 拥有自己的连接 |
| EINTR 处理 | tokio 内部重试 |
代码量对比
| 指标 | mio | tokio |
|---|
| 核心逻辑行数 | ~130 | ~30 |
| 错误处理代码占比 | ~40% | ~10% |
| 需要理解的概念 | Poll, Token, Interest, Events, Source, edge-triggered, WouldBlock | TcpListener, spawn, async/await |
C++ Asio 版本:作为参考
#include <asio.hpp>
#include <iostream>
#include <memory>
using asio::ip::tcp;
class session : public std::enable_shared_from_this<session> {
tcp::socket socket_;
char data_[4096];
public:
session(tcp::socket socket) : socket_(std::move(socket)) {}
void start() {
// 先发欢迎消息
auto self = shared_from_this();
asio::async_write(socket_, asio::buffer("Hello world!\n"),
[self](std::error_code ec, std::size_t) {
if (!ec) self->do_read();
});
}
private:
void do_read() {
auto self = shared_from_this();
socket_.async_read_some(asio::buffer(data_),
[self](std::error_code ec, std::size_t length) {
if (!ec) {
std::cout << "Received: "
<< std::string(self->data_, length) << std::endl;
self->do_read(); // 继续读
}
});
}
};
class server {
tcp::acceptor acceptor_;
public:
server(asio::io_context& io, short port)
: acceptor_(io, tcp::endpoint(tcp::v4(), port)) {
do_accept();
}
private:
void do_accept() {
acceptor_.async_accept(
[this](std::error_code ec, tcp::socket socket) {
if (!ec) {
std::make_shared<session>(std::move(socket))->start();
}
do_accept(); // 继续 accept
});
}
};
int main() {
asio::io_context io;
server s(io, 9000);
io.run();
}
三种实现的对比
| 维度 | mio (Rust) | tokio (Rust) | Asio (C++) |
|---|
| 模式 | Reactor, 手动事件循环 | Reactor + async/await | Proactor, 回调链 |
| 事件分发 | Token match | 自动(runtime) | Handler callback |
| 内存管理 | 手动 HashMap | per-task ownership | shared_ptr |
| 并发模型 | 单线程事件循环 | M:N 线程池 | io_context::run() |
| 生命周期 | 手动 | async/await 自动 | enable_shared_from_this |
| 代码量 | 最多 | 最少 | 中等 |
| 抽象级别 | 最低 | 最高 | 中等 |