Table of contents
Open Table of contents
TL;DR
tower 提供协议无关的通用中间件, 每个都通过 feature flag 独立启用. 核心六个: ConcurrencyLimit (并发数上限), RateLimit (速率限制), Timeout (超时), Retry (重试 + Policy trait), Buffer (mpsc 通道使 Service 可 Clone), LoadShed (过载丢弃). 它们与 tower-http 的 HTTP 专用中间件互补而非重复.
1. 中间件全景
tower crate 的模块结构及功能:
| 模块 | Feature Flag | 功能 | 协议依赖 |
|---|---|---|---|
limit::ConcurrencyLimit | limit | 限制同时处理的最大请求数 | 无 |
limit::RateLimit | limit | 限制单位时间内的请求数 | 无 |
timeout::Timeout | timeout | 请求超时自动失败 | 无 |
retry::Retry | retry | 按 Policy 重试失败请求 | 无 |
buffer::Buffer | buffer | 在 Service 前加 mpsc 缓冲通道 | 无 |
load_shed::LoadShed | load-shed | 服务不就绪时直接拒绝 | 无 |
filter::Filter | filter | 按谓词过滤/拒绝请求 | 无 |
hedge::Hedge | hedge | 预测性重试 (hedged request) | 无 |
balance | balance | 跨多个 Service 负载均衡 | 无 |
load | load | 测量 Service 负载指标 | 无 |
discover | discover | 服务发现 | 无 |
reconnect | reconnect | 连接断开后自动重连 | 无 |
steer | steer | 请求路由到不同 Service | 无 |
spawn_ready | spawn_ready | 在后台 task 驱动 poll_ready | 无 |
ready_cache | ready-cache | 缓存 Service 的就绪状态 | 无 |
所有中间件都是协议无关的 — 它们不知道也不关心请求是 HTTP、gRPC 还是自定义协议.
2. ConcurrencyLimit: 并发数控制
2.1 工作原理
用 Semaphore 控制同时处理的请求数. 当并发数达到上限时, poll_ready 返回 Pending, 新请求等待直到有请求完成.
use tower::ServiceBuilder;
let service = ServiceBuilder::new()
.concurrency_limit(10) // 最多同时处理 10 个请求
.service(my_service);
2.2 核心类型
| 类型 | 说明 |
|---|---|
ConcurrencyLimit<S> | 包装后的 Service, 持有 semaphore |
ConcurrencyLimitLayer | 对应的 Layer, 构造时指定 max |
GlobalConcurrencyLimitLayer | 多个 Service 实例共享同一个 semaphore |
2.3 GlobalConcurrencyLimitLayer 的使用场景
当一个 Service 被 clone 成多份 (比如每个连接一份) 时, ConcurrencyLimitLayer 给每个 clone 一个独立的 semaphore — 10 个 clone 各限制 10 = 实际并发 100. GlobalConcurrencyLimitLayer 让所有 clone 共享一个 semaphore.
use tower::limit::GlobalConcurrencyLimitLayer;
let layer = GlobalConcurrencyLimitLayer::new(10);
// 所有通过此 layer 包装的 service (包括它们的 clone) 共享同一个 10 并发上限
2.4 背压机制
ConcurrencyLimit 的 poll_ready 是 tower 背压系统的核心消费者:
- 内部使用
tokio::sync::Semaphore poll_ready尝试 acquire permit, 拿到则 Ready, 拿不到则 Pendingcall返回的 Future 完成时自动归还 permit
3. RateLimit: 速率限制
3.1 工作原理
滑动窗口限速: 在给定时间窗口内最多允许 N 个请求.
use tower::ServiceBuilder;
use std::time::Duration;
let service = ServiceBuilder::new()
.rate_limit(100, Duration::from_secs(1)) // 每秒最多 100 个请求
.service(my_service);
3.2 核心类型
| 类型 | 说明 |
|---|---|
RateLimit<S> | 限速后的 Service |
RateLimitLayer | 对应的 Layer, 构造参数 (num, per) |
3.3 与 ConcurrencyLimit 的区别
| 特性 | ConcurrencyLimit | RateLimit |
|---|---|---|
| 控制维度 | 同时进行的请求数 | 单位时间内的请求数 |
| 快请求场景 | 允许高吞吐 (请求快速完成就释放) | 无论快慢都限速 |
| 慢请求场景 | 可能全部卡住 | 仍然按速率放行 |
| 典型场景 | 数据库连接池、线程池 | API 限速、防暴力破解 |
生产环境通常两者配合使用:
ServiceBuilder::new()
.rate_limit(1000, Duration::from_secs(1)) // 限速
.concurrency_limit(50) // 限并发
.service(my_service);
4. Timeout: 超时控制
4.1 工作原理
为每个请求设置最大处理时间. 如果内部 Service 在指定时间内没有返回响应, 直接返回错误.
use tower::ServiceBuilder;
use std::time::Duration;
let service = ServiceBuilder::new()
.timeout(Duration::from_secs(30))
.service(my_service);
4.2 核心类型
| 类型 | 说明 |
|---|---|
Timeout<S> | 包装后的 Service |
TimeoutLayer | 对应的 Layer |
timeout::error::Elapsed | 超时时返回的错误类型 |
4.3 超时与 poll_ready
Timeout 的 poll_ready 不计入超时时间. 超时只从 call 被调用开始计算. 这意味着如果请求在 poll_ready 阶段等待了很久 (因为背压), 这段时间不算超时.
这是设计意图: poll_ready 的等待是背压系统的正常行为, 不应该被超时中断.
5. Retry: 重试机制
5.1 Policy trait
Retry 的核心是 Policy trait, 它决定”这次失败要不要重试”:
// crate: tower
// 模块: tower::retry::Policy
pub trait Policy<Req, Res, E> {
/// 重试延迟的 Future 类型
type Future: Future<Output = ()>;
/// 根据响应/错误决定是否重试
///
/// 返回 Some(future) → 等 future 完成后重试
/// 返回 None → 不重试, 返回当前结果
fn retry(
&mut self,
req: &mut Req,
result: &mut Result<Res, E>,
) -> Option<Self::Future>;
/// 克隆请求以供重试使用
///
/// 返回 None → 无法克隆, 不进行重试
fn clone_request(&mut self, req: &Req) -> Option<Req>;
}
5.2 Policy 的设计要点
retry 方法接收 &mut Req 和 &mut Result<Res, E>:
- 可以修改请求 (比如添加重试次数 header)
- 可以修改结果 (比如把某些错误转换为成功)
- 返回的 Future 用于实现延迟重试 (backoff)
clone_request 必须显式克隆请求:
- 不是所有请求都能克隆 (比如带 Body 的 HTTP POST)
- 返回
None意味着放弃重试
5.3 实现示例
use tower::retry::Policy;
use std::future;
#[derive(Clone)]
struct RetryOnServerError {
remaining: usize,
}
impl<Req, Res, E> Policy<Req, Res, E> for RetryOnServerError
where
Req: Clone,
E: std::fmt::Debug,
{
type Future = future::Ready<()>;
fn retry(
&mut self,
_req: &mut Req,
result: &mut Result<Res, E>,
) -> Option<Self::Future> {
if self.remaining == 0 {
return None; // 重试次数耗尽
}
match result {
Ok(_) => None, // 成功, 不重试
Err(e) => {
tracing::warn!(?e, remaining = self.remaining, "retrying request");
self.remaining -= 1;
Some(future::ready(())) // 立即重试 (无 backoff)
}
}
}
fn clone_request(&mut self, req: &Req) -> Option<Req> {
Some(req.clone())
}
}
// 使用
let service = ServiceBuilder::new()
.retry(RetryOnServerError { remaining: 3 })
.service(my_service);
5.4 Backoff 与 Budget
tower 提供配套模块:
tower::retry::backoff— 指数退避等延迟策略tower::retry::budget— 重试预算, 限制重试占总请求的比例, 防止重试风暴
6. Buffer: 使 Service 可 Clone
6.1 解决的核心问题
Service trait 的 call(&mut self, ...) 需要 &mut self, 这意味着一个 Service 实例不能被多个 task 同时使用. Buffer 通过在前面放一个 mpsc channel 解决这个问题:
Task 1 ──┐ ┌─── Service
Task 2 ──┤── mpsc channel ────┤ (单一消费者)
Task 3 ──┘ (Buffer) └───
6.2 使用方式
let service = ServiceBuilder::new()
.buffer(100) // 通道容量 100
.service(my_service);
// 现在 service 可以 clone
let svc1 = service.clone();
let svc2 = service.clone();
// svc1 和 svc2 共享同一个底层 service
6.3 核心类型
| 类型 | 说明 |
|---|---|
Buffer<S, Request> | 包装后的 Service, 实现 Clone |
BufferLayer | 对应的 Layer |
| Worker (内部) | 后台 task, 从 channel 取请求喂给 Service |
6.4 Buffer 的权衡
| 优势 | 代价 |
|---|---|
| Service 变成 Clone | 每次请求多一次 channel send/recv |
| 天然限流 (channel 满则背压) | 需要 spawn 后台 Worker task |
| 解耦请求提交与处理 | 如果 Worker panic, 所有 Buffer clone 失效 |
7. 其他值得了解的中间件
7.1 LoadShed
服务不就绪时直接拒绝请求, 而不是排队等待:
let service = ServiceBuilder::new()
.load_shed() // poll_ready 返回 Pending 时直接返回错误
.service(my_service);
与 ConcurrencyLimit 的区别: ConcurrencyLimit 让请求等待, LoadShed 让请求立即失败. 适合”宁可快速失败也不要长时间排队”的场景.
7.2 Hedge (预测性重试)
当请求耗时超过 P50/P90 延迟时, 自动发送第二个相同请求 (hedged request), 取先返回的结果:
// hedge 会在请求可能较慢时并发发送第二个请求
// 适合幂等请求, 可以显著降低 tail latency
来源于 Google 的”The Tail at Scale”论文. 代价是增加了总请求量.
7.3 Balance (负载均衡)
跨多个 Service 实例分发请求:
// tower::balance 模块提供:
// - p2c::Balance: Power of Two Choices 负载均衡 (O(1) 选择, 接近最优)
// - pool::Pool: 连接池式负载均衡
Balance 是 poll_ready 设计的核心受益者 — 它通过探测各后端的 poll_ready 状态来评估负载.
7.4 Discover (服务发现)
与 Balance 配合, 动态发现可用的后端服务:
// tower::discover::Change 枚举
pub enum Change<K, V> {
Insert(K, V), // 新增后端
Remove(K), // 移除后端
}
8. 中间件组合最佳实践
8.1 推荐的中间件顺序 (从外到内)
ServiceBuilder::new()
// 1. 可观测性 (最外层, 看到所有请求)
.layer(TraceLayer::new_for_http())
// 2. 超时 (限制总处理时间)
.timeout(Duration::from_secs(30))
// 3. 速率限制 (防止过载)
.rate_limit(1000, Duration::from_secs(1))
// 4. 并发限制 (控制资源使用)
.concurrency_limit(50)
// 5. 重试 (只重试到达内部 service 的请求)
.retry(my_policy)
// 6. 实际 service
.service(my_service)
原则:
- Trace 在最外层, 记录所有请求 (包括被限流/超时的)
- Timeout 在限流之外, 确保排队时间也受控
- Retry 在限流之内, 重试的请求也受限流保护, 不会导致重试风暴
8.2 Feature Flag 配置
# Cargo.toml
[dependencies]
tower = { version = "0.5", features = [
"timeout",
"limit", # 包含 ConcurrencyLimit 和 RateLimit
"retry",
"buffer",
"load-shed",
"util", # ServiceExt, ready(), oneshot() 等
] }
9. Pitfalls
9.1 Buffer 的 Worker panic 传播
Buffer 的后台 Worker 如果 panic, 所有 clone 的 Buffer 实例在下次 poll_ready 时会收到错误. 但这个错误不包含 panic 信息, 只是一个”service closed”错误. 调试时容易困惑.
9.2 Retry + 非幂等请求
Retry 中间件对请求调用 clone_request. 如果请求不是幂等的 (如 POST 创建订单), 重试会导致重复操作. Policy 的 clone_request 返回 None 可以阻止这种情况, 但这也意味着完全不重试.
9.3 RateLimit 的窗口对齐
RateLimit 使用固定窗口, 不是滑动窗口. 在窗口边界处, 可能在极短时间内放行双倍请求 (上一个窗口末尾 + 下一个窗口开头). 如果需要更精确的限速, 考虑使用 tower-governor.
9.4 ConcurrencyLimit + Buffer 的交互
如果 Buffer 在 ConcurrencyLimit 外面, Buffer 的容量会叠加到实际并发数上:
// 实际最大并发 = buffer_size + concurrency_limit = 110
ServiceBuilder::new()
.buffer(100)
.concurrency_limit(10)
.service(svc)
如果意图是严格限制并发为 10, ConcurrencyLimit 应该在 Buffer 外面.
9.5 Timeout 不覆盖 poll_ready 等待时间
如前所述, Timeout 只计算 call 之后的时间. 如果请求在 poll_ready (背压) 阶段等了 5 分钟, Timeout 不会触发. 需要端到端超时的话, 要在应用层用 tokio::time::timeout 包裹整个 ready().await + call() 流程.