Table of contents
Open Table of contents
TL;DR
Layer trait 定义了”如何把一个 Service 包装成另一个 Service”, ServiceBuilder 提供声明式 API 将多个 Layer 按顺序堆叠. 中间件组合在编译期完成类型嵌套 (如 Timeout<RateLimit<ConcurrencyLimit<S>>>), 零运行时开销, 这是 tower 与 Express/Koa 等运行时中间件链的本质区别.
1. Layer trait: 中间件的工厂
1.1 完整定义
// crate: tower-layer
// 模块: tower_layer
pub trait Layer<S> {
/// 包装后产生的 Service 类型
type Service;
/// 将 inner service 包装成新的 service
fn layer(&self, inner: S) -> Self::Service;
}
Layer 不是 Service. Layer 是 Service 的工厂 — 它接收一个 Service, 返回一个新的 (包装过的) Service.
1.2 Layer 与 Service 的关系
Layer::layer(inner_service) → WrappedService
|
↓
impl Service<Request>
一个具体的例子 — 日志中间件:
use tower::{Layer, Service};
use std::task::{Context, Poll};
use std::future::Future;
use std::pin::Pin;
// Step 1: 定义 Layer (工厂)
pub struct LogLayer {
target: &'static str,
}
impl<S> Layer<S> for LogLayer {
type Service = LogService<S>;
fn layer(&self, inner: S) -> Self::Service {
LogService {
target: self.target,
inner,
}
}
}
// Step 2: 定义包装后的 Service
pub struct LogService<S> {
target: &'static str,
inner: S,
}
impl<S, Request> Service<Request> for LogService<S>
where
S: Service<Request>,
Request: std::fmt::Debug,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx) // 透传背压信号
}
fn call(&mut self, req: Request) -> Self::Future {
tracing::debug!(target: self.target, ?req, "processing request");
self.inner.call(req) // 委托给内部 service
}
}
关键模式: Layer 持有配置 (如 target), Service 持有内部 Service 引用 + 配置. 这种分离允许同一个 Layer 实例应用到不同的 Service 上.
2. 洋葱模型
2.1 中间件堆叠的类型结构
当多个 Layer 依次应用时, 产生嵌套类型:
use tower::timeout::TimeoutLayer;
use tower::limit::{RateLimitLayer, ConcurrencyLimitLayer};
use std::time::Duration;
// 手动应用 Layer
let service = my_service;
let service = ConcurrencyLimitLayer::new(10).layer(service);
let service = RateLimitLayer::new(100, Duration::from_secs(1)).layer(service);
let service = TimeoutLayer::new(Duration::from_secs(30)).layer(service);
// 最终类型: Timeout<RateLimit<ConcurrencyLimit<MyService>>>
请求的流经路径 (洋葱模型):
Request → Timeout → RateLimit → ConcurrencyLimit → MyService
|
Response ← Timeout ← RateLimit ← ConcurrencyLimit ←────┘
外层中间件先看到请求, 最后看到响应. 这与 Express.js 的 next() 模型语义相同, 但实现方式完全不同:
| 特性 | tower (Rust) | Express.js |
|---|---|---|
| 中间件链表示 | 编译期类型嵌套 | 运行时数组 |
| 调度开销 | 零 (单态化内联) | 每层一次虚函数调用 |
| 类型安全 | 编译期检查 Request/Response 类型 | 无 (any) |
| 添加/移除中间件 | 需要重新编译 | 运行时动态 |
| 错误类型 | 每层可以有不同的错误类型, 编译期组合 | 统一 Error |
2.2 洋葱的方向: 最先添加的 Layer 最靠外
这是一个关键的认知点. 用 ServiceBuilder 时:
ServiceBuilder::new()
.layer(A) // 最外层, 最先处理请求
.layer(B) // 中间层
.layer(C) // 最内层, 最接近实际 service
.service(my_service)
等价于:
A.layer(B.layer(C.layer(my_service)))
类型是 A::Service<B::Service<C::Service<MyService>>>. 请求先经过 A, 再经过 B, 最后经过 C, 到达 MyService.
3. ServiceBuilder: 声明式中间件组合
3.1 核心 API
use tower::ServiceBuilder;
use std::time::Duration;
let service = ServiceBuilder::new()
// 请求入队缓冲 (允许 Service 不实现 Clone)
.buffer(100)
// 并发限制
.concurrency_limit(10)
// 限速: 每秒 100 个请求
.rate_limit(100, Duration::from_secs(1))
// 超时
.timeout(Duration::from_secs(30))
// 自定义 Layer
.layer(MyCustomLayer::new())
// 应用到具体 service
.service(my_service);
3.2 完整方法列表
内置中间件快捷方法 (需要对应 feature flag):
| 方法 | Feature | 功能 |
|---|---|---|
.buffer(bound) | buffer | 在 service 前加 mpsc 缓冲通道 |
.concurrency_limit(max) | limit | 限制最大并发请求数 |
.rate_limit(num, per) | limit | 限制请求速率 |
.timeout(duration) | timeout | 请求超时 |
.retry(policy) | retry | 按策略重试失败请求 |
.load_shed() | load-shed | 服务不就绪时直接拒绝请求 |
.filter(predicate) | filter | 按条件过滤请求 |
变换方法:
| 方法 | 功能 |
|---|---|
.map_request(f) | 变换请求类型 |
.map_response(f) | 变换响应类型 |
.map_err(f) | 变换错误类型 |
.map_future(f) | 变换返回的 Future |
.then(f) | 在响应后应用异步函数 |
.and_then(f) | 在响应后链式异步操作 |
核心方法:
| 方法 | 功能 |
|---|---|
.layer(l) | 添加自定义 Layer |
.service(s) | 用累积的 Layer 包装 Service |
.service_fn(f) | 用累积的 Layer 包装 async fn |
.into_inner() | 提取底层 Layer |
.boxed() | 类型擦除 (返回 BoxService) |
.boxed_clone() | 类型擦除 + Clone (返回 BoxCloneService) |
3.3 Layer 应用顺序的实际影响
顺序不同, 行为不同:
// 方案 A: buffer 在 concurrency_limit 外面
ServiceBuilder::new()
.buffer(100) // 允许 100 个请求排队
.concurrency_limit(10) // 最多同时处理 10 个
.service(svc)
// → 最多 110 个请求在系统中 (100 排队 + 10 处理中)
// 方案 B: buffer 在 concurrency_limit 里面
ServiceBuilder::new()
.concurrency_limit(10) // 最多 10 个请求进入
.buffer(100) // 100 的缓冲在限制之内
.service(svc)
// → 最多 10 个请求在系统中 (缓冲区被限制保护)
这不是 bug, 是 feature. 开发者对中间件的位置有完全控制权.
4. 高级用法
4.1 Layer 组合 (元组)
多个 Layer 可以组合成一个元组 Layer:
use tower::layer::util::Stack;
// ServiceBuilder 内部使用 Stack 类型嵌套 Layer
// Stack<TimeoutLayer, Stack<RateLimitLayer, Stack<ConcurrencyLimitLayer, Identity>>>
tower 为最多 16 个元素的元组实现了 Layer, 超过 16 层需要手动分组或使用 ServiceBuilder.
4.2 类型擦除: BoxService 和 BoxCloneService
当中间件堆叠后的类型过于复杂, 可以用类型擦除:
use tower::BoxService;
// 不擦除: Timeout<RateLimit<ConcurrencyLimit<MyService>>>
// 擦除后: BoxService<Request, Response, Error>
let service: BoxService<Request, Response, Box<dyn std::error::Error + Send + Sync>> =
ServiceBuilder::new()
.timeout(Duration::from_secs(30))
.rate_limit(100, Duration::from_secs(1))
.concurrency_limit(10)
.boxed()
.service(my_service);
类型擦除的代价: 每次 call 变成虚函数调用 (vtable), 失去内联优化. 在大多数场景下这个开销可忽略, 但在极高性能路径上需要权衡.
4.3 ServiceBuilder 本身实现 Layer
ServiceBuilder 实现了 Layer<S> trait, 这意味着它可以嵌套在其他 ServiceBuilder 中:
let inner_stack = ServiceBuilder::new()
.concurrency_limit(10)
.rate_limit(100, Duration::from_secs(1));
let full_stack = ServiceBuilder::new()
.timeout(Duration::from_secs(30))
.layer(inner_stack) // ServiceBuilder 作为 Layer 使用
.service(my_service);
5. Pitfalls
5.1 Layer 顺序与直觉相反
.layer(A).layer(B).service(svc) 的请求流是 A → B → svc, 但代码阅读顺序是从上到下. 初学者常以为 B 在 A 前面. 理解方式: 先写的先执行 (与 Express app.use() 一致).
5.2 泛型爆炸
三四层中间件后, 完整类型可能长达数百字符. 编译错误信息极难阅读. 应对:
- 用
type alias命名中间层 - 在确定不需要极致性能时, 用
.boxed()擦除类型 - 用
ServiceBuilder而非手动嵌套
5.3 Layer 的 Clone 语义
Layer 通常需要 Clone, 因为它可能被多次使用 (比如用同一个 Layer 包装多个不同的 Service). 如果你的自定义 Layer 持有不可 Clone 的状态, 要用 Arc 包装.
5.4 poll_ready 的传播义务
实现包装 Service 时, 必须在 poll_ready 中调用内部 service 的 poll_ready. 忘记传播背压信号是常见 bug:
// 错误: 丢失了背压传播
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(())) // 总是就绪? 那内部 service 的背压去哪了?
}
// 正确: 传播内部 service 的就绪状态
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
5.5 混淆 Layer 和 Service
Layer 不处理请求, Service 才处理. Layer 是配置时概念 (应用启动时组装), Service 是运行时概念 (每个请求经过). 不要在 Layer 中放运行时逻辑.