跳转到正文
zeno's blog
返回

hyper(二):Body trait 与请求响应体

专题: hyper

Table of contents

Open Table of contents

TL;DR

http_body::Body trait 是 hyper 生态中所有 HTTP body 的统一抽象, 通过 poll_frame 方法按帧返回数据 (data frames) 或 trailers, 实现零拷贝流式传输. hyper 1.0 将入站 body 固定为 Incoming 类型, 出站 body 由开发者从 Full<Bytes>, Empty<Bytes>, BoxBody 等中选择 — 这种入站/出站类型分离是从 0.14 迁移时最大的心智模型变化.


1. Body trait 定义

Body trait 定义在独立的 http-body crate 中 (版本 1.0):

// crate: http-body 1.0
// 模块: http_body

pub trait Body {
    /// body 中每个数据帧的类型, 必须实现 bytes::Buf
    type Data: Buf;

    /// 可能产生的错误类型
    type Error;

    /// 异步拉取下一个帧 (data 或 trailers)
    ///
    /// 返回值:
    /// - Poll::Ready(Some(Ok(frame))) → 有新的帧
    /// - Poll::Ready(None) → body 结束
    /// - Poll::Ready(Some(Err(e))) → 出错
    /// - Poll::Pending → 数据还没准备好
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>;

    /// body 是否已经结束 (提示性, 非强制)
    ///
    /// 返回 true → poll_frame 保证返回 None
    /// 返回 false → 不保证有更多数据, 也不保证没有
    fn is_end_stream(&self) -> bool {
        false
    }

    /// body 剩余数据的大小提示
    ///
    /// lower == upper 时表示精确大小 (用于设置 Content-Length)
    fn size_hint(&self) -> SizeHint {
        SizeHint::default()
    }
}

1.1 Frame: data 与 trailers 的统一表示

// http-body::Frame
pub struct Frame<T> { ... }

impl<T> Frame<T> {
    pub fn data(data: T) -> Self;        // 创建 data frame
    pub fn trailers(trailers: HeaderMap) -> Self; // 创建 trailers frame

    pub fn is_data(&self) -> bool;
    pub fn is_trailers(&self) -> bool;
    pub fn into_data(self) -> Result<T, Self>;
    pub fn into_trailers(self) -> Result<HeaderMap, Self>;
    pub fn data_ref(&self) -> Option<&T>;
    pub fn trailers_ref(&self) -> Option<&HeaderMap>;
}

HTTP body 不仅包含数据, 还可能包含 trailers (HTTP/2 和 chunked transfer encoding 支持). Frame 将两者统一为一种类型.

2. hyper 0.14 vs 1.0 的 Body 类型变化

2.1 hyper 0.14: 单一 hyper::Body 类型

// 0.14: 一个类型通吃
use hyper::Body;

// 入站: 从网络接收
async fn handler(req: Request<Body>) -> Result<Response<Body>, Error> {
    let body_bytes = hyper::body::to_bytes(req.body()).await?;
    Ok(Response::new(Body::from("hello"))) // 出站: 字符串转 Body
}

hyper::Body 同时是:

2.2 hyper 1.0: 入站/出站分离

入站 body: hyper::body::Incoming

// 1.0: 入站请求的 body 类型是 Incoming
async fn handler(
    req: Request<hyper::body::Incoming>,
) -> Result<Response<Full<Bytes>>, Infallible> {
    // Incoming 只能由 hyper 内部创建, 开发者无法构造
    // 它实现了 Body trait, 可以被 poll_frame 消费
    Ok(Response::new(Full::new(Bytes::from("hello"))))
}

Incoming 的特点:

出站 body: 开发者选择

类型来源 crate用途适用场景
Full<Bytes>http-body-util完整的、已知大小的 body小响应, API JSON
Empty<Bytes>http-body-util空 body204 No Content, HEAD 响应
BoxBody<Bytes, E>http-body-util类型擦除的 body需要统一多种 body 类型时
StreamBody<S>http-body-util从 Stream 构造SSE, 大文件流
自定义类型-实现 Body trait高性能场景, 零拷贝需求

2.3 常用出站 body 构造

use http_body_util::{Full, Empty, BodyExt};
use bytes::Bytes;

// 从字符串
let body = Full::new(Bytes::from("hello world"));

// 空 body
let body = Empty::<Bytes>::new();

// 从 JSON
let json = serde_json::to_vec(&my_data)?;
let body = Full::new(Bytes::from(json));

// 类型擦除 (当需要统一类型时)
use http_body_util::combinators::BoxBody;

fn make_body(data: Option<String>) -> BoxBody<Bytes, hyper::Error> {
    match data {
        Some(s) => Full::new(Bytes::from(s))
            .map_err(|never| match never {})
            .boxed(),
        None => Empty::<Bytes>::new()
            .map_err(|never| match never {})
            .boxed(),
    }
}

3. http-body-util: Body 的实用工具

http-body-util crate 提供操作 Body 的实用方法:

3.1 BodyExt trait

use http_body_util::BodyExt;

// 将整个 body 收集为 Bytes
let body = req.into_body();
let collected = body.collect().await?.to_bytes();

// 转换为 BoxBody (类型擦除)
let boxed = body.boxed();

// 对 body 做 map
let mapped = body.map_frame(|frame| {
    // 变换每个帧
    frame
});

// 错误类型转换
let body = body.map_err(|e| -> MyError { e.into() });

3.2 collect() 与 Collected

use http_body_util::{BodyExt, Collected};

// collect() 将流式 body 收集为 Collected<Bytes>
let collected: Collected<Bytes> = body.collect().await?;

// Collected 提供多种访问方式
let bytes: Bytes = collected.to_bytes();       // 拼接为连续 Bytes
let trailers: Option<HeaderMap> = collected.trailers(); // 获取 trailers

4. Body 的流式设计哲学

4.1 为什么不直接用 Vec<u8>

Vec<u8>:     [==========完整 body==========]
             ↑ 必须全部读入内存

Body trait:  [=chunk1=] → [=chunk2=] → [=chunk3=] → None
             ↑ 每次只处理一个 chunk, 内存使用恒定

流式 body 的优势:

4.2 Frame 而非 Chunk

http-body 0.4 使用 poll_data() + poll_trailers() 两个方法. 1.0 合并为 poll_frame() + Frame 枚举, 因为:

5. Body 在框架中的类型对应

框架/库入站 Body出站 Body说明
hyper 1.0Incoming泛型 B: Body开发者选择出站类型
hyper 0.14hyper::Bodyhyper::Body同一类型
axumaxum::body::Body (= http_body_util::combinators::UnsyncBoxBody)axum::body::Body统一的类型擦除 body
reqwest--内部处理, 用户面对 reqwest::Body
tonictonic::body::BoxBodytonic::body::BoxBodygRPC body

5.1 axum 的 body 类型演变

axum 0.6 (基于 hyper 0.14):

type Body = hyper::Body;

axum 0.7+ (基于 hyper 1.0):

// axum::body::Body 是一个类型别名, 底层使用 http-body-util 的 BoxBody
// 具体定义随版本可能变化, 但核心是: 类型擦除的 Body
pub type Body = /* ... */;

axum handler 自动将 impl IntoResponse 转换为框架的 body 类型, 开发者通常不需要直接操作 Body.

6. 自定义 Body 实现

当需要极致性能或特殊行为时, 可以自定义 Body:

use http_body::{Body, Frame, SizeHint};
use bytes::Bytes;
use std::pin::Pin;
use std::task::{Context, Poll};

/// 返回固定字符串的零分配 Body
struct StaticBody {
    data: Option<&'static [u8]>,
}

impl StaticBody {
    fn new(data: &'static [u8]) -> Self {
        Self { data: Some(data) }
    }
}

impl Body for StaticBody {
    type Data = Bytes;
    type Error = std::convert::Infallible;

    fn poll_frame(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        match self.data.take() {
            Some(data) => Poll::Ready(Some(Ok(
                Frame::data(Bytes::from_static(data))
            ))),
            None => Poll::Ready(None),
        }
    }

    fn is_end_stream(&self) -> bool {
        self.data.is_none()
    }

    fn size_hint(&self) -> SizeHint {
        match self.data {
            Some(data) => SizeHint::with_exact(data.len() as u64),
            None => SizeHint::with_exact(0),
        }
    }
}

7. Pitfalls

7.1 collect() 大 body 导致 OOM

body.collect().await?.to_bytes() 将整个 body 加载到内存. 对于大文件上传/下载, 应逐 frame 处理:

// 错误: 大文件 OOM
let all_bytes = body.collect().await?.to_bytes();

// 正确: 流式处理
while let Some(frame) = body.frame().await {
    let frame = frame?;
    if let Some(data) = frame.data_ref() {
        // 逐块写入文件/处理
        file.write_all(data.chunk()).await?;
    }
}

7.2 Full 的 Error 类型是 Infallible

Full<Bytes> 永远不会出错, 所以它的 Error 类型是 std::convert::Infallible. 但如果函数签名要求 Body<Error = hyper::Error>, 需要做类型转换:

// 编译错误: Infallible != hyper::Error
fn handler() -> Response<Full<Bytes>> { ... }
// 需要 map_err
Full::new(Bytes::from("hello"))
    .map_err(|never| match never {}) // Infallible 永远不会被调用

7.3 Incoming 不能用于出站

Incoming 只表示从网络接收的数据, 不能用来构造出站响应:

// 编译错误: 无法构造 Incoming
let body = hyper::body::Incoming::new(...); // 没有 public 构造函数

7.4 Body 的 Send + Sync 约束

在多线程运行时 (tokio) 中, handler 返回的 Future 必须 Send. 这要求 body 类型也是 Send. UnsyncBoxBody 不是 Send — 在需要跨线程传递时要用 BoxBody.

7.5 忘记处理 trailers

poll_frame 可能返回 trailers frame. 如果只处理 data frame 而忽略 trailers, 在 gRPC (依赖 trailers 传递状态码) 场景下会出问题.


分享这篇文章:

上一篇
hyper(三):与 axum、tower 的架构关系
下一篇
hyper(一):底层 HTTP 实现与 1.0 迁移