跳转到正文
zeno's blog
返回

tower(一):Service trait-异步函数抽象与背压信号

专题: tower

Table of contents

Open Table of contents

TL;DR

tower::Service<Request> 把任何异步请求-响应操作抽象为一个 trait, 核心设计是 poll_ready + call 两步协议 — 先问”你准备好了吗”, 再发请求. poll_ready 的存在使得负载均衡器能探测下游容量, 实现真正的背压传播, 这是 tower 区别于其他中间件框架的根本设计决策.


1. 为什么需要 Service trait

不用 Service trait 时, 异步服务的典型写法是一个 async 函数:

async fn handle(req: HttpRequest) -> Result<HttpResponse, Error> {
    // ...
}

这能工作, 但有三个问题:

问题说明
无法表达背压调用者不知道下游是否过载, 只能无脑发请求
无法组合中间件函数签名不统一, 每个中间件都要手写包装逻辑
无法泛化协议HTTP handler 和 gRPC handler 没有共同接口

Service trait 解决了全部三个问题: 它是一个协议无关的、带背压信号的、可组合的异步函数抽象.

2. 完整 trait 定义

Service trait 定义在独立的 tower-service crate 中 (tower 重新导出它):

// crate: tower-service
// 模块: tower_service

pub trait Service<Request> {
    /// 服务返回的响应类型
    type Response;

    /// 服务可能产生的错误类型
    type Error;

    /// call() 返回的 Future 类型, 解析为 Result<Response, Error>
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    /// 查询服务是否准备好处理请求 (背压信号)
    ///
    /// - Poll::Ready(Ok(())) → 可以调用 call()
    /// - Poll::Pending → 服务过载, 等待通知
    /// - Poll::Ready(Err(_)) → 服务永久失败, 应丢弃此实例
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    /// 处理请求, 返回响应的 Future
    ///
    /// 调用前 **必须** 先通过 poll_ready 确认就绪,
    /// 否则实现可以 panic.
    fn call(&mut self, req: Request) -> Self::Future;
}

三个关联类型 + 两个方法, 没有默认实现, 每一个都是必须的.

3. poll_ready: 背压的核心机制

3.1 为什么不是直接 async fn call()

如果 Service 只有一个 call 方法, 调用者无法在发送请求之前知道服务是否有能力处理. 请求已经发出去了, 即使服务过载也只能排队或丢弃 — 这不是真正的背压, 这是缓冲区溢出.

poll_ready 的设计意图是: 在请求被构造和发送之前, 让调用者有机会感知下游的容量状态.

3.2 负载均衡的关键用途

这是 poll_ready 最重要的使用场景. 一个负载均衡器面对多个后端服务:

// 伪代码: 负载均衡器选择后端
fn select_backend(&mut self, cx: &mut Context<'_>) -> &mut Backend {
    // 遍历所有后端, 选择 poll_ready 返回 Ready 的那个
    // 如果某个后端频繁返回 Pending, 说明它过载, 减少分配给它的请求
    for backend in &mut self.backends {
        if backend.poll_ready(cx).is_ready() {
            return backend;
        }
    }
    // 所有后端都忙, 传播 Pending 给上游
    Poll::Pending
}

没有 poll_ready, 负载均衡器只能用轮询或随机, 无法感知后端实际负载.

3.3 资源预留语义

poll_ready 文档中有一个关键细节:

poll_ready may reserve shared resources that are consumed in a subsequent invocation of call. Thus, it is critical for implementations to not assume that call will always be invoked and to ensure that such resources are released if the service is dropped before call is invoked.

这意味着 poll_ready 返回 Ready 时, 实现可能已经为这次请求预留了资源 (比如连接池中的一个连接, 信号量中的一个 permit). 如果 poll_ready 之后没有调用 call, 这些资源必须在 drop 时释放.

3.4 两步协议的契约

poll_ready() → Ready(Ok(()))  →  call(req)  →  Future<Result<Res, Err>>

  Pending  →  等待, 重新 poll

  Ready(Err(_))  →  服务已死, 丢弃实例

违反此契约 (不调用 poll_ready 就直接 call) 的后果是实现有权 panic. 这不是建议, 是 trait 文档中的明确许可.

4. call: 异步处理请求

call(&mut self, req: Request) -> Self::Future 的设计要点:

设计决策原因
&mut self 而非 &self允许服务维护内部可变状态 (计数器、连接池句柄等)
返回关联类型 Future 而非 impl Future允许调用者具体知道 Future 类型, 支持零开销抽象
Future 必须是 'static返回的 Future 可能被 move 到其他线程/task, 不能借用 &self

&mut self + 'static Future 的组合意味着: 如果 Service 需要在 Future 中访问自己的状态, 必须 clone 共享状态:

struct MyService {
    shared_state: Arc<SharedState>,
}

impl Service<Request> for MyService {
    type Response = Response;
    type Error = Error;
    type Future = Pin<Box<dyn Future<Output = Result<Response, Error>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(())) // 始终就绪
    }

    fn call(&mut self, req: Request) -> Self::Future {
        // clone Arc, 不借用 &self
        let state = self.shared_state.clone();
        Box::pin(async move {
            // 在 Future 内使用 state
            let result = state.process(req).await;
            Ok(result)
        })
    }
}

4.1 hyper 1.0 的不同选择: &self

hyper 1.0 定义了自己的 Service trait, 使用 &self 而非 &mut self:

// hyper::service::Service (非 tower)
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    fn call(&self, req: Request) -> Self::Future;
}

hyper 的理由: &self 更符合实际使用模式 — 大多数服务的共享状态已经在 Arc<Mutex<_>> 中, &mut self 只是增加了 API 复杂性而没有实际收益. hyper 也去掉了 poll_ready, 因为它认为在 HTTP 服务器的场景下, 连接级别的背压由 TCP 本身处理.

hyper-util 提供了 TowerToHyperService 适配器, 桥接两种 Service trait.

5. Service 的心智模型

最准确的心智模型:

Service = 一个异步函数, 但带有背压信号和类型级别的中间件组合能力

与其他生态系统的对比:

概念tower::ServiceExpress.js 中间件Java Servlet Filter
类型安全编译期, 泛型运行时运行时
背压poll_ready 原生支持
协议依赖无 (泛型 Request)HTTP onlyHTTP only
组合方式类型嵌套 (Timeout<RateLimit<S>>)函数链链表
零开销是 (单态化)

6. 与 Finagle 的关系

tower 的设计直接受到 Twitter 的 Scala 框架 Finagle 启发. Finagle 在 2011 年提出了 Service[Req, Rep] 抽象:

// Finagle (Scala)
trait Service[Req, Rep] extends (Req => Future[Rep])

tower 继承了 Finagle 的核心理念:

但 tower 在 Rust 的类型系统中做了适配:

7. 为闭包和函数实现 Service

tower 为 async 函数提供了便捷的 service_fn 适配器:

use tower::service_fn;

let service = service_fn(|req: Request| async move {
    Ok::<_, std::convert::Infallible>(Response::new("hello"))
});

service_fn 返回的类型实现了 Service, 其 poll_ready 始终返回 Ready(Ok(())) (即”永远就绪, 无背压”). 这适用于不需要背压控制的简单场景.

8. Pitfalls

8.1 忘记调用 poll_ready

直接调用 call() 而不先检查 poll_ready() 是未定义行为级的错误. 实现有权 panic. 正确用法:

use tower::ServiceExt; // 提供 ready() 和 oneshot() 方法

// 方式 1: 使用 ServiceExt::ready()
let response = svc.ready().await?.call(request).await?;

// 方式 2: 使用 ServiceExt::oneshot() (一次性请求)
let response = svc.oneshot(request).await?;

8.2 poll_ready 与 call 之间插入其他请求

如果多个任务共享一个 &mut Service, 在一个任务的 poll_readycall 之间, 另一个任务不能插入自己的 call. poll_ready 返回的就绪状态是为下一次 call 预留的.

8.3 Future 捕获 &self

call 返回的 Future 必须是 'static, 不能借用 &self. 新手常犯:

// 错误: Future 借用了 &self
fn call(&mut self, req: Request) -> Self::Future {
    Box::pin(async {
        self.inner.do_something().await // 编译错误: self 被借用
    })
}

8.4 忽略 poll_ready 的 Err

poll_ready 返回 Err 意味着服务永久不可用, 不是暂时忙碌. 正确处理是丢弃该服务实例并创建新的.

8.5 在 poll_ready 中做重量级操作

poll_ready 应该是轻量的 (检查信号量、计数器), 不应该做 I/O 或阻塞操作. 它会在每次请求前被调用, 重量级操作会成为性能瓶颈.


分享这篇文章:

上一篇
tower(二):Layer 与 ServiceBuilder-中间件组合机制
下一篇
hyper(三):与 axum、tower 的架构关系