跳转到正文
zeno's blog
返回

tower(二):Layer 与 ServiceBuilder-中间件组合机制

专题: tower

Table of contents

Open Table of contents

TL;DR

Layer trait 定义了”如何把一个 Service 包装成另一个 Service”, ServiceBuilder 提供声明式 API 将多个 Layer 按顺序堆叠. 中间件组合在编译期完成类型嵌套 (如 Timeout<RateLimit<ConcurrencyLimit<S>>>), 零运行时开销, 这是 tower 与 Express/Koa 等运行时中间件链的本质区别.


1. Layer trait: 中间件的工厂

1.1 完整定义

// crate: tower-layer
// 模块: tower_layer

pub trait Layer<S> {
    /// 包装后产生的 Service 类型
    type Service;

    /// 将 inner service 包装成新的 service
    fn layer(&self, inner: S) -> Self::Service;
}

Layer 不是 Service. Layer 是 Service 的工厂 — 它接收一个 Service, 返回一个新的 (包装过的) Service.

1.2 Layer 与 Service 的关系

Layer::layer(inner_service) → WrappedService
                                    |

                          impl Service<Request>

一个具体的例子 — 日志中间件:

use tower::{Layer, Service};
use std::task::{Context, Poll};
use std::future::Future;
use std::pin::Pin;

// Step 1: 定义 Layer (工厂)
pub struct LogLayer {
    target: &'static str,
}

impl<S> Layer<S> for LogLayer {
    type Service = LogService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        LogService {
            target: self.target,
            inner,
        }
    }
}

// Step 2: 定义包装后的 Service
pub struct LogService<S> {
    target: &'static str,
    inner: S,
}

impl<S, Request> Service<Request> for LogService<S>
where
    S: Service<Request>,
    Request: std::fmt::Debug,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx) // 透传背压信号
    }

    fn call(&mut self, req: Request) -> Self::Future {
        tracing::debug!(target: self.target, ?req, "processing request");
        self.inner.call(req) // 委托给内部 service
    }
}

关键模式: Layer 持有配置 (如 target), Service 持有内部 Service 引用 + 配置. 这种分离允许同一个 Layer 实例应用到不同的 Service 上.

2. 洋葱模型

2.1 中间件堆叠的类型结构

当多个 Layer 依次应用时, 产生嵌套类型:

use tower::timeout::TimeoutLayer;
use tower::limit::{RateLimitLayer, ConcurrencyLimitLayer};
use std::time::Duration;

// 手动应用 Layer
let service = my_service;
let service = ConcurrencyLimitLayer::new(10).layer(service);
let service = RateLimitLayer::new(100, Duration::from_secs(1)).layer(service);
let service = TimeoutLayer::new(Duration::from_secs(30)).layer(service);

// 最终类型: Timeout<RateLimit<ConcurrencyLimit<MyService>>>

请求的流经路径 (洋葱模型):

Request →  Timeout  →  RateLimit  →  ConcurrencyLimit  →  MyService
                                                              |
Response ←  Timeout  ←  RateLimit  ←  ConcurrencyLimit  ←────┘

外层中间件先看到请求, 最后看到响应. 这与 Express.js 的 next() 模型语义相同, 但实现方式完全不同:

特性tower (Rust)Express.js
中间件链表示编译期类型嵌套运行时数组
调度开销零 (单态化内联)每层一次虚函数调用
类型安全编译期检查 Request/Response 类型无 (any)
添加/移除中间件需要重新编译运行时动态
错误类型每层可以有不同的错误类型, 编译期组合统一 Error

2.2 洋葱的方向: 最先添加的 Layer 最靠外

这是一个关键的认知点. 用 ServiceBuilder 时:

ServiceBuilder::new()
    .layer(A)  // 最外层, 最先处理请求
    .layer(B)  // 中间层
    .layer(C)  // 最内层, 最接近实际 service
    .service(my_service)

等价于:

A.layer(B.layer(C.layer(my_service)))

类型是 A::Service<B::Service<C::Service<MyService>>>. 请求先经过 A, 再经过 B, 最后经过 C, 到达 MyService.

3. ServiceBuilder: 声明式中间件组合

3.1 核心 API

use tower::ServiceBuilder;
use std::time::Duration;

let service = ServiceBuilder::new()
    // 请求入队缓冲 (允许 Service 不实现 Clone)
    .buffer(100)
    // 并发限制
    .concurrency_limit(10)
    // 限速: 每秒 100 个请求
    .rate_limit(100, Duration::from_secs(1))
    // 超时
    .timeout(Duration::from_secs(30))
    // 自定义 Layer
    .layer(MyCustomLayer::new())
    // 应用到具体 service
    .service(my_service);

3.2 完整方法列表

内置中间件快捷方法 (需要对应 feature flag):

方法Feature功能
.buffer(bound)buffer在 service 前加 mpsc 缓冲通道
.concurrency_limit(max)limit限制最大并发请求数
.rate_limit(num, per)limit限制请求速率
.timeout(duration)timeout请求超时
.retry(policy)retry按策略重试失败请求
.load_shed()load-shed服务不就绪时直接拒绝请求
.filter(predicate)filter按条件过滤请求

变换方法:

方法功能
.map_request(f)变换请求类型
.map_response(f)变换响应类型
.map_err(f)变换错误类型
.map_future(f)变换返回的 Future
.then(f)在响应后应用异步函数
.and_then(f)在响应后链式异步操作

核心方法:

方法功能
.layer(l)添加自定义 Layer
.service(s)用累积的 Layer 包装 Service
.service_fn(f)用累积的 Layer 包装 async fn
.into_inner()提取底层 Layer
.boxed()类型擦除 (返回 BoxService)
.boxed_clone()类型擦除 + Clone (返回 BoxCloneService)

3.3 Layer 应用顺序的实际影响

顺序不同, 行为不同:

// 方案 A: buffer 在 concurrency_limit 外面
ServiceBuilder::new()
    .buffer(100)           // 允许 100 个请求排队
    .concurrency_limit(10) // 最多同时处理 10 个
    .service(svc)
// → 最多 110 个请求在系统中 (100 排队 + 10 处理中)

// 方案 B: buffer 在 concurrency_limit 里面
ServiceBuilder::new()
    .concurrency_limit(10) // 最多 10 个请求进入
    .buffer(100)           // 100 的缓冲在限制之内
    .service(svc)
// → 最多 10 个请求在系统中 (缓冲区被限制保护)

这不是 bug, 是 feature. 开发者对中间件的位置有完全控制权.

4. 高级用法

4.1 Layer 组合 (元组)

多个 Layer 可以组合成一个元组 Layer:

use tower::layer::util::Stack;

// ServiceBuilder 内部使用 Stack 类型嵌套 Layer
// Stack<TimeoutLayer, Stack<RateLimitLayer, Stack<ConcurrencyLimitLayer, Identity>>>

tower 为最多 16 个元素的元组实现了 Layer, 超过 16 层需要手动分组或使用 ServiceBuilder.

4.2 类型擦除: BoxService 和 BoxCloneService

当中间件堆叠后的类型过于复杂, 可以用类型擦除:

use tower::BoxService;

// 不擦除: Timeout<RateLimit<ConcurrencyLimit<MyService>>>
// 擦除后: BoxService<Request, Response, Error>

let service: BoxService<Request, Response, Box<dyn std::error::Error + Send + Sync>> =
    ServiceBuilder::new()
        .timeout(Duration::from_secs(30))
        .rate_limit(100, Duration::from_secs(1))
        .concurrency_limit(10)
        .boxed()
        .service(my_service);

类型擦除的代价: 每次 call 变成虚函数调用 (vtable), 失去内联优化. 在大多数场景下这个开销可忽略, 但在极高性能路径上需要权衡.

4.3 ServiceBuilder 本身实现 Layer

ServiceBuilder 实现了 Layer<S> trait, 这意味着它可以嵌套在其他 ServiceBuilder 中:

let inner_stack = ServiceBuilder::new()
    .concurrency_limit(10)
    .rate_limit(100, Duration::from_secs(1));

let full_stack = ServiceBuilder::new()
    .timeout(Duration::from_secs(30))
    .layer(inner_stack) // ServiceBuilder 作为 Layer 使用
    .service(my_service);

5. Pitfalls

5.1 Layer 顺序与直觉相反

.layer(A).layer(B).service(svc) 的请求流是 A → B → svc, 但代码阅读顺序是从上到下. 初学者常以为 B 在 A 前面. 理解方式: 先写的先执行 (与 Express app.use() 一致).

5.2 泛型爆炸

三四层中间件后, 完整类型可能长达数百字符. 编译错误信息极难阅读. 应对:

5.3 Layer 的 Clone 语义

Layer 通常需要 Clone, 因为它可能被多次使用 (比如用同一个 Layer 包装多个不同的 Service). 如果你的自定义 Layer 持有不可 Clone 的状态, 要用 Arc 包装.

5.4 poll_ready 的传播义务

实现包装 Service 时, 必须在 poll_ready 中调用内部 service 的 poll_ready. 忘记传播背压信号是常见 bug:

// 错误: 丢失了背压传播
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    Poll::Ready(Ok(())) // 总是就绪? 那内部 service 的背压去哪了?
}

// 正确: 传播内部 service 的就绪状态
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    self.inner.poll_ready(cx)
}

5.5 混淆 Layer 和 Service

Layer 不处理请求, Service 才处理. Layer 是配置时概念 (应用启动时组装), Service 是运行时概念 (每个请求经过). 不要在 Layer 中放运行时逻辑.


分享这篇文章:

上一篇
分布式基础(一):一致性从强一致到最终一致的权衡光谱
下一篇
tower(一):Service trait-异步函数抽象与背压信号