跳转到正文
zeno's blog
返回

tower(四):tower-http-HTTP 专用中间件

专题: tower

Table of contents

Open Table of contents

TL;DR

tower-http 提供 20+ 个 HTTP 专用中间件 (压缩、CORS、Tracing、Header 操作等), 它们构建在 tower 的泛型 Service trait 之上, 兼容任何使用 http + http-body crate 的框架 — axum、hyper、tonic 都能直接使用, 不需要每个框架重复实现.


1. tower-http 与 tower 的关系

tower 提供协议无关的通用中间件 (限流、超时、重试等), tower-http 提供HTTP 专用的中间件.

tower (协议无关)          tower-http (HTTP 专用)
├── ConcurrencyLimit     ├── Compression
├── RateLimit            ├── CORS
├── Timeout              ├── Trace
├── Retry                ├── SetHeader
├── Buffer               ├── RequestId
└── ...                  └── ...

tower-http 的中间件只要求内部 Service 操作的是 http::Request / http::Response, 不绑定具体的 HTTP 实现. 这意味着同一个 CompressionLayer 同时适用于:

2. 完整中间件清单

模块类型功能Feature Flag
add_extensionLayer/Service向每个请求的 Extensions 中注入值util
authLayer/Service认证/鉴权中间件auth
catch_panicLayer/Service将 handler panic 转换为 500 响应而非进程崩溃catch-panic
classifyTrait + 实现将响应分类为成功/失败 (供 Trace 使用)util
compressionLayer/Service压缩响应体 (gzip, br, deflate, zstd)compression-*
corsLayer/ServiceCORS 头处理cors
decompressionLayer/Service解压请求/响应体decompression-*
follow_redirectLayer/Service自动跟随 HTTP 重定向 (客户端)follow-redirect
limitLayer/Service限制请求体大小limit
map_request_bodyLayer/Service变换请求体类型util
map_response_bodyLayer/Service变换响应体类型util
metricsLayer/Service请求/响应指标采集metrics
normalize_pathLayer/Service路径标准化 (去除重复 /、尾部 /)normalize-path
on_early_dropLayer/Service检测响应 Future/Body 被提前丢弃-
propagate_headerLayer/Service将请求 header 复制到响应中propagate-header
request_idLayer/Service生成和传播请求 IDrequest-id
sensitive_headersLayer/Service标记敏感 header (日志中隐藏)sensitive-headers
set_headerLayer/Service在请求/响应上设置固定 headerset-header
set_statusLayer/Service覆盖响应状态码set-status
timeoutLayer/ServiceHTTP 请求超时 (带 408 响应)timeout
traceLayer/Service结构化日志/tracingtrace
validate_requestLayer/Service验证请求 (如 Content-Type 检查)validate-request

3. 核心中间件详解

3.1 Trace: 结构化请求日志

tower-http 最常用的中间件. 为每个请求创建 tracing span, 记录请求开始、响应完成、失败等事件.

use tower_http::trace::TraceLayer;
use tower_http::classify::StatusInRangeAsFailures;

// 基础用法: HTTP 服务器标准 trace
let layer = TraceLayer::new_for_http();

// 自定义: 将 4xx 和 5xx 都视为失败
let layer = TraceLayer::new(
    StatusInRangeAsFailures::new(400..=599).into_make_classifier()
);

TraceLayer 与 classify 模块配合:

3.2 Compression / Decompression: 压缩

use tower_http::compression::CompressionLayer;
use tower_http::decompression::DecompressionLayer;

// 服务端: 压缩响应
let server_layer = CompressionLayer::new();
// 支持算法由 feature flag 控制:
// compression-gzip, compression-br, compression-deflate, compression-zstd

// 客户端: 解压响应
let client_layer = DecompressionLayer::new();

压缩算法选择基于请求的 Accept-Encoding header, 自动协商.

Feature flag 与算法对应:

Feature Flag算法典型压缩率CPU 开销
compression-gzipgzip
compression-brBrotli中-高
compression-deflatedeflate
compression-zstdZstandard低-中

3.3 CORS: 跨域资源共享

use tower_http::cors::{CorsLayer, Any};
use http::Method;

let cors = CorsLayer::new()
    .allow_origin(Any)  // 允许所有来源 (开发环境)
    .allow_methods([Method::GET, Method::POST])
    .allow_headers(Any)
    .max_age(Duration::from_secs(3600));

CORS 中间件自动处理 preflight OPTIONS 请求, 在响应中添加必要的 CORS header.

3.4 SetHeader: Header 操作

use tower_http::set_header::SetRequestHeaderLayer;
use http::{header::USER_AGENT, HeaderValue};

// 设置请求 header (客户端中间件)
let layer = SetRequestHeaderLayer::overriding(
    USER_AGENT,
    HeaderValue::from_static("my-client/1.0")
);

// SetResponseHeaderLayer 同理, 用于响应

三种模式:

3.5 RequestId: 请求追踪

use tower_http::request_id::{
    SetRequestIdLayer, PropagateRequestIdLayer, MakeRequestUuid,
};
use http::header::HeaderName;

let x_request_id = HeaderName::from_static("x-request-id");

ServiceBuilder::new()
    .layer(SetRequestIdLayer::new(x_request_id.clone(), MakeRequestUuid))
    .layer(PropagateRequestIdLayer::new(x_request_id))
    .service(my_service);

SetRequestIdLayer 在入站请求上生成 ID, PropagateRequestIdLayer 将 ID 复制到响应 header.

3.6 CatchPanic: 防止 handler panic 崩溃进程

use tower_http::catch_panic::CatchPanicLayer;

let layer = CatchPanicLayer::new();
// handler panic 时返回 500 响应, 而不是整个 server 崩溃

底层使用 std::panic::catch_unwind, 因此要求 handler 是 UnwindSafe.

4. 完整服务端示例

use tower_http::{
    compression::CompressionLayer,
    cors::CorsLayer,
    trace::TraceLayer,
    request_id::{SetRequestIdLayer, PropagateRequestIdLayer, MakeRequestUuid},
    sensitive_headers::SetSensitiveRequestHeadersLayer,
    timeout::TimeoutLayer,
};
use tower::{ServiceBuilder, service_fn, BoxError};
use http::{Request, Response, header, HeaderName};
use std::net::{SocketAddr, Ipv6Addr};
use std::time::Duration;
use bytes::Bytes;
use http_body_util::Full;
use hyper_util::rt::{TokioIo, TokioExecutor};
use hyper_util::service::TowerToHyperService;

async fn handler(
    req: Request<hyper::body::Incoming>,
) -> Result<Response<Full<Bytes>>, BoxError> {
    Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
}

#[tokio::main]
async fn main() -> Result<(), BoxError> {
    tracing_subscriber::fmt::init();

    let x_request_id = HeaderName::from_static("x-request-id");

    let service = ServiceBuilder::new()
        // 敏感 header 在日志中隐藏
        .layer(SetSensitiveRequestHeadersLayer::new([
            header::AUTHORIZATION,
            header::COOKIE,
        ]))
        // 请求 ID
        .layer(SetRequestIdLayer::new(x_request_id.clone(), MakeRequestUuid))
        .layer(PropagateRequestIdLayer::new(x_request_id))
        // Tracing
        .layer(TraceLayer::new_for_http())
        // CORS
        .layer(CorsLayer::permissive())
        // 超时
        .layer(TimeoutLayer::new(Duration::from_secs(30)))
        // 压缩
        .layer(CompressionLayer::new())
        // 实际 handler
        .service_fn(handler);

    let addr = SocketAddr::from((Ipv6Addr::LOCALHOST, 3000));
    let listener = tokio::net::TcpListener::bind(addr).await?;
    tracing::info!("listening on {}", addr);

    loop {
        let (socket, _) = listener.accept().await?;
        let service = service.clone();
        tokio::spawn(async move {
            let socket = TokioIo::new(socket);
            let hyper_service = TowerToHyperService::new(service);
            if let Err(err) = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new())
                .serve_connection(socket, hyper_service)
                .await
            {
                eprintln!("failed to serve connection: {err:#}");
            }
        });
    }
}

5. 完整客户端示例

use tower_http::{
    decompression::DecompressionLayer,
    set_header::SetRequestHeaderLayer,
    trace::TraceLayer,
    classify::StatusInRangeAsFailures,
};
use tower::{ServiceBuilder, Service, ServiceExt};
use hyper_util::{rt::TokioExecutor, client::legacy::Client};
use http_body_util::Full;
use bytes::Bytes;
use http::{Request, HeaderValue, header::USER_AGENT};

#[tokio::main]
async fn main() {
    let client = Client::builder(TokioExecutor::new()).build_http();

    let mut client = ServiceBuilder::new()
        .layer(TraceLayer::new(
            StatusInRangeAsFailures::new(400..=599).into_make_classifier()
        ))
        .layer(SetRequestHeaderLayer::overriding(
            USER_AGENT,
            HeaderValue::from_static("my-app/1.0")
        ))
        .layer(DecompressionLayer::new())
        .service(client);

    let request = Request::builder()
        .uri("http://example.com")
        .body(Full::<Bytes>::default())
        .unwrap();

    let response = client
        .ready()
        .await
        .unwrap()
        .call(request)
        .await
        .unwrap();
}

6. tower-http 与框架的关系

                    tower-http (HTTP 中间件)
                   /          |          \
                  /           |           \
               axum         hyper       tonic
            (Web 框架)   (HTTP 实现)   (gRPC 框架)

三者都使用 http::Request / http::Response + http_body::Body, 因此 tower-http 中间件可以无修改地应用于任何一个. 这是 tower 生态系统的核心价值: 写一次中间件, 到处使用.

7. Cargo.toml 配置

[dependencies]
tower-http = { version = "0.6", features = [
    "trace",
    "compression-gzip",
    "compression-br",
    "cors",
    "set-header",
    "request-id",
    "sensitive-headers",
    "timeout",
    "catch-panic",
    "util",             # ServiceBuilderExt 等便捷方法
] }

# 通常还需要
tower = { version = "0.5", features = ["util"] }
http = "1"
http-body = "1"
http-body-util = "0.1"

8. Pitfalls

8.1 Compression 与 streaming 响应

压缩中间件需要缓冲数据才能压缩. 对于 SSE (Server-Sent Events) 或 WebSocket 等 streaming 响应, 压缩会增加延迟或完全不起作用. 可以用 CompressionLayer::new().no_gzip().no_br() 禁用特定算法, 或在 handler 层面对 streaming 响应跳过压缩.

8.2 CORS 的 preflight 缓存

max_age 设置过短会导致浏览器频繁发送 OPTIONS preflight 请求 (每个跨域请求前一次). 生产环境建议设置 max_age(Duration::from_secs(86400)) (24 小时).

8.3 TraceLayer 的 span 级别

默认的 TraceLayer::new_for_http() 使用 DEBUG 级别. 生产环境中如果 tracing subscriber 设置为 INFO, 这些 span 不会出现. 需要明确配置:

use tower_http::trace::TraceLayer;
use tracing::Level;

let layer = TraceLayer::new_for_http()
    .make_span_with(|req: &Request<_>| {
        tracing::info_span!("http_request",
            method = %req.method(),
            uri = %req.uri(),
        )
    });

8.4 tower-http::timeout vs tower::timeout

两者不同:

在 HTTP 服务中, 优先用 tower_http::timeout 以获得正确的 HTTP 语义.

8.5 Feature flag 遗漏

tower-http 默认不启用任何 feature. 忘记添加 feature flag 会导致编译错误 (找不到模块/类型), 而非运行时错误. 建议在项目初始化时就列出所有需要的 feature.


分享这篇文章:

上一篇
分布式基础(三):高可用-通过冗余和自动故障转移消除单点故障
下一篇
分布式基础(二):可扩展性-垂直扩展与水平扩展