Table of contents
Open Table of contents
TL;DR
Asio 的组件设计遵循一个原则:I/O 对象不拥有资源(缓冲区、线程),只持有操作系统句柄和执行器引用。strand 解决多线程序列化问题,timer 集成进 reactor 的等待超时,socket 分层抽象协议差异,buffer 是纯粹的指针+长度对。这些组件通过共享同一个 io_context 和执行器模型自然组合。
组件全景:谁负责什么
┌─────────────────────────────────────────────────────┐
│ io_context │
│ ┌──────────┐ ┌──────────┐ ┌──────────────────┐ │
│ │ reactor │ │ timer │ │ completion queue │ │
│ │ (epoll/ │ │ queue │ │ (handler 队列) │ │
│ │ kqueue/ │ │ (堆排序) │ │ │ │
│ │ IOCP) │ │ │ │ │ │
│ └────┬─────┘ └────┬─────┘ └────────┬─────────┘ │
│ │ │ │ │
│ 监视 socket fd 管理 timer 超时 分发 handler │
└───────┼──────────────┼─────────────────┼─────────────┘
│ │ │
┌────┴────┐ ┌─────┴─────┐ ┌──────┴──────┐
│ socket │ │ timer │ │ strand │
│ (fd + │ │ (到期时间 │ │ (序列化 │
│ executor) │ + handler)│ │ 执行器) │
└────┬────┘ └───────────┘ └─────────────┘
│
┌────┴────┐
│ buffer │
│ (ptr + │
│ size) │
└─────────┘
strand:无锁串行化,比互斥锁更聪明
问题
当多个线程调用 io_context::run() 时,handler 可能被任意线程执行。如果两个 handler 操作同一个 socket 或共享数据结构,就有竞争条件。
传统方案是加互斥锁,但锁有问题:
- 阻塞:一个线程持锁时,其他线程等待,浪费 CPU
- 死锁风险:多个锁的获取顺序不一致
- 粒度难把握:太粗影响并发,太细容易遗漏
strand 的解决方案
strand 保证通过它分发的 handler 永远不会并发执行,但不使用互斥锁:
asio::strand<asio::io_context::executor_type> strand(io.get_executor());
// 这两个 handler 永远不会同时执行,即使有多线程
asio::post(strand, [&] { /* handler A */ });
asio::post(strand, [&] { /* handler B */ });
内部实现
strand 的核心是一个受保护的队列 + 一个 running 标志:
strand 内部状态:
┌──────────────────────┐
│ running: bool (原子) │
│ queue: [handler...] │
└──────────────────────┘
post(handler):
1. 将 handler 加入 queue
2. 原子检查 running:
├─ running == false → 设为 true,开始执行队列
└─ running == true → 什么都不做(当前持有者会处理)
执行循环(持有者线程):
while (queue 非空) {
handler = queue.dequeue()
执行 handler
}
running = false
// 再检查一次 queue(防止 ABA 问题)
关键洞察:strand 不是锁——它是一个串行化的任务队列。没有线程被阻塞等待,工作是被排队而非被阻塞。
dispatch vs post
| 方法 | 行为 |
|---|---|
post(strand, handler) | 总是入队,不会立即执行 |
dispatch(strand, handler) | 如果当前线程已经在这个 strand 上执行,立即内联执行;否则等同于 post |
dispatch 的内联执行在减少延迟上有优势,但要小心重入问题。
两种形式
// 旧形式:绑定到 io_context
asio::io_context::strand strand(io);
// 新形式:模板化,可包装任何执行器
asio::strand<asio::any_io_executor> strand(socket.get_executor());
新形式更灵活,推荐使用。
timer:嵌入 reactor 等待机制的定时器
工作原理
timer 不是独立的机制——它嵌入在 reactor 的等待调用中:
reactor 等待逻辑:
1. 查看 timer_queue(最小堆),找到最早到期的 timer
2. 用这个到期时间作为 epoll_wait/kevent 的超时参数
3. epoll_wait 返回时:
├─ 有 I/O 事件 → 处理 I/O
└─ 超时 → 从 timer_queue 取出到期的 timer,执行其 handler
detail::timer_queue 是一个基于 std::vector 的最小堆(以到期时间为 key),关键方法:
enqueue_timer:插入 timer,返回是否成为新的最早到期者get_ready_timers:将所有到期 timer 的 handler 移到执行队列wait_duration_msec:计算 reactor 的等待超时
steady_timer vs deadline_timer
| 类型 | 时钟 | 特性 |
|---|---|---|
steady_timer | std::chrono::steady_clock | 单调递增,不受系统时间调整影响 |
deadline_timer | Boost.DateTime ptime | 已废弃,受 NTP 等时间调整影响 |
永远用 steady_timer。deadline_timer 在系统时间被调整(NTP 同步、手动修改)时会出问题。
使用示例
asio::steady_timer timer(io, std::chrono::seconds(5));
timer.async_wait([](asio::error_code ec) {
if (ec == asio::error::operation_aborted) {
// timer 被取消了
return;
}
// 5 秒到了
});
// 取消 timer(handler 会被调用,ec 为 operation_aborted)
timer.cancel();
// 重设到期时间(取消当前等待,重新开始)
timer.expires_after(std::chrono::seconds(10));
socket:分层协议抽象
层次结构
basic_socket<Protocol> ← 通用 socket 操作(open, close, bind, option)
├── basic_stream_socket<Protocol> ← 面向流的操作(async_read_some, async_write_some)
│ └── tcp::socket ← basic_stream_socket<tcp>
└── basic_datagram_socket<Protocol> ← 面向数据报的操作(async_send_to, async_receive_from)
└── udp::socket ← basic_datagram_socket<udp>
basic_socket_acceptor<Protocol> ← 监听和接受连接
└── tcp::acceptor ← basic_socket_acceptor<tcp>
每个 socket 持有:
- 一个文件描述符 / HANDLE
- 一个执行器引用(决定 handler 在哪里执行)
协议类型的角色
tcp、udp 不是类,是协议描述符——它们定义了 socket 类型、地址族、endpoint 类型等:
// tcp 的部分定义
class tcp {
public:
using endpoint = basic_endpoint<tcp>;
using socket = basic_stream_socket<tcp>;
using acceptor = basic_socket_acceptor<tcp>;
using resolver = basic_resolver<tcp>;
static tcp v4() { return tcp(AF_INET); }
static tcp v6() { return tcp(AF_INET6); }
int type() const { return SOCK_STREAM; }
int protocol() const { return IPPROTO_TCP; }
int family() const { return family_; }
};
这种设计让 Asio 可以支持自定义协议——只要实现相同的接口。
buffer:非拥有式设计的代价与收益
核心设计
mutable_buffer 和 const_buffer 是极简的值类型:
class mutable_buffer {
void* data_;
std::size_t size_;
};
class const_buffer {
const void* data_;
std::size_t size_;
};
不拥有任何内存。它们只是对现有内存的引用。
buffer() 工厂函数
asio::buffer() 是重载函数,从各种容器创建 buffer:
// 从原始指针 + 大小
char data[1024];
auto buf = asio::buffer(data, sizeof(data));
// 从 std::vector(使用 data() 和 size())
std::vector<char> vec(1024);
auto buf = asio::buffer(vec);
// 从 std::string(只读)
std::string str = "hello";
auto buf = asio::buffer(str); // const_buffer
// 从 std::array
std::array<char, 256> arr;
auto buf = asio::buffer(arr);
注意:asio::buffer(vector) 使用的是 vector.size(),不是 vector.capacity()。如果 vector 为空,buffer 大小为 0。
scatter-gather I/O
Buffer sequence(MutableBufferSequence/ConstBufferSequence)支持分散读/聚集写:
std::array<asio::mutable_buffer, 3> bufs = {
asio::buffer(header, header_size),
asio::buffer(body, body_size),
asio::buffer(trailer, trailer_size)
};
// 一次调用写三块不连续的内存
async_write(socket, bufs, handler);
底层映射到 writev()/readv() 系统调用,避免先拷贝到连续缓冲区的开销。
dynamic_buffer:可增长的缓冲区
basic_streambuf(继承自 std::streambuf)提供可增长的缓冲区语义:
asio::streambuf buf;
// 读取直到遇到 '\n'
async_read_until(socket, buf, '\n', [&buf](auto ec, auto n) {
std::istream is(&buf);
std::string line;
std::getline(is, line);
buf.consume(n); // 消费已处理的数据
});
操作方法:
prepare(n):分配 n 字节的写入空间commit(n):将写入的 n 字节标记为可读consume(n):丢弃已读的 n 字节
缓冲区调试
定义 BOOST_ASIO_ENABLE_BUFFER_DEBUGGING 可以检测缓冲区失效后的使用(use-after-invalidation)。
resolver:异步 DNS 解析
asio::ip::tcp::resolver resolver(io);
resolver.async_resolve("example.com", "https",
[](asio::error_code ec, asio::ip::tcp::resolver::results_type results) {
for (const auto& entry : results) {
// entry.endpoint() 可能是多个 IP 地址
}
});
关键行为:
- 无缓存:每次调用都可能触发 DNS 查询,依赖操作系统的 DNS 缓存
- 多地址:一个域名可能解析到多个 IP,返回全部
- 永远用
async_resolve():同步版resolve()在 DNS 超时时会阻塞数秒
ssl::stream:TLS 如何套在 socket 上
ssl::stream<Stream> 使用 BIO pair 策略将 SSL 协议逻辑与传输解耦:
┌─────────────────────────────────────┐
│ ssl::stream │
│ │
│ ┌──────────┐ ┌───────────────┐ │
│ │ OpenSSL │←──→│ BIO pair │ │
│ │ SSL* │ │ (internal │ │
│ │ handle │ │ buffer) │ │
│ └──────────┘ └───────┬───────┘ │
│ │ │
│ ┌───────┴───────┐ │
│ │ tcp::socket │ │
│ │ (底层传输) │ │
│ └───────────────┘ │
└─────────────────────────────────────┘
asio::ssl::context ctx(asio::ssl::context::tlsv13_client);
ctx.load_verify_file("ca.pem");
asio::ssl::stream<asio::ip::tcp::socket> ssl_socket(io, ctx);
// 先 TCP 连接
ssl_socket.lowest_layer().async_connect(endpoint, [&](auto ec) {
// 再 TLS 握手
ssl_socket.async_handshake(asio::ssl::stream_base::client, [&](auto ec) {
// 之后就像普通 socket 一样读写
async_read(ssl_socket, buffer, handler);
});
});
握手后,ssl::stream 可以像普通 stream 一样使用 async_read、async_write。
组件如何组合:一个完整的连接生命周期
// 所有组件共享同一个 io_context
asio::io_context io;
auto strand = asio::make_strand(io);
// 1. DNS 解析
asio::ip::tcp::resolver resolver(strand);
auto results = co_await resolver.async_resolve("example.com", "443",
asio::use_awaitable);
// 2. TCP 连接
asio::ssl::stream<asio::ip::tcp::socket> socket(strand, ssl_ctx);
co_await asio::async_connect(socket.lowest_layer(),
results, asio::use_awaitable);
// 3. TLS 握手
co_await socket.async_handshake(
asio::ssl::stream_base::client, asio::use_awaitable);
// 4. 读写数据(strand 保证序列化)
std::array<char, 4096> buf;
auto n = co_await socket.async_read_some(
asio::buffer(buf), asio::use_awaitable);
// 5. 超时控制
asio::steady_timer timer(strand, std::chrono::seconds(30));
timer.async_wait([&socket](auto ec) {
if (!ec) socket.lowest_layer().close(); // 超时关闭连接
});
这里的关键组合点:
- strand 包装了执行器,所有组件通过 strand 获取序列化保证
- resolver 返回 endpoint 列表给 socket 连接
- ssl::stream 包装 socket 添加 TLS 层
- buffer 作为 I/O 操作的参数传入
- timer 与 socket 操作配合实现超时