跳转到正文
zeno's blog
返回

mio(六):代码示例-TCP Echo Server

专题: mio

Table of contents

Open Table of contents

TL;DR

mio 官方提供的 TCP server 示例约 130 行,展示了手动事件循环的完整模式:Token 分发、WouldBlock 处理、edge-triggered drain、连接池管理。同样的功能用 tokio 约 15 行。这个对比不是在批评 mio——它精确地展示了 mio 的定位:它是操作系统事件通知的薄封装,不是应用框架。


mio 版本:完整的 TCP Server

这是 mio 仓库官方示例(examples/tcp_server.rs),带逐段注解。

// 运行:cargo run --example tcp_server --features="os-poll net"
use mio::event::Event;
use mio::net::{TcpListener, TcpStream};
use mio::{Events, Interest, Poll, Registry, Token};
use std::collections::HashMap;
use std::io::{self, Read, Write};
use std::str::from_utf8;

// ---- Token 分配策略 ----
// SERVER token 固定为 0,后续每个新连接递增
const SERVER: Token = Token(0);

// 服务器响应内容
const DATA: &[u8] = b"Hello world!\n";

fn main() -> io::Result<()> {
    // ---- 第一步:创建 Poll 和 Events ----
    let mut poll = Poll::new()?;
    let mut events = Events::with_capacity(128);
    // Events 在循环外创建、反复复用。128 是每次 poll 返回的最大事件数。

    // ---- 第二步:创建 TcpListener 并注册 ----
    let addr = "127.0.0.1:9000".parse().unwrap();
    let mut server = TcpListener::bind(addr)?;
    // bind() 内部已经设置了 non-blocking 和 SO_REUSEADDR (Unix)

    poll.registry()
        .register(&mut server, SERVER, Interest::READABLE)?;
    // 只注册 READABLE —— listener 只需要知道"有新连接可以 accept"

    // ---- 连接池:Token → TcpStream 的映射 ----
    let mut connections = HashMap::new();
    let mut unique_token = Token(SERVER.0 + 1);
    // 这里用 HashMap。生产代码推荐用 slab crate 做 O(1) 查找。

    // ---- 第三步:事件循环 ----
    loop {
        // 阻塞等待事件。None = 无限等待。
        // poll() 内部会先 clear events。
        if let Err(err) = poll.poll(&mut events, None) {
            if interrupted(&err) {
                continue; // EINTR:被信号中断,重试
            }
            return Err(err);
        }

        // 遍历所有就绪事件
        for event in events.iter() {
            match event.token() {
                // ---- 监听器事件:有新连接到达 ----
                SERVER => loop {
                    // 关键:edge-triggered 要求 drain 所有 pending 连接
                    // 不能只 accept 一次!
                    let (mut connection, address) = match server.accept() {
                        Ok((connection, address)) => (connection, address),
                        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                            // 所有 pending 连接都已 accept → 退出内层循环
                            break;
                        }
                        Err(e) => return Err(e),
                    };

                    println!("Accepted connection from: {address}");

                    // 为新连接分配 Token
                    let token = next(&mut unique_token);

                    // 注册新连接:关心读和写
                    poll.registry().register(
                        &mut connection,
                        token,
                        Interest::READABLE.add(Interest::WRITABLE),
                    )?;

                    // 存入连接池
                    connections.insert(token, connection);
                },

                // ---- 已有连接的事件 ----
                token => {
                    let done = if let Some(connection) = connections.get_mut(&token) {
                        handle_connection_event(poll.registry(), connection, event)?
                    } else {
                        // Token 不在连接池中 — 可能是已经被移除的连接的迟到事件
                        false
                    };

                    if done {
                        // 连接结束:deregister 并从连接池移除
                        if let Some(mut connection) = connections.remove(&token) {
                            poll.registry().deregister(&mut connection)?;
                        }
                    }
                }
            }
        }
    }
}

// 简单的递增 Token 分配器
fn next(current: &mut Token) -> Token {
    let next = current.0;
    current.0 += 1;
    Token(next)
}

// ---- 处理单个连接的事件 ----
fn handle_connection_event(
    registry: &Registry,
    connection: &mut TcpStream,
    event: &Event,
) -> io::Result<bool> {
    // 先处理写就绪
    if event.is_writable() {
        match connection.write(DATA) {
            Ok(n) if n < DATA.len() => return Err(io::ErrorKind::WriteZero.into()),
            Ok(_) => {
                // 写完欢迎消息后,只关心读事件
                registry.reregister(connection, event.token(), Interest::READABLE)?;
                // 注意:reregister 完全覆盖 — WRITABLE 不再被监听
            }
            Err(ref err) if would_block(err) => {}
            Err(ref err) if interrupted(err) => {
                // EINTR:递归重试
                return handle_connection_event(registry, connection, event);
            }
            Err(err) => return Err(err),
        }
    }

    // 再处理读就绪
    if event.is_readable() {
        let mut connection_closed = false;
        let mut received_data = vec![0; 4096];
        let mut bytes_read = 0;

        // 关键:edge-triggered drain 循环
        // 必须读完所有数据,直到 WouldBlock
        loop {
            match connection.read(&mut received_data[bytes_read..]) {
                Ok(0) => {
                    // read() 返回 0 = 对端关闭连接 (FIN)
                    connection_closed = true;
                    break;
                }
                Ok(n) => {
                    bytes_read += n;
                    if bytes_read == received_data.len() {
                        // buffer 满了,扩展
                        received_data.resize(received_data.len() + 1024, 0);
                    }
                }
                Err(ref err) if would_block(err) => break, // 数据读完了
                Err(ref err) if interrupted(err) => continue, // EINTR:重试
                Err(err) => return Err(err),
            }
        }

        if bytes_read != 0 {
            let received_data = &received_data[..bytes_read];
            if let Ok(str_buf) = from_utf8(received_data) {
                println!("Received data: {}", str_buf.trim_end());
            } else {
                println!("Received (none UTF-8) data: {received_data:?}");
            }
        }

        if connection_closed {
            println!("Connection closed");
            return Ok(true); // 通知调用者移除连接
        }
    }

    Ok(false)
}

fn would_block(err: &io::Error) -> bool {
    err.kind() == io::ErrorKind::WouldBlock
}

fn interrupted(err: &io::Error) -> bool {
    err.kind() == io::ErrorKind::Interrupted
}

这段代码展示的 mio 核心模式

模式代码位置含义
Token 分发match event.token()根据整数 Token 路由事件
Edge-triggered drainSERVER => loop { ... accept ... WouldBlock => break }必须循环处理直到 WouldBlock
WouldBlock 处理Err(e) if e.kind() == WouldBlock => break非阻塞 I/O 的核心错误处理
EINTR 处理Err(ref err) if interrupted(err) => continue系统调用被信号中断
reregister 切换 interestregistry.reregister(..., Interest::READABLE)?写完数据后只关心读
手动连接池HashMap<Token, TcpStream>用户负责管理连接生命周期
手动 deregisterpoll.registry().deregister(&mut connection)?连接关闭时显式注销

tokio 版本:同样功能

use tokio::net::TcpListener;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:9000").await?;
    println!("Listening on 127.0.0.1:9000");

    loop {
        let (mut socket, addr) = listener.accept().await?;
        println!("Accepted connection from: {addr}");

        // 每个连接 spawn 一个轻量级 task
        tokio::spawn(async move {
            // 先写欢迎消息
            if let Err(e) = socket.write_all(b"Hello world!\n").await {
                eprintln!("Write error: {e}");
                return;
            }

            // 持续读取并打印
            let mut buf = vec![0; 4096];
            loop {
                match socket.read(&mut buf).await {
                    Ok(0) => {
                        println!("Connection closed");
                        return;
                    }
                    Ok(n) => {
                        let data = &buf[..n];
                        if let Ok(s) = std::str::from_utf8(data) {
                            println!("Received data: {}", s.trim_end());
                        }
                    }
                    Err(e) => {
                        eprintln!("Read error: {e}");
                        return;
                    }
                }
            }
        });
    }
}

tokio 版本隐藏了什么

mio 中手动做的事tokio 怎么处理的
Poll::new() + Events::with_capacity()tokio runtime 内部创建
registry.register() / deregister()tokio 的 I/O driver 自动管理
Token 分配tokio 内部用 ScheduledIo 管理
loop { poll.poll() ... }tokio runtime 的 worker thread 运行
match event.token()tokio 把事件路由到对应的 Waker,唤醒 async task
WouldBlock 处理AsyncRead/AsyncWrite trait 在底层处理
edge-triggered draintokio 的 poll_read 实现处理
HashMap<Token, TcpStream>tokio::spawn 每个 task 拥有自己的连接
EINTR 处理tokio 内部重试

代码量对比

指标miotokio
核心逻辑行数~130~30
错误处理代码占比~40%~10%
需要理解的概念Poll, Token, Interest, Events, Source, edge-triggered, WouldBlockTcpListener, spawn, async/await

C++ Asio 版本:作为参考

#include <asio.hpp>
#include <iostream>
#include <memory>

using asio::ip::tcp;

class session : public std::enable_shared_from_this<session> {
    tcp::socket socket_;
    char data_[4096];
public:
    session(tcp::socket socket) : socket_(std::move(socket)) {}

    void start() {
        // 先发欢迎消息
        auto self = shared_from_this();
        asio::async_write(socket_, asio::buffer("Hello world!\n"),
            [self](std::error_code ec, std::size_t) {
                if (!ec) self->do_read();
            });
    }
private:
    void do_read() {
        auto self = shared_from_this();
        socket_.async_read_some(asio::buffer(data_),
            [self](std::error_code ec, std::size_t length) {
                if (!ec) {
                    std::cout << "Received: "
                        << std::string(self->data_, length) << std::endl;
                    self->do_read(); // 继续读
                }
            });
    }
};

class server {
    tcp::acceptor acceptor_;
public:
    server(asio::io_context& io, short port)
        : acceptor_(io, tcp::endpoint(tcp::v4(), port)) {
        do_accept();
    }
private:
    void do_accept() {
        acceptor_.async_accept(
            [this](std::error_code ec, tcp::socket socket) {
                if (!ec) {
                    std::make_shared<session>(std::move(socket))->start();
                }
                do_accept(); // 继续 accept
            });
    }
};

int main() {
    asio::io_context io;
    server s(io, 9000);
    io.run();
}

三种实现的对比

维度mio (Rust)tokio (Rust)Asio (C++)
模式Reactor, 手动事件循环Reactor + async/awaitProactor, 回调链
事件分发Token match自动(runtime)Handler callback
内存管理手动 HashMapper-task ownershipshared_ptr
并发模型单线程事件循环M:N 线程池io_context::run()
生命周期手动async/await 自动enable_shared_from_this
代码量最多最少中等
抽象级别最低最高中等

分享这篇文章:

上一篇
mio(七):陷阱与常见错误
下一篇
整洁架构(四):微服务时代的落地-从理想到妥协