跳转到正文
zeno's blog
返回

Rust 基础:异步编程内部机制

专题: Rust 基础

Table of contents

Open Table of contents

TraitExt 命名约定

TraitExt(Extension Trait)是 Rust 社区的命名约定,用于在不修改原始 trait 的前提下,为已实现该 trait 的类型添加额外方法。

核心场景:

  1. 无法修改原始 trait(在外部 crate 里),但想给所有实现者加便捷方法
  2. 保持原始 trait 精简,把便捷方法放到 Ext trait 里
// 外部 crate 定义的核心 trait
trait Stream {
    fn poll_next(&mut self) -> Option<Item>;
}

// Ext trait — 自动为所有 Stream 实现者提供便捷方法
trait StreamExt: Stream {
    fn next(&mut self) -> Option<Item> {
        self.poll_next()
    }
    fn map<F>(self, f: F) -> Map<Self, F> { ... }
    fn filter<F>(self, f: F) -> Filter<Self, F> { ... }
}

// blanket impl: 任何实现了 Stream 的类型自动获得 Ext 方法
impl<T: Stream> StreamExt for T {}

常见例子:

Result 不是 Trait

Result 是一个 enum,不是 trait:

enum Result<T, E> {
    Ok(T),
    Err(E),
}

它的方法(map, and_then, unwrap, ? 运算符支持等)直接在 impl<T, E> Result<T, E> 上定义。

tokio AsyncReadExt 提供的方法

AsyncReadExt 共提供 29 个 provided methods(blanket impl,无需手动实现),需要 io-util feature。

对比底层的 AsyncRead trait,它只要求实现一个方法:

trait AsyncRead {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>>;
}

AsyncReadExt 把它包装成 .await 友好的 async 方法,按功能分四类:

基础读取read, read_buf, read_exact, read_to_end, read_to_string

类型化读取(大端序)read_u8/i8, read_u16/i16, read_u32/i32, read_u64/i64, read_u128/i128, read_f32/f64

类型化读取(小端序):同上所有类型的 _le 变体

组合子/适配器chain(next) 串联两个 reader,take(limit) 限制最多读取字节数

实现者只需写一个 poll_read,使用者自动获得 29 个便捷方法。

Pin 的本质

Pin 不是标记类型,它是一个普通的 wrapper struct:

pub struct Pin<Ptr> {
    pointer: Ptr,  // 私有字段,这是关键
}

它的安全保证不依赖编译器魔法,完全靠 Rust 现有的类型系统实现:

真正和编译器配合的是 Unpin — 它是一个 auto trait,编译器自动为大多数类型实现。async fn 生成的 Future 是 !Unpin(因为内部包含跨 await 的自引用)。

self: Pin<&mut Self> 签名需要编译器的 arbitrary self types 特性支持,不是 Pin 特有的。

Receiver Trait

Receiver 是一个标记 trait(lang item),告诉编译器”这个类型可以用作方法的 self 参数”:

#[lang = "receiver"]
pub trait Receiver {
    type Target: ?Sized;
}

标准库已为常见指针类型实现:&T, &mut T, Box<T>, Rc<T>, Arc<T>, Pin<P: Receiver>

当你写 self: Pin<&mut Self> 时,编译器的推导链:

  1. &mut Self 实现了 Receiver
  2. Pin<P: Receiver> 也实现了 Receiver
  3. 因此 Pin<&mut Self> 是合法的 receiver

Pin 如何保证 Future 不被移动

纯靠 API 设计:move 一个值需要 &mut T,而 Pin<&mut T>!Unpin 类型拒绝给你 &mut T

// 可以拿 &T(只读,无法 move)
impl<T: ?Sized> Deref for Pin<&mut T> {
    type Target = T;
}

// DerefMut 只给 Unpin 类型实现
impl<T: Unpin + ?Sized> DerefMut for Pin<&mut T> {
    // !Unpin 没有这个
}

保证链:

私有字段 → 无法直接取出内部指针

!Unpin 时不实现 DerefMut → 安全代码拿不到 &mut T

没有 &mut T → 无法 move(mem::swap, mem::replace 都做不了)

无法 move → 自引用指针不会失效

Pin 与 Future 中的自引用

async fn 生成的 Future 跨 await 点时会产生自引用:

async fn example() {
    let data = vec![1, 2, 3];
    let r = &data;
    some_async_op().await;
    println!("{}", r);
}

// 编译器生成的状态机:
struct ExampleFuture {
    data: Vec<i32>,
    r: *const Vec<i32>,  // 指向自身的 data 字段
    state: State,
}

如果这个 struct 被 move,data 的地址变了但 r 还指向旧地址 — 悬垂指针。Pin 就是防这个的。

Pin 中访问内部数据

T: Unpin(大多数普通类型):self.get_mut() 直接拿到 &mut Self

T: !Unpin(async 生成的 Future):安全 API 只能拿到只读访问,修改需 unsafe 或用 pin-project crate:

#[pin_project]
struct MyFuture {
    #[pin]
    inner: InnerFut,  // 投影为 Pin<&mut InnerFut>
    count: u32,       // 投影为 &mut u32
}

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
    let this = self.project();
    // this.inner: Pin<&mut InnerFut>
    // this.count: &mut u32
}

Deref 与自动解引用

Rust 的自动解引用通过 Deref trait 实现。当你在值上调用方法或访问字段时,编译器会沿着 Deref 链自动解引用来寻找匹配的方法:

T → &T → &mut T
↓ (如果没找到方法)
Deref 到 U → &U → &mut U
↓ (继续)
...直到找到或链结束

这也是 Pin<&mut T> 设计能成立的原因 — 它实现了 Deref<Target = T>,但对 !Unpin 类型不实现 DerefMut

tokio/mio 底层机制

不是回调模型,是 poll 模型

tokio 和 mio 在 Linux 上基于 epoll,但不是回调模型。以 sock.read(&mut buf).await 为例:

第一次 poll_read:
  ├─ 尝试非阻塞 read() 系统调用
  ├─ 内核返回 EWOULDBLOCK(没数据)
  ├─ epoll_ctl() 注册 fd,存储当前 task 的 Waker
  └─ 返回 Poll::Pending(Future 挂起,buf 作为 Future 状态的一部分存活)

reactor 线程: epoll_wait() 阻塞等待
  ← fd 就绪,epoll_wait 返回
  ← reactor 调用 waker.wake()(只是通知,不读数据)

第二次 poll_read:
  ├─ executor 重新调度这个 task
  ├─ 再次尝试非阻塞 read() ← 数据在这里读到 buf
  └─ 返回 Poll::Ready(Ok(n))

关键:epoll 只负责通知”fd 就绪了”,实际的 read() 系统调用发生在 task 被重新 poll 的时候。

生命周期安全

buf 的所有权在 async 生成的 Future 状态机里。Future 被 Pin 住不会移动,从 Pending 到下次 poll 之间 Future 没有被 drop,所以 buf 的引用始终有效。不存在”回调持有引用”的问题。

修改 epoll 注册不会出 bug

tokio vs Go runtime 尾延迟对比

两者网络在 Linux 上都基于 epoll,调度模型本质相同。tokio 尾延迟更好的原因:

每次调度占用的时间片不同

Go goroutine 两次让出之间可能跑很久(依赖异步抢占 SIGURG,有延迟)。Tokio task 每个 .await 就是让出点,单次 poll() 通常在微秒级就返回。

Go:    |----goroutine A 跑了 500μs----|--B 终于拿到 P--|
Tokio: |-A poll 3μs-|-B poll 2μs-|-C poll 4μs-|-A poll 3μs-|

GC

Go 的 GC 即使很先进,STW 暂停(通常 <1ms)对 p99 是灾难。写屏障增加运行时开销。Rust 没有 GC。

上下文切换成本

Go(有栈协程):切换需要保存/恢复寄存器、切换栈指针,污染 CPU cache。 Tokio(无栈协程):poll 就是一次普通函数调用,没有栈切换,cache 友好。

GoTokio
fd 就绪后要等调度
等待时间长(goroutine 时间片不可控)短(poll 通常微秒级返回)
GC 暂停有,影响全局
协程切换成本高(栈切换)低(函数调用)

定时器实现

核心机制

两者都把 epoll_wait 的 timeout 设为下一个定时器的到期时间:

下一个定时器 500μs 后到期
epoll_wait(timeout = 500μs)
  ├─ 500μs 内有 I/O 事件 → 立即返回,处理 I/O,顺便检查定时器
  └─ 500μs 内没有 I/O    → 超时返回,检查定时器

误差来源

检测延迟(发现定时器到期):

调度延迟(检测到了,等 executor/P 执行):两者都有。

Tokio 的 Timer Wheel

Level 0:  64 slot, 每 slot 1ms   → 覆盖 0-64ms
Level 1:  64 slot, 每 slot 64ms  → 覆盖 64ms-~4s
Level 2:  64 slot, 每 slot ~4s   → 覆盖 ~4s-~4min

插入和检测都是 O(1)。

Go 的定时器

Go 1.14+ 把定时器绑到每个 P 上(之前是全局堆),调度循环中 checkTimers() 检查堆顶。sysmon 线程周期性检查作为兜底。

Future trait 与异步生态

Future 的完整定义

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}

pub struct Context<'a> {
    waker: &'a Waker,
}

pub struct Waker {
    waker: RawWaker,
}

pub struct RawWaker {
    data: *const (),
    vtable: &'static RawWakerVTable,
}

pub struct RawWakerVTable {
    clone: unsafe fn(*const ()) -> RawWaker,
    wake: unsafe fn(*const ()),
    wake_by_ref: unsafe fn(*const ()),
    drop: unsafe fn(*const ()),
}

整个异步基础设施的标准库部分只有一个方法:poll

编译器:async fn → 状态机

async fn fetch() -> String {
    let conn = connect().await;    // 挂起点 1
    let data = conn.read().await;  // 挂起点 2
    data
}

编译器生成嵌套状态机 enum,每个 await 是一个状态转移。Pending 时把 cx 里的 Waker 留给底层,Ready 时推进到下一个状态。

Waker 是整个体系的枢纽

poll 返回 Pending
  └→ 把 cx.waker() 存到某个地方(epoll slot / timer wheel / channel)

外部事件发生(fd 就绪 / 定时器到期 / channel 有消息)
  └→ 调用 waker.wake()
       └→ 把 task 放回 executor 的 run queue
            └→ executor 重新 poll 这个 task

标准库只定义协议,不提供 runtime

标准库生态
语法async/await(编译器)
协议Future, Poll, Waker, Context
Executortokio, async-std, smol, embassy
Reactormio(epoll/kqueue/iocp)
I/O 类型tokio::net, tokio::fs
组合子FutureExt(futures crate)

这让同一套 async/await 语法可以跑在服务器(tokio)、嵌入式(embassy)、WebAssembly 等完全不同的环境上。

C++ modules 为什么迟迟没普及

C++20 modules 和 Rust 的 crate metadata 思路一样 — 编译出二进制模块接口(BMI),消费者读 metadata 而不是文本包含。但面对的现实更残酷:

Rust vs C++20 协程模型对比

内存分配

C++20 协程帧独立堆分配。Rust 的子 Future 不独立堆分配,而是内联在父 Future 的状态机结构体里(顶层 spawn 的 task 本身在堆上)。

对称转移

C++20 支持对称转移(一个协程直接跳到另一个,栈深度 O(1))。Rust 的 poll 模型天然不支持 — executor 调用主 Future 的 poll,主 Future 调用子 Future 的 poll,是嵌套函数调用。

C++20 对称转移Rust poll 树
协程间切换直接跳转,不回调度器必须返回 executor 再 poll
子协程内存每个独立堆分配内联在父 Future,零额外分配
cache 局部性差(帧分散在堆上)好(一整块连续内存)
取消手动管理 handle 生命周期drop 即取消,递归析构
深层嵌套栈深度 O(1)(对称转移)调用栈随嵌套深度增长

”大 Future”问题

select! 所有分支的 Future 同时持有,enum 的大小是最大 variant 的大小。tokio::spawn 是 Rust 打断嵌套、引入独立调度点的逃生舱口。

深层嵌套与爆栈

顺序 await 多 → 不会爆栈(poll 深度不增长)。嵌套深 → 会爆栈(每层 poll 吃真实栈帧)。递归 async 需要 Box 打断内联,但 poll 栈深度仍然 O(n)。tokio::spawn 可将 poll 栈深度重置为 O(1)。

C# 协程对比

C# 的 async/await(2012 年,先驱)和 Rust 更像 — 都是编译器生成状态机。但通知模型相反:

C#RustC++20
状态机编译器生成编译器生成编译器生成
通知模型push(回调)pull(poll)都可以
状态机内存首次挂起时堆分配(GC 管理)内联在父 Future独立堆分配
子协程内联不内联内联不内联
对称转移不支持不支持支持
取消CancellationTokendrop 即取消手动 destroy

C# 有 SynchronizationContext(continuation 在哪个线程恢复由它决定),Rust 和 C++ 没有这层抽象。C# 不需要 Pin,因为 GC 语言里对象移动时引用会被自动更新。

Trait 可见性与孤儿规则

Rust 的编译单元是 crate。每个 crate 编译后生成 metadata(.rmeta),包含所有类型定义、trait 定义和 trait impl。

errno 到 std::io::Error 的转换

// 1. std 内部调用 libc
let ret = libc::read(fd, buf.as_mut_ptr(), buf.len());
// 2. 检查返回值
if ret == -1 {
    return Err(io::Error::last_os_error());  // 读 errno
}

io::Error 内部表示:

enum Repr {
    Os(i32),              // 直接存 errno 值
    Simple(ErrorKind),
    Custom(Box<Custom>),
}

OS 变体直接存原始 errno 数值,只在调用 .kind() 时才惰性映射到 ErrorKind 枚举。

tokio 与 asio 的层级定位

两者都定位在异步 I/O 运行时层,提供 executor、reactor、timer、基本异步读写,不提供协议解析、路由、序列化等上层功能。

应用框架    axum, actix-web           Boost.Beast (HTTP)
协议解析    hyper (HTTP)              Boost.Beast
TLS        tokio-rustls              asio::ssl
序列化      serde                     nlohmann/json, protobuf
─────────── 分界线 ───────────────────────────────────────
运行时      tokio                     asio
            ├─ executor               ├─ io_context
            ├─ reactor (epoll)        ├─ reactor (epoll/iocp)
            ├─ timer                  ├─ steady_timer
            └─ TcpStream/UdpSocket    └─ tcp::socket/udp::socket

分享这篇文章:

上一篇
系统设计基础(一):C4 Model 如何用四层缩放解决架构图混乱
下一篇
Rust 基础:anyhow 为什么值得单独使用