Table of contents
Open Table of contents
TL;DR
http_body::Body trait 是 hyper 生态中所有 HTTP body 的统一抽象, 通过 poll_frame 方法按帧返回数据 (data frames) 或 trailers, 实现零拷贝流式传输. hyper 1.0 将入站 body 固定为 Incoming 类型, 出站 body 由开发者从 Full<Bytes>, Empty<Bytes>, BoxBody 等中选择 — 这种入站/出站类型分离是从 0.14 迁移时最大的心智模型变化.
1. Body trait 定义
Body trait 定义在独立的 http-body crate 中 (版本 1.0):
// crate: http-body 1.0
// 模块: http_body
pub trait Body {
/// body 中每个数据帧的类型, 必须实现 bytes::Buf
type Data: Buf;
/// 可能产生的错误类型
type Error;
/// 异步拉取下一个帧 (data 或 trailers)
///
/// 返回值:
/// - Poll::Ready(Some(Ok(frame))) → 有新的帧
/// - Poll::Ready(None) → body 结束
/// - Poll::Ready(Some(Err(e))) → 出错
/// - Poll::Pending → 数据还没准备好
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>;
/// body 是否已经结束 (提示性, 非强制)
///
/// 返回 true → poll_frame 保证返回 None
/// 返回 false → 不保证有更多数据, 也不保证没有
fn is_end_stream(&self) -> bool {
false
}
/// body 剩余数据的大小提示
///
/// lower == upper 时表示精确大小 (用于设置 Content-Length)
fn size_hint(&self) -> SizeHint {
SizeHint::default()
}
}
1.1 Frame: data 与 trailers 的统一表示
// http-body::Frame
pub struct Frame<T> { ... }
impl<T> Frame<T> {
pub fn data(data: T) -> Self; // 创建 data frame
pub fn trailers(trailers: HeaderMap) -> Self; // 创建 trailers frame
pub fn is_data(&self) -> bool;
pub fn is_trailers(&self) -> bool;
pub fn into_data(self) -> Result<T, Self>;
pub fn into_trailers(self) -> Result<HeaderMap, Self>;
pub fn data_ref(&self) -> Option<&T>;
pub fn trailers_ref(&self) -> Option<&HeaderMap>;
}
HTTP body 不仅包含数据, 还可能包含 trailers (HTTP/2 和 chunked transfer encoding 支持). Frame 将两者统一为一种类型.
2. hyper 0.14 vs 1.0 的 Body 类型变化
2.1 hyper 0.14: 单一 hyper::Body 类型
// 0.14: 一个类型通吃
use hyper::Body;
// 入站: 从网络接收
async fn handler(req: Request<Body>) -> Result<Response<Body>, Error> {
let body_bytes = hyper::body::to_bytes(req.body()).await?;
Ok(Response::new(Body::from("hello"))) // 出站: 字符串转 Body
}
hyper::Body 同时是:
- 入站流 (从 socket 读取的数据)
- 出站容器 (从 String, Vec
, &‘static str 等构造) - Stream 适配器 (实现了
Stream<Item = Result<Bytes, Error>>)
2.2 hyper 1.0: 入站/出站分离
入站 body: hyper::body::Incoming
// 1.0: 入站请求的 body 类型是 Incoming
async fn handler(
req: Request<hyper::body::Incoming>,
) -> Result<Response<Full<Bytes>>, Infallible> {
// Incoming 只能由 hyper 内部创建, 开发者无法构造
// 它实现了 Body trait, 可以被 poll_frame 消费
Ok(Response::new(Full::new(Bytes::from("hello"))))
}
Incoming 的特点:
- 只能从网络接收, 不能手动构造
- 实现
Body<Data = Bytes, Error = hyper::Error> - 表示一个正在从网络流入的 body
出站 body: 开发者选择
| 类型 | 来源 crate | 用途 | 适用场景 |
|---|---|---|---|
Full<Bytes> | http-body-util | 完整的、已知大小的 body | 小响应, API JSON |
Empty<Bytes> | http-body-util | 空 body | 204 No Content, HEAD 响应 |
BoxBody<Bytes, E> | http-body-util | 类型擦除的 body | 需要统一多种 body 类型时 |
StreamBody<S> | http-body-util | 从 Stream 构造 | SSE, 大文件流 |
| 自定义类型 | - | 实现 Body trait | 高性能场景, 零拷贝需求 |
2.3 常用出站 body 构造
use http_body_util::{Full, Empty, BodyExt};
use bytes::Bytes;
// 从字符串
let body = Full::new(Bytes::from("hello world"));
// 空 body
let body = Empty::<Bytes>::new();
// 从 JSON
let json = serde_json::to_vec(&my_data)?;
let body = Full::new(Bytes::from(json));
// 类型擦除 (当需要统一类型时)
use http_body_util::combinators::BoxBody;
fn make_body(data: Option<String>) -> BoxBody<Bytes, hyper::Error> {
match data {
Some(s) => Full::new(Bytes::from(s))
.map_err(|never| match never {})
.boxed(),
None => Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed(),
}
}
3. http-body-util: Body 的实用工具
http-body-util crate 提供操作 Body 的实用方法:
3.1 BodyExt trait
use http_body_util::BodyExt;
// 将整个 body 收集为 Bytes
let body = req.into_body();
let collected = body.collect().await?.to_bytes();
// 转换为 BoxBody (类型擦除)
let boxed = body.boxed();
// 对 body 做 map
let mapped = body.map_frame(|frame| {
// 变换每个帧
frame
});
// 错误类型转换
let body = body.map_err(|e| -> MyError { e.into() });
3.2 collect() 与 Collected
use http_body_util::{BodyExt, Collected};
// collect() 将流式 body 收集为 Collected<Bytes>
let collected: Collected<Bytes> = body.collect().await?;
// Collected 提供多种访问方式
let bytes: Bytes = collected.to_bytes(); // 拼接为连续 Bytes
let trailers: Option<HeaderMap> = collected.trailers(); // 获取 trailers
4. Body 的流式设计哲学
4.1 为什么不直接用 Vec<u8>
Vec<u8>: [==========完整 body==========]
↑ 必须全部读入内存
Body trait: [=chunk1=] → [=chunk2=] → [=chunk3=] → None
↑ 每次只处理一个 chunk, 内存使用恒定
流式 body 的优势:
- 内存效率: 传输 1GB 文件不需要 1GB 内存
- 背压: 如果消费者处理慢,
poll_frame返回Pending, 上游停止读取, TCP 接收窗口缩小, 对端降速 - 低延迟: 第一个 chunk 准备好就可以开始处理, 不需要等全部接收完
4.2 Frame 而非 Chunk
http-body 0.4 使用 poll_data() + poll_trailers() 两个方法. 1.0 合并为 poll_frame() + Frame 枚举, 因为:
- 简化状态机 (一个方法 vs 两个方法的调用顺序问题)
- 支持未来可能的新帧类型 (如 HTTP/3 的元数据帧)
5. Body 在框架中的类型对应
| 框架/库 | 入站 Body | 出站 Body | 说明 |
|---|---|---|---|
| hyper 1.0 | Incoming | 泛型 B: Body | 开发者选择出站类型 |
| hyper 0.14 | hyper::Body | hyper::Body | 同一类型 |
| axum | axum::body::Body (= http_body_util::combinators::UnsyncBoxBody) | axum::body::Body | 统一的类型擦除 body |
| reqwest | - | - | 内部处理, 用户面对 reqwest::Body |
| tonic | tonic::body::BoxBody | tonic::body::BoxBody | gRPC body |
5.1 axum 的 body 类型演变
axum 0.6 (基于 hyper 0.14):
type Body = hyper::Body;
axum 0.7+ (基于 hyper 1.0):
// axum::body::Body 是一个类型别名, 底层使用 http-body-util 的 BoxBody
// 具体定义随版本可能变化, 但核心是: 类型擦除的 Body
pub type Body = /* ... */;
axum handler 自动将 impl IntoResponse 转换为框架的 body 类型, 开发者通常不需要直接操作 Body.
6. 自定义 Body 实现
当需要极致性能或特殊行为时, 可以自定义 Body:
use http_body::{Body, Frame, SizeHint};
use bytes::Bytes;
use std::pin::Pin;
use std::task::{Context, Poll};
/// 返回固定字符串的零分配 Body
struct StaticBody {
data: Option<&'static [u8]>,
}
impl StaticBody {
fn new(data: &'static [u8]) -> Self {
Self { data: Some(data) }
}
}
impl Body for StaticBody {
type Data = Bytes;
type Error = std::convert::Infallible;
fn poll_frame(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.data.take() {
Some(data) => Poll::Ready(Some(Ok(
Frame::data(Bytes::from_static(data))
))),
None => Poll::Ready(None),
}
}
fn is_end_stream(&self) -> bool {
self.data.is_none()
}
fn size_hint(&self) -> SizeHint {
match self.data {
Some(data) => SizeHint::with_exact(data.len() as u64),
None => SizeHint::with_exact(0),
}
}
}
7. Pitfalls
7.1 collect() 大 body 导致 OOM
body.collect().await?.to_bytes() 将整个 body 加载到内存. 对于大文件上传/下载, 应逐 frame 处理:
// 错误: 大文件 OOM
let all_bytes = body.collect().await?.to_bytes();
// 正确: 流式处理
while let Some(frame) = body.frame().await {
let frame = frame?;
if let Some(data) = frame.data_ref() {
// 逐块写入文件/处理
file.write_all(data.chunk()).await?;
}
}
7.2 Full 的 Error 类型是 Infallible
Full<Bytes> 永远不会出错, 所以它的 Error 类型是 std::convert::Infallible. 但如果函数签名要求 Body<Error = hyper::Error>, 需要做类型转换:
// 编译错误: Infallible != hyper::Error
fn handler() -> Response<Full<Bytes>> { ... }
// 需要 map_err
Full::new(Bytes::from("hello"))
.map_err(|never| match never {}) // Infallible 永远不会被调用
7.3 Incoming 不能用于出站
Incoming 只表示从网络接收的数据, 不能用来构造出站响应:
// 编译错误: 无法构造 Incoming
let body = hyper::body::Incoming::new(...); // 没有 public 构造函数
7.4 Body 的 Send + Sync 约束
在多线程运行时 (tokio) 中, handler 返回的 Future 必须 Send. 这要求 body 类型也是 Send. UnsyncBoxBody 不是 Send — 在需要跨线程传递时要用 BoxBody.
7.5 忘记处理 trailers
poll_frame 可能返回 trailers frame. 如果只处理 data frame 而忽略 trailers, 在 gRPC (依赖 trailers 传递状态码) 场景下会出问题.