跳转到正文
zeno's blog
返回

C++ 网络编程:epoll、Reactor 与 one loop per thread

专题: Linux I/O

Table of contents

Open Table of contents

TL;DR

C++ 标准库到 C++20 都没有网络库(Networking TS 因 executor 重构被无限期搁置)。Linux 生产方案是 epoll ET + Reactor 模式 + one loop per thread:主 Reactor accept 连接分发到工作线程池,每个工作线程独占 EventLoop 处理自己的连接,通过 eventfd 跨线程唤醒、timerfd 统一定时器、shared_ptr + weak_ptr 管理 TcpConnection 生命周期。muduo 是国内 C++ 服务器开发事实上的学习范本。


1. C++ 网络编程的尴尬现状

C++ 到 C++20 都没有标准网络库。Networking TS(N4370, 2014)以 Boost.Asio 为蓝本,本已进入 C++20 候选,但因与 coroutine、executor 提案冲突被推迟。2020 年后 std::execution(P2300,sender/receiver)被推进为 executor 新基础,Networking TS 无限期搁置,C++23 明确不再包含。2026 年的共识是:等 senders/receivers 稳定(C++26 候选)后再重做网络库

应届生被面试官问到时,正确答案是 “C++ 标准库到 2026 年仍然没有网络库”,顺带解释这段历史能体现对标准演进的关注。

生产方案的四条路

muduo 为什么在国内火:陈硕的《Linux 多线程服务端编程》是中文圈 C++ 服务器开发事实上的入门教材。muduo 代码约 10k 行,设计清晰,国内大厂 C++ 后端岗位几乎默认假设你看过 muduo。


2. Berkeley Socket 基础回顾

int fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
bind(fd, (sockaddr*)&addr, sizeof(addr));
listen(fd, SOMAXCONN);
int conn = accept4(fd, nullptr, nullptr, SOCK_NONBLOCK | SOCK_CLOEXEC);
recv(conn, buf, sizeof(buf), 0);
send(conn, buf, n, MSG_NOSIGNAL);
close(conn);

关键细节

errno 分类(决定回调怎么处理返回值):

errno含义处理
EAGAIN / EWOULDBLOCK非阻塞 fd 没数据可读/写缓冲满正常信号,交还控制权
EINTR系统调用被信号打断应该重试
ECONNRESET对端发 RST连接异常
EPIPE向已关闭写端 socket 写入(伴随 SIGPIPE)进程会被 kill
EMFILE进程 fd 达到上限muduo 的 idle fd 技巧

3. I/O 多路复用:select → poll → epoll

3.1 select 的三宗罪(1983, BSD)

  1. FD_SETSIZE 硬编码 1024(glibc 编译期常量)
  2. 每次调用都要把整个 fd_set 从用户态拷贝到内核态
  3. 内核和用户态都要 O(n) 扫描找出就绪 fd
  4. fd_set 被修改,每次循环必须重置

3.2 poll 的改良(1986, System V)

用 pollfd 数组替代位图突破 1024,events / revents 分离不用重置。仍然 O(n) 扫描

3.3 epoll(2002, Linux 2.5.44)

int ep = epoll_create1(EPOLL_CLOEXEC);
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.ptr = channel_ptr;
epoll_ctl(ep, EPOLL_CTL_ADD, fd, &ev);

struct epoll_event events[MAX];
int n = epoll_wait(ep, events, MAX, -1);

内核数据结构fs/eventpoll.c):

O(1) 来源epoll_wait 只需检查 rdllist 是否为空,不需扫描红黑树。注册是 O(log n),等待是 O(1) + 就绪事件数量。

3.4 LT vs ET 的本质区别(内核视角)

为什么 ET 几乎必选

  1. LT 下注册 EPOLLOUT 时,只要写缓冲区有空间就一直通知,导致 busy loop。ET 下只需注册一次
  2. ET 减少 epoll_wait 系统调用次数
  3. ET 下多线程可以更安全地处理同一个 fd(一个事件只被一个线程取走)

ET 的硬约束

3.5 其他平台

平台机制类型
Linux 2.6+epollreadiness
BSD/macOSkqueuereadiness
WindowsIOCPcompletion
Linux 5.1+io_uringcompletion(真正异步)

4. Reactor 模式的本质

Reactor 是 Douglas Schmidt 1995 年在 POSA 书里总结的事件驱动模式。核心角色:

4.1 Reactor vs Proactor

Boost.Asio 的”伪 Proactor”:Asio 对上层暴露 async_read(buf, callback) 的 Proactor 接口,但 Linux 下底层是 epoll + 用户态 read,内部做了一层模拟。真正的 Proactor 要等 io_uring 后端成熟。

4.2 为什么 Reactor 是 C++ 高性能服务器的标配


5. muduo 风格架构:one loop per thread

5.1 多 Reactor 模型

           ┌──────────────┐
           │  Main Reactor │  listen fd + Acceptor
           └───────┬──────┘
                   │ accept
                   │ round-robin 分发
         ┌─────────┼─────────┐
         ▼         ▼         ▼
  ┌──────────┐┌──────────┐┌──────────┐
  │ Sub R 1  ││ Sub R 2  ││ Sub R 3  │
  │ EventLoop││ EventLoop││ EventLoop│
  │ + epoll  ││ + epoll  ││ + epoll  │
  └──────────┘└──────────┘└──────────┘

核心约束:一个连接一旦分配给某个 SubReactor,生命周期内不再迁移。所有 I/O 和回调都在该线程执行,避免锁。

5.2 核心抽象

Channel:fd + 关注事件 + 回调集合。Channel 不拥有 fd,销毁时不 close。一对一绑定到某个 EventLoop。

Poller:epoll 的 OO 封装(muduo 里是抽象基类 Poller,派生出 EPollPollerPollPoller)。维护 fd → Channel* 映射。

EventLoop

class EventLoop {
public:
    void loop() {
        while (!quit_) {
            activeChannels_.clear();
            pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
            for (Channel* ch : activeChannels_) {
                ch->handleEvent(pollReturnTime_);
            }
            doPendingFunctors();  // 执行跨线程投递的任务
        }
    }
    void runInLoop(Functor cb);   // 同线程直接执行
    void queueInLoop(Functor cb); // 投递到 pendingFunctors_,wakeup
    void wakeup();                 // 写 eventfd 唤醒 epoll_wait
private:
    int wakeupFd_;                 // eventfd
    std::unique_ptr<Channel> wakeupChannel_;
    std::mutex mutex_;
    std::vector<Functor> pendingFunctors_;
    const std::thread::id threadId_;
};

关键设计

5.3 跨线程唤醒:eventfd

int fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);  // Linux 2.6.22+
uint64_t one = 1;
write(fd, &one, sizeof(one));   // 唤醒
read(fd, &one, sizeof(one));    // 消耗事件

比 pipe 更轻(只需一个 fd 而不是两个),语义更清晰(累加计数器),注册到 epoll 即可。

5.4 定时器:timerfd + 最小堆

int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);  // Linux 2.6.25+
timerfd_settime(tfd, 0, &new_value, nullptr);

TimerQueuestd::set<Entry> 维护按过期时间排序的定时器,每次取最早的过期时间 settime,到期后 epoll_wait 返回 timerfd 可读事件,处理所有过期 timer。定时器、I/O、跨线程唤醒统一到一个 epoll 事件循环,无需额外线程。

5.5 TcpConnection

class TcpConnection : public std::enable_shared_from_this<TcpConnection> {
    EventLoop* loop_;
    std::string name_;
    std::atomic<int> state_;       // kConnecting/kConnected/kDisconnecting/kDisconnected
    std::unique_ptr<Socket> socket_;
    std::unique_ptr<Channel> channel_;
    Buffer inputBuffer_;
    Buffer outputBuffer_;
    MessageCallback messageCallback_;
    CloseCallback closeCallback_;
};

TcpServer:组合 Acceptor(封装 listen fd)+ EventLoopThreadPool(工作线程池)+ std::map<name, TcpConnectionPtr>


6. 应用层 Buffer:muduo 的 readv + extrabuf 技巧

为什么需要应用层 Buffer:TCP 是字节流,应用层要等到完整消息才能处理。内核 socket buffer 不够用:你无法把半个消息”退回”内核 buffer 等下次读完。

6.1 muduo Buffer 布局

+-------------------+------------------+------------------+
| prependable bytes |  readable bytes  |  writable bytes  |
|                   |     (CONTENT)    |                  |
+-------------------+------------------+------------------+
0      <=      readerIndex   <=   writerIndex    <=    size

6.2 readv + extrabuf 技巧

ssize_t Buffer::readFd(int fd, int* savedErrno) {
    char extrabuf[65536];                         // 栈上 64KB
    struct iovec vec[2];
    const size_t writable = writableBytes();
    vec[0].iov_base = begin() + writerIndex_;
    vec[0].iov_len  = writable;
    vec[1].iov_base = extrabuf;
    vec[1].iov_len  = sizeof(extrabuf);
    const int iovcnt = (writable < sizeof(extrabuf)) ? 2 : 1;
    const ssize_t n = ::readv(fd, vec, iovcnt);
    if (n < 0) {
        *savedErrno = errno;
    } else if (static_cast<size_t>(n) <= writable) {
        writerIndex_ += n;
    } else {
        writerIndex_ = buffer_.size();
        append(extrabuf, n - writable);  // 溢出部分追加
    }
    return n;
}

精妙之处

环形 vs 线性:环形 buffer 节省拷贝但无法直接给 read/write 连续内存(需两次系统调用或 iovec),实现复杂。muduo 选线性 std::vector<char> + 偶尔 retrieveAll() 复位索引,以简单换性能


7. 粘包 / 拆包

根源:TCP 是字节流协议,没有消息边界。发送方 send(msg1) + send(msg2),接收方可能一次 recv 把两条拼一起(粘包),或一条消息要几次 recv 才能读完(拆包)。

解决方案

  1. 定长消息:简单但浪费带宽
  2. 分隔符:HTTP 头 \r\n\r\n。不适合二进制数据
  3. 定长头 + 变长体(推荐):4 字节 uint32_t 表示 body 长度 + body。Redis RESP、gRPC、Kafka 都是这种
  4. TLV(Type-Length-Value):协议自描述

定长头 + 变长体的典型 Codec

void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp) {
    while (buf->readableBytes() >= kHeaderLen) {
        const int32_t len = buf->peekInt32();   // 不移动 readerIndex
        if (len < 0 || len > kMaxLen) {
            conn->shutdown();                    // 非法长度,踢连接
            break;
        }
        if (buf->readableBytes() < kHeaderLen + len) break;  // 还没收齐
        buf->retrieveInt32();
        std::string message(buf->peek(), len);
        buf->retrieve(len);
        handleMessage(conn, message);
    }
}

典型错误:新手常写 recv 一次当一个完整消息。运气好 QA 测不出来,线上一上高并发就错乱。


8. 生命周期管理:shared_ptr + weak_ptr(muduo 最有技术含量的部分)

8.1 问题场景

class TcpConnection {
    void handleRead() {
        if (messageCallback_) {
            messageCallback_(this, &inputBuffer_);  // 回调里可能注册定时任务
        }
    }
};

// 用户代码
loop->runAfter(5.0, [conn]() {
    conn->send("delayed");  // 5 秒后这个 conn 可能已经析构了!
});

悬挂指针:回调注册时连接还活着,执行时可能已被 TcpServer 析构(客户端关闭)。裸指针 this 一旦 delete 就成了悬挂指针,访问就是 UAF。

8.2 muduo 的解法

  1. TcpConnection 继承 std::enable_shared_from_this<TcpConnection>,由 shared_ptr 管理生命周期
  2. 所有异步回调捕获 shared_ptrweak_ptr,不是裸指针
  3. Channel 弱引用 TcpConnection:Channel 持有 weak_ptr<void> tie_handleEventlock() 提升为 shared_ptr,成功则说明连接还活着
void Channel::tie(const std::shared_ptr<void>& obj) {
    tie_ = obj;
    tied_ = true;
}

void Channel::handleEvent(Timestamp receiveTime) {
    std::shared_ptr<void> guard;
    if (tied_) {
        guard = tie_.lock();
        if (guard) handleEventWithGuard(receiveTime);
        // else 连接已析构,跳过
    } else {
        handleEventWithGuard(receiveTime);
    }
}
  1. TcpConnection::connectEstablished 里 tie
void TcpConnection::connectEstablished() {
    channel_->tie(shared_from_this());
    channel_->enableReading();
    connectionCallback_(shared_from_this());
}
  1. 析构延迟handleEventWithGuard 期间的 guard 延长了 TcpConnection 的生命周期到事件处理结束,避免”处理到一半被析构”

8.3 shared_from_this 的限制

不能在构造函数里调 shared_from_this()enable_shared_from_thisweak_ptr 是在对象被 shared_ptr 首次包装时初始化的。muduo 把 connectEstablished 放在构造之后调用正是为此。

8.4 析构必须在正确的 EventLoop 线程

void TcpServer::removeConnection(const TcpConnectionPtr& conn) {
    loop_->runInLoop([this, conn]() {
        connections_.erase(conn->name());
        EventLoop* ioLoop = conn->getLoop();
        ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
    });
}

为什么是 queueInLoop 而不是 runInLooprunInLoop 若已在该线程会同步调用,TcpConnection 会在 handleEvent 栈帧里被析构,但 Channel 的事件处理还没返回。queueInLoop 保证析构推迟到 handleEvent 完整返回之后。


9. 惊群问题的演进

Linux 2.4 之前:多进程/线程调用 accept 同一 listen fd 时,内核唤醒所有等待者,但只有一个能成功 accept,其他返回 EAGAIN——这就是惊群。Apache prefork 模式因此性能低下。

Linux 2.6WQ_FLAG_EXCLUSIVE 标志解决了 accept 惊群。但 epoll 流行后出现新的惊群——多个进程把 listen fd 注册到各自 epoll,新连接到来时所有进程的 epoll_wait 都被唤醒。

Nginx 的 workaroundaccept_mutex on,用户态文件锁串行化 accept。代价:锁竞争、延迟增加。

Linux 3.9: SO_REUSEPORT

int opt = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));

多个进程/线程各自 socket + bind 同一端口(设置 SO_REUSEPORT),内核为每个监听 socket 维护独立的 accept 队列,基于四元组哈希分发到某一个 socket。每个进程只 epoll_wait 自己的 listen fd,彻底消除惊群,顺带实现内核级负载均衡。

Linux 4.5: EPOLLEXCLUSIVE

ev.events = EPOLLIN | EPOLLEXCLUSIVE;

多个进程共享同一 listen fd,epoll 注册时带 EPOLLEXCLUSIVE,内核只唤醒一个等待者。适合”单进程 listen fd + fork worker”的模型。

2026 年写新服务:直接用 SO_REUSEPORT,别再搞 accept_mutex 这种历史包袱。


10. 性能优化要点

10.1 零拷贝

10.2 TCP 调参

10.3 资源上限

10.4 对象池

TcpConnectionBuffer 用对象池复用,避免高频 new/delete 触发分配器锁竞争。配合 shared_ptr 引用计数规避 ABA 问题。


11. 典型陷阱

陷阱说明
ET 模式忘记循环读到 EAGAIN数据残留在 socket buffer
非阻塞 connect返回 -1 + EINPROGRESS,注册 EPOLLOUT,触发后用 getsockopt(SO_ERROR) 检查实际结果
write 返回值小于请求内核缓冲满,剩余追加到 outputBuffer,注册 EPOLLOUT
recv 返回 0 vs -1 + EAGAIN vs ECONNRESET必须分清:0 是对端关闭,EAGAIN 正常,ECONNRESET 异常
SIGPIPE 杀手属性向已关闭写端写入会 kill 进程,必须 signal(SIGPIPE, SIG_IGN)sendMSG_NOSIGNAL
accept 要循环到 EAGAINET 下 listen fd 就绪时可能有多个连接排队
时间轮 vs 最小堆最小堆(muduo)精度高但插入 O(log n),时间轮(Netty)O(1) 但精度粗
跨线程析构对象通过 queueInLoop 调度到正确线程
epoll_wait 返回 fd 已被 closedata.ptr 指向 Channel 而非 fd 号码,配合 tie_ 机制
EMFILE 时 LT 模式 busy loopmuduo 预留 idle fd 技巧

SIGPIPE 是最容易踩的:向已关闭写端 socket 写入时内核发 SIGPIPE默认 kill 进程。生产代码第一件事:

signal(SIGPIPE, SIG_IGN);
// 或每次 send 带 MSG_NOSIGNAL

这是”项目跑着跑着突然死了找不到原因”的经典坑。


12. Minimal Reactor Echo Server(C++17)

// g++ -std=c++17 -O2 -pthread reactor.cc -o reactor
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>
#include <cstring>
#include <cerrno>
#include <cstdio>
#include <functional>
#include <memory>
#include <unordered_map>

constexpr int kMaxEvents = 64;

struct Channel {
    int fd;
    uint32_t events = 0;
    std::function<void()> readCb;
};

class EventLoop {
public:
    EventLoop() { epfd_ = epoll_create1(EPOLL_CLOEXEC); }
    ~EventLoop() { close(epfd_); }

    void add(std::shared_ptr<Channel> ch, uint32_t events) {
        ch->events = events;
        epoll_event ev{};
        ev.events = events | EPOLLET;
        ev.data.ptr = ch.get();
        epoll_ctl(epfd_, EPOLL_CTL_ADD, ch->fd, &ev);
        channels_[ch->fd] = ch;
    }
    void remove(int fd) {
        epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr);
        channels_.erase(fd);
    }
    void loop() {
        epoll_event events[kMaxEvents];
        while (!quit_) {
            int n = epoll_wait(epfd_, events, kMaxEvents, -1);
            if (n < 0) {
                if (errno == EINTR) continue;
                perror("epoll_wait"); break;
            }
            for (int i = 0; i < n; ++i) {
                auto* ch = static_cast<Channel*>(events[i].data.ptr);
                if (events[i].events & EPOLLIN && ch->readCb) ch->readCb();
            }
        }
    }
private:
    int epfd_;
    bool quit_ = false;
    std::unordered_map<int, std::shared_ptr<Channel>> channels_;
};

int main() {
    signal(SIGPIPE, SIG_IGN);   // 关键:避免向半关闭连接写入被 kill

    int listenFd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
    int opt = 1;
    setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    setsockopt(listenFd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(9000);
    addr.sin_addr.s_addr = INADDR_ANY;
    bind(listenFd, (sockaddr*)&addr, sizeof(addr));
    listen(listenFd, 4096);

    EventLoop loop;
    auto listenCh = std::make_shared<Channel>();
    listenCh->fd = listenFd;
    listenCh->readCb = [&]() {
        // ET: 必须循环 accept 到 EAGAIN
        while (true) {
            sockaddr_in peer{};
            socklen_t len = sizeof(peer);
            int conn = accept4(listenFd, (sockaddr*)&peer, &len,
                               SOCK_NONBLOCK | SOCK_CLOEXEC);
            if (conn < 0) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) break;
                if (errno == EINTR) continue;
                perror("accept4"); break;
            }
            auto connCh = std::make_shared<Channel>();
            connCh->fd = conn;
            connCh->readCb = [&loop, connCh]() {
                char buf[65536];
                while (true) {
                    ssize_t n = ::read(connCh->fd, buf, sizeof(buf));
                    if (n > 0) {
                        ssize_t w = 0;
                        while (w < n) {
                            ssize_t m = ::write(connCh->fd, buf + w, n - w);
                            if (m < 0) {
                                if (errno == EAGAIN) break;
                                if (errno == EINTR) continue;
                                return;
                            }
                            w += m;
                        }
                    } else if (n == 0) {
                        loop.remove(connCh->fd);
                        close(connCh->fd);
                        return;
                    } else {
                        if (errno == EAGAIN) break;
                        if (errno == EINTR) continue;
                        loop.remove(connCh->fd);
                        close(connCh->fd);
                        return;
                    }
                }
            };
            loop.add(connCh, EPOLLIN);
        }
    };
    loop.add(listenCh, EPOLLIN);
    printf("Reactor echo server listening on :9000\n");
    loop.loop();
    return 0;
}

测试nc 127.0.0.1 9000 输入任意内容看到回显。

简化之处(生产需补):没有写缓冲区(write 返回小于请求时丢数据)、没有生命周期管理、没有定时器/跨线程唤醒/工作线程池、没有粘包处理。


13. 生产 Checklist

启动时必做:

运行时监控:


14. 深入话题

14.1 为什么 ET + 非阻塞是”几乎必选”组合

ET 只在状态变化时通知一次。fd 如果是阻塞的,读到最后 read 会挂住等新数据——但”等新数据”该靠 epoll_wait,不能在 read 里等。阻塞 + ET 是自相矛盾

反过来,非阻塞 + LT 也能工作,但有两个劣势:EPOLLOUT 反复通知导致 busy loop(必须”有数据写才注册,写完立即注销”);多线程处理同一 fd 必须加锁。ET + 非阻塞是最干净的组合。

14.2 muduo 生命周期管理的严密性

shared_ptr + weak_ptr 的组合不是随便用的。关键不变式:

  1. TcpConnection 只通过 shared_ptr 访问——从源头消除悬挂指针
  2. 回调闭包捕获 shared_ptr——延长生命周期到回调执行完
  3. Channel 的 tie_ 机制——handleEvent 期间用 lock() 持有 shared_ptr
  4. 析构在正确 EventLoop 线程——queueInLoop 调度保证无锁访问
  5. shared_from_this 只在”已被 shared_ptr 包装之后”调用

每一条规则背后都对应一种 UAF 场景。这就是为什么陈硕反复强调”多线程 C++ 对象生命周期管理是最难的”。

14.3 io_uring 相对 epoll 的革命

epoll 的本质限制:仍然是”轮询就绪,用户态 read”的两阶段模型。每个 I/O 仍是”epoll_wait + read”两次系统调用。Meltdown/Spectre 补丁后 syscall 开销增加 30%+。

io_uring 设计(Jens Axboe, Linux 5.1, 2019):

生产采用:Ceph、RocksDB、ScyllaDB 已大量使用;Nginx 1.25.0+ 实验性支持;Linux 内核 5.19+ 默认启用。

痛点:编程复杂度高(buffer 生命周期要等到 CQ 完成才能释放)、早期版本漏洞多(Google 一度禁用)、Linux 专属、现有库迁移成本高。

应届生视角:知道 io*uring 存在、能讲清楚”为什么 io_uring 是真正的 Proactor 而 epoll 不是”,面试加分。但别装作用过,面试官一追问 IORING_OP*\* 就露馅。


15. 关键信息来源

置信度说明:


分享这篇文章:

上一篇
Linux I/O(二):io_uring 的双环模型与工程边界
下一篇
Linux I/O(三):BSD socket 编程手册