跳转到正文
zeno's blog
返回

tower(三):内置中间件-限流、重试、超时与缓冲

专题: tower

Table of contents

Open Table of contents

TL;DR

tower 提供协议无关的通用中间件, 每个都通过 feature flag 独立启用. 核心六个: ConcurrencyLimit (并发数上限), RateLimit (速率限制), Timeout (超时), Retry (重试 + Policy trait), Buffer (mpsc 通道使 Service 可 Clone), LoadShed (过载丢弃). 它们与 tower-http 的 HTTP 专用中间件互补而非重复.


1. 中间件全景

tower crate 的模块结构及功能:

模块Feature Flag功能协议依赖
limit::ConcurrencyLimitlimit限制同时处理的最大请求数
limit::RateLimitlimit限制单位时间内的请求数
timeout::Timeouttimeout请求超时自动失败
retry::Retryretry按 Policy 重试失败请求
buffer::Bufferbuffer在 Service 前加 mpsc 缓冲通道
load_shed::LoadShedload-shed服务不就绪时直接拒绝
filter::Filterfilter按谓词过滤/拒绝请求
hedge::Hedgehedge预测性重试 (hedged request)
balancebalance跨多个 Service 负载均衡
loadload测量 Service 负载指标
discoverdiscover服务发现
reconnectreconnect连接断开后自动重连
steersteer请求路由到不同 Service
spawn_readyspawn_ready在后台 task 驱动 poll_ready
ready_cacheready-cache缓存 Service 的就绪状态

所有中间件都是协议无关的 — 它们不知道也不关心请求是 HTTP、gRPC 还是自定义协议.

2. ConcurrencyLimit: 并发数控制

2.1 工作原理

用 Semaphore 控制同时处理的请求数. 当并发数达到上限时, poll_ready 返回 Pending, 新请求等待直到有请求完成.

use tower::ServiceBuilder;

let service = ServiceBuilder::new()
    .concurrency_limit(10) // 最多同时处理 10 个请求
    .service(my_service);

2.2 核心类型

类型说明
ConcurrencyLimit<S>包装后的 Service, 持有 semaphore
ConcurrencyLimitLayer对应的 Layer, 构造时指定 max
GlobalConcurrencyLimitLayer多个 Service 实例共享同一个 semaphore

2.3 GlobalConcurrencyLimitLayer 的使用场景

当一个 Service 被 clone 成多份 (比如每个连接一份) 时, ConcurrencyLimitLayer 给每个 clone 一个独立的 semaphore — 10 个 clone 各限制 10 = 实际并发 100. GlobalConcurrencyLimitLayer 让所有 clone 共享一个 semaphore.

use tower::limit::GlobalConcurrencyLimitLayer;

let layer = GlobalConcurrencyLimitLayer::new(10);
// 所有通过此 layer 包装的 service (包括它们的 clone) 共享同一个 10 并发上限

2.4 背压机制

ConcurrencyLimit 的 poll_ready 是 tower 背压系统的核心消费者:

3. RateLimit: 速率限制

3.1 工作原理

滑动窗口限速: 在给定时间窗口内最多允许 N 个请求.

use tower::ServiceBuilder;
use std::time::Duration;

let service = ServiceBuilder::new()
    .rate_limit(100, Duration::from_secs(1)) // 每秒最多 100 个请求
    .service(my_service);

3.2 核心类型

类型说明
RateLimit<S>限速后的 Service
RateLimitLayer对应的 Layer, 构造参数 (num, per)

3.3 与 ConcurrencyLimit 的区别

特性ConcurrencyLimitRateLimit
控制维度同时进行的请求数单位时间内的请求数
快请求场景允许高吞吐 (请求快速完成就释放)无论快慢都限速
慢请求场景可能全部卡住仍然按速率放行
典型场景数据库连接池、线程池API 限速、防暴力破解

生产环境通常两者配合使用:

ServiceBuilder::new()
    .rate_limit(1000, Duration::from_secs(1))  // 限速
    .concurrency_limit(50)                      // 限并发
    .service(my_service);

4. Timeout: 超时控制

4.1 工作原理

为每个请求设置最大处理时间. 如果内部 Service 在指定时间内没有返回响应, 直接返回错误.

use tower::ServiceBuilder;
use std::time::Duration;

let service = ServiceBuilder::new()
    .timeout(Duration::from_secs(30))
    .service(my_service);

4.2 核心类型

类型说明
Timeout<S>包装后的 Service
TimeoutLayer对应的 Layer
timeout::error::Elapsed超时时返回的错误类型

4.3 超时与 poll_ready

Timeout 的 poll_ready 不计入超时时间. 超时只从 call 被调用开始计算. 这意味着如果请求在 poll_ready 阶段等待了很久 (因为背压), 这段时间不算超时.

这是设计意图: poll_ready 的等待是背压系统的正常行为, 不应该被超时中断.

5. Retry: 重试机制

5.1 Policy trait

Retry 的核心是 Policy trait, 它决定”这次失败要不要重试”:

// crate: tower
// 模块: tower::retry::Policy

pub trait Policy<Req, Res, E> {
    /// 重试延迟的 Future 类型
    type Future: Future<Output = ()>;

    /// 根据响应/错误决定是否重试
    ///
    /// 返回 Some(future) → 等 future 完成后重试
    /// 返回 None → 不重试, 返回当前结果
    fn retry(
        &mut self,
        req: &mut Req,
        result: &mut Result<Res, E>,
    ) -> Option<Self::Future>;

    /// 克隆请求以供重试使用
    ///
    /// 返回 None → 无法克隆, 不进行重试
    fn clone_request(&mut self, req: &Req) -> Option<Req>;
}

5.2 Policy 的设计要点

retry 方法接收 &mut Req&mut Result<Res, E>:

clone_request 必须显式克隆请求:

5.3 实现示例

use tower::retry::Policy;
use std::future;

#[derive(Clone)]
struct RetryOnServerError {
    remaining: usize,
}

impl<Req, Res, E> Policy<Req, Res, E> for RetryOnServerError
where
    Req: Clone,
    E: std::fmt::Debug,
{
    type Future = future::Ready<()>;

    fn retry(
        &mut self,
        _req: &mut Req,
        result: &mut Result<Res, E>,
    ) -> Option<Self::Future> {
        if self.remaining == 0 {
            return None; // 重试次数耗尽
        }
        match result {
            Ok(_) => None, // 成功, 不重试
            Err(e) => {
                tracing::warn!(?e, remaining = self.remaining, "retrying request");
                self.remaining -= 1;
                Some(future::ready(())) // 立即重试 (无 backoff)
            }
        }
    }

    fn clone_request(&mut self, req: &Req) -> Option<Req> {
        Some(req.clone())
    }
}

// 使用
let service = ServiceBuilder::new()
    .retry(RetryOnServerError { remaining: 3 })
    .service(my_service);

5.4 Backoff 与 Budget

tower 提供配套模块:

6. Buffer: 使 Service 可 Clone

6.1 解决的核心问题

Service trait 的 call(&mut self, ...) 需要 &mut self, 这意味着一个 Service 实例不能被多个 task 同时使用. Buffer 通过在前面放一个 mpsc channel 解决这个问题:

Task 1 ──┐                    ┌─── Service
Task 2 ──┤── mpsc channel ────┤    (单一消费者)
Task 3 ──┘   (Buffer)         └───

6.2 使用方式

let service = ServiceBuilder::new()
    .buffer(100) // 通道容量 100
    .service(my_service);

// 现在 service 可以 clone
let svc1 = service.clone();
let svc2 = service.clone();
// svc1 和 svc2 共享同一个底层 service

6.3 核心类型

类型说明
Buffer<S, Request>包装后的 Service, 实现 Clone
BufferLayer对应的 Layer
Worker (内部)后台 task, 从 channel 取请求喂给 Service

6.4 Buffer 的权衡

优势代价
Service 变成 Clone每次请求多一次 channel send/recv
天然限流 (channel 满则背压)需要 spawn 后台 Worker task
解耦请求提交与处理如果 Worker panic, 所有 Buffer clone 失效

7. 其他值得了解的中间件

7.1 LoadShed

服务不就绪时直接拒绝请求, 而不是排队等待:

let service = ServiceBuilder::new()
    .load_shed() // poll_ready 返回 Pending 时直接返回错误
    .service(my_service);

与 ConcurrencyLimit 的区别: ConcurrencyLimit 让请求等待, LoadShed 让请求立即失败. 适合”宁可快速失败也不要长时间排队”的场景.

7.2 Hedge (预测性重试)

当请求耗时超过 P50/P90 延迟时, 自动发送第二个相同请求 (hedged request), 取先返回的结果:

// hedge 会在请求可能较慢时并发发送第二个请求
// 适合幂等请求, 可以显著降低 tail latency

来源于 Google 的”The Tail at Scale”论文. 代价是增加了总请求量.

7.3 Balance (负载均衡)

跨多个 Service 实例分发请求:

// tower::balance 模块提供:
// - p2c::Balance: Power of Two Choices 负载均衡 (O(1) 选择, 接近最优)
// - pool::Pool: 连接池式负载均衡

Balance 是 poll_ready 设计的核心受益者 — 它通过探测各后端的 poll_ready 状态来评估负载.

7.4 Discover (服务发现)

与 Balance 配合, 动态发现可用的后端服务:

// tower::discover::Change 枚举
pub enum Change<K, V> {
    Insert(K, V), // 新增后端
    Remove(K),    // 移除后端
}

8. 中间件组合最佳实践

8.1 推荐的中间件顺序 (从外到内)

ServiceBuilder::new()
    // 1. 可观测性 (最外层, 看到所有请求)
    .layer(TraceLayer::new_for_http())
    // 2. 超时 (限制总处理时间)
    .timeout(Duration::from_secs(30))
    // 3. 速率限制 (防止过载)
    .rate_limit(1000, Duration::from_secs(1))
    // 4. 并发限制 (控制资源使用)
    .concurrency_limit(50)
    // 5. 重试 (只重试到达内部 service 的请求)
    .retry(my_policy)
    // 6. 实际 service
    .service(my_service)

原则:

8.2 Feature Flag 配置

# Cargo.toml
[dependencies]
tower = { version = "0.5", features = [
    "timeout",
    "limit",      # 包含 ConcurrencyLimit 和 RateLimit
    "retry",
    "buffer",
    "load-shed",
    "util",       # ServiceExt, ready(), oneshot() 等
] }

9. Pitfalls

9.1 Buffer 的 Worker panic 传播

Buffer 的后台 Worker 如果 panic, 所有 clone 的 Buffer 实例在下次 poll_ready 时会收到错误. 但这个错误不包含 panic 信息, 只是一个”service closed”错误. 调试时容易困惑.

9.2 Retry + 非幂等请求

Retry 中间件对请求调用 clone_request. 如果请求不是幂等的 (如 POST 创建订单), 重试会导致重复操作. Policy 的 clone_request 返回 None 可以阻止这种情况, 但这也意味着完全不重试.

9.3 RateLimit 的窗口对齐

RateLimit 使用固定窗口, 不是滑动窗口. 在窗口边界处, 可能在极短时间内放行双倍请求 (上一个窗口末尾 + 下一个窗口开头). 如果需要更精确的限速, 考虑使用 tower-governor.

9.4 ConcurrencyLimit + Buffer 的交互

如果 Buffer 在 ConcurrencyLimit 外面, Buffer 的容量会叠加到实际并发数上:

// 实际最大并发 = buffer_size + concurrency_limit = 110
ServiceBuilder::new()
    .buffer(100)
    .concurrency_limit(10)
    .service(svc)

如果意图是严格限制并发为 10, ConcurrencyLimit 应该在 Buffer 外面.

9.5 Timeout 不覆盖 poll_ready 等待时间

如前所述, Timeout 只计算 call 之后的时间. 如果请求在 poll_ready (背压) 阶段等了 5 分钟, Timeout 不会触发. 需要端到端超时的话, 要在应用层用 tokio::time::timeout 包裹整个 ready().await + call() 流程.


分享这篇文章:

上一篇
分布式基础(二):可扩展性-垂直扩展与水平扩展
下一篇
分布式基础(一):一致性从强一致到最终一致的权衡光谱