跳转到正文
zeno's blog
返回

asio(三):组件拆分-strand、timer、socket、buffer如何协作

专题: Asio

Table of contents

Open Table of contents

TL;DR

Asio 的组件设计遵循一个原则:I/O 对象不拥有资源(缓冲区、线程),只持有操作系统句柄和执行器引用。strand 解决多线程序列化问题,timer 集成进 reactor 的等待超时,socket 分层抽象协议差异,buffer 是纯粹的指针+长度对。这些组件通过共享同一个 io_context 和执行器模型自然组合。


组件全景:谁负责什么

┌─────────────────────────────────────────────────────┐
│                    io_context                        │
│  ┌──────────┐  ┌──────────┐  ┌──────────────────┐   │
│  │ reactor  │  │ timer    │  │ completion queue │   │
│  │ (epoll/  │  │ queue    │  │ (handler 队列)    │   │
│  │  kqueue/ │  │ (堆排序) │  │                  │   │
│  │  IOCP)   │  │          │  │                  │   │
│  └────┬─────┘  └────┬─────┘  └────────┬─────────┘   │
│       │              │                 │             │
│  监视 socket fd   管理 timer 超时    分发 handler     │
└───────┼──────────────┼─────────────────┼─────────────┘
        │              │                 │
   ┌────┴────┐   ┌─────┴─────┐   ┌──────┴──────┐
   │ socket  │   │  timer    │   │   strand    │
   │ (fd +   │   │ (到期时间 │   │ (序列化    │
   │  executor)  │  + handler)│   │  执行器)    │
   └────┬────┘   └───────────┘   └─────────────┘

   ┌────┴────┐
   │ buffer  │
   │ (ptr +  │
   │  size)  │
   └─────────┘

strand:无锁串行化,比互斥锁更聪明

问题

当多个线程调用 io_context::run() 时,handler 可能被任意线程执行。如果两个 handler 操作同一个 socket 或共享数据结构,就有竞争条件。

传统方案是加互斥锁,但锁有问题:

strand 的解决方案

strand 保证通过它分发的 handler 永远不会并发执行,但不使用互斥锁:

asio::strand<asio::io_context::executor_type> strand(io.get_executor());

// 这两个 handler 永远不会同时执行,即使有多线程
asio::post(strand, [&] { /* handler A */ });
asio::post(strand, [&] { /* handler B */ });

内部实现

strand 的核心是一个受保护的队列 + 一个 running 标志:

strand 内部状态:
┌──────────────────────┐
│ running: bool (原子)  │
│ queue: [handler...]  │
└──────────────────────┘

post(handler):
  1. 将 handler 加入 queue
  2. 原子检查 running:
     ├─ running == false → 设为 true,开始执行队列
     └─ running == true  → 什么都不做(当前持有者会处理)

执行循环(持有者线程):
  while (queue 非空) {
      handler = queue.dequeue()
      执行 handler
  }
  running = false
  // 再检查一次 queue(防止 ABA 问题)

关键洞察:strand 不是锁——它是一个串行化的任务队列。没有线程被阻塞等待,工作是被排队而非被阻塞。

dispatch vs post

方法行为
post(strand, handler)总是入队,不会立即执行
dispatch(strand, handler)如果当前线程已经在这个 strand 上执行,立即内联执行;否则等同于 post

dispatch 的内联执行在减少延迟上有优势,但要小心重入问题。

两种形式

// 旧形式:绑定到 io_context
asio::io_context::strand strand(io);

// 新形式:模板化,可包装任何执行器
asio::strand<asio::any_io_executor> strand(socket.get_executor());

新形式更灵活,推荐使用。

timer:嵌入 reactor 等待机制的定时器

工作原理

timer 不是独立的机制——它嵌入在 reactor 的等待调用中

reactor 等待逻辑:

1. 查看 timer_queue(最小堆),找到最早到期的 timer
2. 用这个到期时间作为 epoll_wait/kevent 的超时参数
3. epoll_wait 返回时:
   ├─ 有 I/O 事件 → 处理 I/O
   └─ 超时 → 从 timer_queue 取出到期的 timer,执行其 handler

detail::timer_queue 是一个基于 std::vector 的最小堆(以到期时间为 key),关键方法:

steady_timer vs deadline_timer

类型时钟特性
steady_timerstd::chrono::steady_clock单调递增,不受系统时间调整影响
deadline_timerBoost.DateTime ptime已废弃,受 NTP 等时间调整影响

永远用 steady_timerdeadline_timer 在系统时间被调整(NTP 同步、手动修改)时会出问题。

使用示例

asio::steady_timer timer(io, std::chrono::seconds(5));

timer.async_wait([](asio::error_code ec) {
    if (ec == asio::error::operation_aborted) {
        // timer 被取消了
        return;
    }
    // 5 秒到了
});

// 取消 timer(handler 会被调用,ec 为 operation_aborted)
timer.cancel();

// 重设到期时间(取消当前等待,重新开始)
timer.expires_after(std::chrono::seconds(10));

socket:分层协议抽象

层次结构

basic_socket<Protocol>                    ← 通用 socket 操作(open, close, bind, option)
  ├── basic_stream_socket<Protocol>       ← 面向流的操作(async_read_some, async_write_some)
  │     └── tcp::socket                   ← basic_stream_socket<tcp>
  └── basic_datagram_socket<Protocol>     ← 面向数据报的操作(async_send_to, async_receive_from)
        └── udp::socket                   ← basic_datagram_socket<udp>

basic_socket_acceptor<Protocol>           ← 监听和接受连接
  └── tcp::acceptor                       ← basic_socket_acceptor<tcp>

每个 socket 持有:

协议类型的角色

tcpudp 不是类,是协议描述符——它们定义了 socket 类型、地址族、endpoint 类型等:

// tcp 的部分定义
class tcp {
public:
    using endpoint = basic_endpoint<tcp>;
    using socket = basic_stream_socket<tcp>;
    using acceptor = basic_socket_acceptor<tcp>;
    using resolver = basic_resolver<tcp>;

    static tcp v4() { return tcp(AF_INET); }
    static tcp v6() { return tcp(AF_INET6); }

    int type() const { return SOCK_STREAM; }
    int protocol() const { return IPPROTO_TCP; }
    int family() const { return family_; }
};

这种设计让 Asio 可以支持自定义协议——只要实现相同的接口。

buffer:非拥有式设计的代价与收益

核心设计

mutable_bufferconst_buffer 是极简的值类型:

class mutable_buffer {
    void* data_;
    std::size_t size_;
};

class const_buffer {
    const void* data_;
    std::size_t size_;
};

不拥有任何内存。它们只是对现有内存的引用。

buffer() 工厂函数

asio::buffer() 是重载函数,从各种容器创建 buffer:

// 从原始指针 + 大小
char data[1024];
auto buf = asio::buffer(data, sizeof(data));

// 从 std::vector(使用 data() 和 size())
std::vector<char> vec(1024);
auto buf = asio::buffer(vec);

// 从 std::string(只读)
std::string str = "hello";
auto buf = asio::buffer(str);  // const_buffer

// 从 std::array
std::array<char, 256> arr;
auto buf = asio::buffer(arr);

注意asio::buffer(vector) 使用的是 vector.size()不是 vector.capacity()。如果 vector 为空,buffer 大小为 0。

scatter-gather I/O

Buffer sequence(MutableBufferSequence/ConstBufferSequence)支持分散读/聚集写:

std::array<asio::mutable_buffer, 3> bufs = {
    asio::buffer(header, header_size),
    asio::buffer(body, body_size),
    asio::buffer(trailer, trailer_size)
};

// 一次调用写三块不连续的内存
async_write(socket, bufs, handler);

底层映射到 writev()/readv() 系统调用,避免先拷贝到连续缓冲区的开销。

dynamic_buffer:可增长的缓冲区

basic_streambuf(继承自 std::streambuf)提供可增长的缓冲区语义:

asio::streambuf buf;

// 读取直到遇到 '\n'
async_read_until(socket, buf, '\n', [&buf](auto ec, auto n) {
    std::istream is(&buf);
    std::string line;
    std::getline(is, line);
    buf.consume(n);  // 消费已处理的数据
});

操作方法:

缓冲区调试

定义 BOOST_ASIO_ENABLE_BUFFER_DEBUGGING 可以检测缓冲区失效后的使用(use-after-invalidation)。

resolver:异步 DNS 解析

asio::ip::tcp::resolver resolver(io);

resolver.async_resolve("example.com", "https",
    [](asio::error_code ec, asio::ip::tcp::resolver::results_type results) {
        for (const auto& entry : results) {
            // entry.endpoint() 可能是多个 IP 地址
        }
    });

关键行为:

ssl::stream:TLS 如何套在 socket 上

ssl::stream<Stream> 使用 BIO pair 策略将 SSL 协议逻辑与传输解耦:

┌─────────────────────────────────────┐
│           ssl::stream               │
│                                     │
│  ┌──────────┐    ┌───────────────┐  │
│  │ OpenSSL  │←──→│  BIO pair     │  │
│  │ SSL*     │    │  (internal    │  │
│  │ handle   │    │   buffer)     │  │
│  └──────────┘    └───────┬───────┘  │
│                          │          │
│                  ┌───────┴───────┐  │
│                  │ tcp::socket   │  │
│                  │ (底层传输)     │  │
│                  └───────────────┘  │
└─────────────────────────────────────┘
asio::ssl::context ctx(asio::ssl::context::tlsv13_client);
ctx.load_verify_file("ca.pem");

asio::ssl::stream<asio::ip::tcp::socket> ssl_socket(io, ctx);

// 先 TCP 连接
ssl_socket.lowest_layer().async_connect(endpoint, [&](auto ec) {
    // 再 TLS 握手
    ssl_socket.async_handshake(asio::ssl::stream_base::client, [&](auto ec) {
        // 之后就像普通 socket 一样读写
        async_read(ssl_socket, buffer, handler);
    });
});

握手后,ssl::stream 可以像普通 stream 一样使用 async_readasync_write

组件如何组合:一个完整的连接生命周期

// 所有组件共享同一个 io_context
asio::io_context io;
auto strand = asio::make_strand(io);

// 1. DNS 解析
asio::ip::tcp::resolver resolver(strand);
auto results = co_await resolver.async_resolve("example.com", "443",
    asio::use_awaitable);

// 2. TCP 连接
asio::ssl::stream<asio::ip::tcp::socket> socket(strand, ssl_ctx);
co_await asio::async_connect(socket.lowest_layer(),
    results, asio::use_awaitable);

// 3. TLS 握手
co_await socket.async_handshake(
    asio::ssl::stream_base::client, asio::use_awaitable);

// 4. 读写数据(strand 保证序列化)
std::array<char, 4096> buf;
auto n = co_await socket.async_read_some(
    asio::buffer(buf), asio::use_awaitable);

// 5. 超时控制
asio::steady_timer timer(strand, std::chrono::seconds(30));
timer.async_wait([&socket](auto ec) {
    if (!ec) socket.lowest_layer().close();  // 超时关闭连接
});

这里的关键组合点:


分享这篇文章:

上一篇
asio(四):异步模型演进-从回调到C++20协程
下一篇
asio(二):io_context-事件循环的核心引擎