From 2f95afae07eeed95eb075696fd2fbe8e5319df5b Mon Sep 17 00:00:00 2001 From: Mohamed Daahir Date: Sun, 19 Oct 2025 22:07:44 +0100 Subject: [PATCH] refactor: use decompression from http-tower --- Cargo.toml | 9 +- src/async_impl/body.rs | 6 +- src/async_impl/client.rs | 113 +++++- src/async_impl/decoder.rs | 746 ------------------------------------- src/async_impl/mod.rs | 4 - src/async_impl/response.rs | 26 +- src/blocking/response.rs | 16 +- src/error.rs | 11 - tests/brotli.rs | 1 + tests/deflate.rs | 1 + tests/gzip.rs | 1 + tests/zstd.rs | 1 + 12 files changed, 124 insertions(+), 811 deletions(-) delete mode 100644 src/async_impl/decoder.rs diff --git a/Cargo.toml b/Cargo.toml index 69437f0c2..ada29a5a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,13 +64,13 @@ charset = ["dep:encoding_rs", "dep:mime"] cookies = ["dep:cookie_crate", "dep:cookie_store"] -gzip = ["dep:async-compression", "async-compression?/gzip", "dep:futures-util", "dep:tokio-util"] +gzip = ["tower-http/decompression-gzip"] -brotli = ["dep:async-compression", "async-compression?/brotli", "dep:futures-util", "dep:tokio-util"] +brotli = ["tower-http/decompression-br"] -zstd = ["dep:async-compression", "async-compression?/zstd", "dep:futures-util", "dep:tokio-util"] +zstd = ["tower-http/decompression-zstd"] -deflate = ["dep:async-compression", "async-compression?/zlib", "dep:futures-util", "dep:tokio-util"] +deflate = ["tower-http/decompression-deflate"] json = ["dep:serde_json"] @@ -159,7 +159,6 @@ cookie_crate = { version = "0.18.0", package = "cookie", optional = true } cookie_store = { version = "0.21.0", optional = true } ## compression -async-compression = { version = "0.4.0", default-features = false, features = ["tokio"], optional = true } tokio-util = { version = "0.7.9", default-features = false, features = ["codec", "io"], optional = true } ## hickory-dns diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs index 2a7d8f821..3a52fde17 100644 --- a/src/async_impl/body.rs +++ b/src/async_impl/body.rs @@ -47,7 +47,7 @@ pin_project! { } /// Converts any `impl Body` into a `impl Stream` of just its DATA frames. -#[cfg(any(feature = "stream", feature = "multipart",))] +#[cfg(any(feature = "stream", feature = "multipart", feature = "blocking"))] pub(crate) struct DataStream(pub(crate) B); impl Body { @@ -161,7 +161,7 @@ impl Body { } } - #[cfg(feature = "multipart")] + #[cfg(any(feature = "multipart", feature = "blocking"))] pub(crate) fn into_stream(self) -> DataStream { DataStream(self) } @@ -423,7 +423,7 @@ where // ===== impl DataStream ===== -#[cfg(any(feature = "stream", feature = "multipart",))] +#[cfg(any(feature = "stream", feature = "multipart", feature = "blocking",))] impl futures_core::Stream for DataStream where B: HttpBody + Unpin, diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index 8f3bebb1e..28d16a6b4 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -9,7 +9,6 @@ use std::time::Duration; use std::{collections::HashMap, convert::TryInto, net::SocketAddr}; use std::{fmt, str}; -use super::decoder::Accepts; use super::request::{Request, RequestBuilder}; use super::response::Response; use super::Body; @@ -45,9 +44,7 @@ use crate::Certificate; use crate::Identity; use crate::{IntoUrl, Method, Proxy, Url}; -use http::header::{ - Entry, HeaderMap, HeaderValue, ACCEPT, ACCEPT_ENCODING, PROXY_AUTHORIZATION, RANGE, USER_AGENT, -}; +use http::header::{Entry, HeaderMap, HeaderValue, ACCEPT, PROXY_AUTHORIZATION, USER_AGENT}; use http::uri::Scheme; use http::Uri; use hyper_util::client::legacy::connect::HttpConnector; @@ -61,6 +58,13 @@ use quinn::VarInt; use tokio::time::Sleep; use tower::util::BoxCloneSyncServiceLayer; use tower::{Layer, Service}; +#[cfg(any( + feature = "gzip", + feature = "brotli", + feature = "zstd", + feature = "deflate" +))] +use tower_http::decompression::Decompression; use tower_http::follow_redirect::FollowRedirect; /// An asynchronous `Client` to make Requests with. @@ -96,6 +100,33 @@ enum HttpVersionPref { All, } +#[derive(Clone, Copy, Debug)] +struct Accepts { + #[cfg(feature = "gzip")] + gzip: bool, + #[cfg(feature = "brotli")] + brotli: bool, + #[cfg(feature = "zstd")] + zstd: bool, + #[cfg(feature = "deflate")] + deflate: bool, +} + +impl Default for Accepts { + fn default() -> Accepts { + Accepts { + #[cfg(feature = "gzip")] + gzip: true, + #[cfg(feature = "brotli")] + brotli: true, + #[cfg(feature = "zstd")] + zstd: true, + #[cfg(feature = "deflate")] + deflate: true, + } + } +} + #[derive(Clone)] struct HyperService { hyper: HyperClient, @@ -978,6 +1009,21 @@ impl ClientBuilder { #[cfg(feature = "cookies")] let svc = CookieService::new(svc, config.cookie_store.clone()); let hyper = FollowRedirect::with_policy(svc, redirect_policy.clone()); + #[cfg(any( + feature = "gzip", + feature = "brotli", + feature = "zstd", + feature = "deflate" + ))] + let hyper = Decompression::new(hyper); + #[cfg(feature = "gzip")] + let hyper = hyper.gzip(config.accepts.gzip); + #[cfg(feature = "brotli")] + let hyper = hyper.br(config.accepts.brotli); + #[cfg(feature = "zstd")] + let hyper = hyper.zstd(config.accepts.zstd); + #[cfg(feature = "deflate")] + let hyper = hyper.deflate(config.accepts.deflate); Ok(Client { inner: Arc::new(ClientRef { @@ -993,7 +1039,23 @@ impl ClientBuilder { let svc = tower::retry::Retry::new(retry_policy, h3_service); #[cfg(feature = "cookies")] let svc = CookieService::new(svc, config.cookie_store); - Some(FollowRedirect::with_policy(svc, redirect_policy)) + let svc = FollowRedirect::with_policy(svc, redirect_policy); + #[cfg(any( + feature = "gzip", + feature = "brotli", + feature = "zstd", + feature = "deflate" + ))] + let svc = Decompression::new(svc); + #[cfg(feature = "gzip")] + let svc = svc.gzip(config.accepts.gzip); + #[cfg(feature = "brotli")] + let svc = svc.br(config.accepts.brotli); + #[cfg(feature = "zstd")] + let svc = svc.zstd(config.accepts.zstd); + #[cfg(feature = "deflate")] + let svc = svc.deflate(config.accepts.deflate); + Some(svc) } None => None, }, @@ -2484,14 +2546,6 @@ impl Client { } } - let accept_encoding = self.inner.accepts.as_str(); - - if let Some(accept_encoding) = accept_encoding { - if !headers.contains_key(ACCEPT_ENCODING) && !headers.contains_key(RANGE) { - headers.insert(ACCEPT_ENCODING, HeaderValue::from_static(accept_encoding)); - } - } - let uri = match try_uri(&url) { Ok(uri) => uri, _ => return Pending::new_err(error::url_invalid_uri(url)), @@ -2776,12 +2830,32 @@ impl Config { } #[cfg(not(feature = "cookies"))] -type LayeredService = - FollowRedirect, TowerRedirectPolicy>; +type MaybeCookieService = T; + #[cfg(feature = "cookies")] -type LayeredService = FollowRedirect< - CookieService>, - TowerRedirectPolicy, +type MaybeCookieService = CookieService; + +#[cfg(not(any( + feature = "gzip", + feature = "brotli", + feature = "zstd", + feature = "deflate" +)))] +type MaybeDecompression = T; + +#[cfg(any( + feature = "gzip", + feature = "brotli", + feature = "zstd", + feature = "deflate" +))] +type MaybeDecompression = Decompression; + +type LayeredService = MaybeDecompression< + FollowRedirect< + MaybeCookieService>, + TowerRedirectPolicy, + >, >; type LayeredFuture = as Service>>::Future; @@ -2947,7 +3021,7 @@ impl Future for PendingRequest { Err(e) => { return Poll::Ready(Err(crate::error::request(e).with_url(self.url.clone()))); } - Ok(res) => res, + Ok(res) => res.map(super::body::boxed), }, }; @@ -2964,7 +3038,6 @@ impl Future for PendingRequest { let res = Response::new( res, self.url.clone(), - self.client.accepts, self.total_timeout.take(), self.read_timeout, ); diff --git a/src/async_impl/decoder.rs b/src/async_impl/decoder.rs deleted file mode 100644 index d21962483..000000000 --- a/src/async_impl/decoder.rs +++ /dev/null @@ -1,746 +0,0 @@ -use std::fmt; -#[cfg(any( - feature = "gzip", - feature = "zstd", - feature = "brotli", - feature = "deflate" -))] -use std::future::Future; -use std::pin::Pin; -use std::task::{ready, Context, Poll}; - -#[cfg(any( - feature = "gzip", - feature = "zstd", - feature = "brotli", - feature = "deflate" -))] -use futures_util::stream::Fuse; - -#[cfg(feature = "gzip")] -use async_compression::tokio::bufread::GzipDecoder; - -#[cfg(feature = "brotli")] -use async_compression::tokio::bufread::BrotliDecoder; - -#[cfg(feature = "zstd")] -use async_compression::tokio::bufread::ZstdDecoder; - -#[cfg(feature = "deflate")] -use async_compression::tokio::bufread::ZlibDecoder; - -#[cfg(any( - feature = "gzip", - feature = "zstd", - feature = "brotli", - feature = "deflate", - feature = "blocking", -))] -use futures_core::Stream; - -use bytes::Bytes; -use http::HeaderMap; -use hyper::body::Body as HttpBody; -use hyper::body::Frame; - -#[cfg(any( - feature = "gzip", - feature = "brotli", - feature = "zstd", - feature = "deflate" -))] -use tokio_util::codec::{BytesCodec, FramedRead}; -#[cfg(any( - feature = "gzip", - feature = "brotli", - feature = "zstd", - feature = "deflate" -))] -use tokio_util::io::StreamReader; - -use super::body::ResponseBody; - -#[derive(Clone, Copy, Debug)] -pub(super) struct Accepts { - #[cfg(feature = "gzip")] - pub(super) gzip: bool, - #[cfg(feature = "brotli")] - pub(super) brotli: bool, - #[cfg(feature = "zstd")] - pub(super) zstd: bool, - #[cfg(feature = "deflate")] - pub(super) deflate: bool, -} - -impl Accepts { - pub fn none() -> Self { - Self { - #[cfg(feature = "gzip")] - gzip: false, - #[cfg(feature = "brotli")] - brotli: false, - #[cfg(feature = "zstd")] - zstd: false, - #[cfg(feature = "deflate")] - deflate: false, - } - } -} - -/// A response decompressor over a non-blocking stream of chunks. -/// -/// The inner decoder may be constructed asynchronously. -pub(crate) struct Decoder { - inner: Inner, -} - -#[cfg(any( - feature = "gzip", - feature = "zstd", - feature = "brotli", - feature = "deflate" -))] -type PeekableIoStream = futures_util::stream::Peekable; - -#[cfg(any( - feature = "gzip", - feature = "zstd", - feature = "brotli", - feature = "deflate" -))] -type PeekableIoStreamReader = StreamReader; - -enum Inner { - /// A `PlainText` decoder just returns the response content as is. - PlainText(ResponseBody), - - /// A `Gzip` decoder will uncompress the gzipped response content before returning it. - #[cfg(feature = "gzip")] - Gzip(Pin, BytesCodec>>>>), - - /// A `Brotli` decoder will uncompress the brotlied response content before returning it. - #[cfg(feature = "brotli")] - Brotli(Pin, BytesCodec>>>>), - - /// A `Zstd` decoder will uncompress the zstd compressed response content before returning it. - #[cfg(feature = "zstd")] - Zstd(Pin, BytesCodec>>>>), - - /// A `Deflate` decoder will uncompress the deflated response content before returning it. - #[cfg(feature = "deflate")] - Deflate(Pin, BytesCodec>>>>), - - /// A decoder that doesn't have a value yet. - #[cfg(any( - feature = "brotli", - feature = "zstd", - feature = "gzip", - feature = "deflate" - ))] - Pending(Pin>), -} - -#[cfg(any( - feature = "gzip", - feature = "zstd", - feature = "brotli", - feature = "deflate" -))] -/// A future attempt to poll the response body for EOF so we know whether to use gzip or not. -struct Pending(PeekableIoStream, DecoderType); - -#[cfg(any( - feature = "gzip", - feature = "zstd", - feature = "brotli", - feature = "deflate", - feature = "blocking", -))] -pub(crate) struct IoStream(B); - -#[cfg(any( - feature = "gzip", - feature = "zstd", - feature = "brotli", - feature = "deflate" -))] -enum DecoderType { - #[cfg(feature = "gzip")] - Gzip, - #[cfg(feature = "brotli")] - Brotli, - #[cfg(feature = "zstd")] - Zstd, - #[cfg(feature = "deflate")] - Deflate, -} - -impl fmt::Debug for Decoder { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Decoder").finish() - } -} - -impl Decoder { - #[cfg(feature = "blocking")] - pub(crate) fn empty() -> Decoder { - Decoder { - inner: Inner::PlainText(empty()), - } - } - - #[cfg(feature = "blocking")] - pub(crate) fn into_stream(self) -> IoStream { - IoStream(self) - } - - /// A plain text decoder. - /// - /// This decoder will emit the underlying chunks as-is. - fn plain_text(body: ResponseBody) -> Decoder { - Decoder { - inner: Inner::PlainText(body), - } - } - - /// A gzip decoder. - /// - /// This decoder will buffer and decompress chunks that are gzipped. - #[cfg(feature = "gzip")] - fn gzip(body: ResponseBody) -> Decoder { - use futures_util::StreamExt; - - Decoder { - inner: Inner::Pending(Box::pin(Pending( - IoStream(body).peekable(), - DecoderType::Gzip, - ))), - } - } - - /// A brotli decoder. - /// - /// This decoder will buffer and decompress chunks that are brotlied. - #[cfg(feature = "brotli")] - fn brotli(body: ResponseBody) -> Decoder { - use futures_util::StreamExt; - - Decoder { - inner: Inner::Pending(Box::pin(Pending( - IoStream(body).peekable(), - DecoderType::Brotli, - ))), - } - } - - /// A zstd decoder. - /// - /// This decoder will buffer and decompress chunks that are zstd compressed. - #[cfg(feature = "zstd")] - fn zstd(body: ResponseBody) -> Decoder { - use futures_util::StreamExt; - - Decoder { - inner: Inner::Pending(Box::pin(Pending( - IoStream(body).peekable(), - DecoderType::Zstd, - ))), - } - } - - /// A deflate decoder. - /// - /// This decoder will buffer and decompress chunks that are deflated. - #[cfg(feature = "deflate")] - fn deflate(body: ResponseBody) -> Decoder { - use futures_util::StreamExt; - - Decoder { - inner: Inner::Pending(Box::pin(Pending( - IoStream(body).peekable(), - DecoderType::Deflate, - ))), - } - } - - #[cfg(any( - feature = "brotli", - feature = "zstd", - feature = "gzip", - feature = "deflate" - ))] - fn detect_encoding(headers: &mut HeaderMap, encoding_str: &str) -> bool { - use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; - use log::warn; - - let mut is_content_encoded = { - headers - .get_all(CONTENT_ENCODING) - .iter() - .any(|enc| enc == encoding_str) - || headers - .get_all(TRANSFER_ENCODING) - .iter() - .any(|enc| enc == encoding_str) - }; - if is_content_encoded { - if let Some(content_length) = headers.get(CONTENT_LENGTH) { - if content_length == "0" { - warn!("{encoding_str} response with content-length of 0"); - is_content_encoded = false; - } - } - } - if is_content_encoded { - headers.remove(CONTENT_ENCODING); - headers.remove(CONTENT_LENGTH); - } - is_content_encoded - } - - /// Constructs a Decoder from a hyper request. - /// - /// A decoder is just a wrapper around the hyper request that knows - /// how to decode the content body of the request. - /// - /// Uses the correct variant by inspecting the Content-Encoding header. - pub(super) fn detect( - _headers: &mut HeaderMap, - body: ResponseBody, - _accepts: Accepts, - ) -> Decoder { - #[cfg(feature = "gzip")] - { - if _accepts.gzip && Decoder::detect_encoding(_headers, "gzip") { - return Decoder::gzip(body); - } - } - - #[cfg(feature = "brotli")] - { - if _accepts.brotli && Decoder::detect_encoding(_headers, "br") { - return Decoder::brotli(body); - } - } - - #[cfg(feature = "zstd")] - { - if _accepts.zstd && Decoder::detect_encoding(_headers, "zstd") { - return Decoder::zstd(body); - } - } - - #[cfg(feature = "deflate")] - { - if _accepts.deflate && Decoder::detect_encoding(_headers, "deflate") { - return Decoder::deflate(body); - } - } - - Decoder::plain_text(body) - } -} - -impl HttpBody for Decoder { - type Data = Bytes; - type Error = crate::Error; - - fn poll_frame( - mut self: Pin<&mut Self>, - cx: &mut Context, - ) -> Poll, Self::Error>>> { - match self.inner { - #[cfg(any( - feature = "brotli", - feature = "zstd", - feature = "gzip", - feature = "deflate" - ))] - Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) { - Poll::Ready(Ok(inner)) => { - self.inner = inner; - self.poll_frame(cx) - } - Poll::Ready(Err(e)) => Poll::Ready(Some(Err(crate::error::decode_io(e)))), - Poll::Pending => Poll::Pending, - }, - Inner::PlainText(ref mut body) => match ready!(Pin::new(body).poll_frame(cx)) { - Some(Ok(frame)) => Poll::Ready(Some(Ok(frame))), - Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode(err)))), - None => Poll::Ready(None), - }, - #[cfg(feature = "gzip")] - Inner::Gzip(ref mut decoder) => { - match ready!(Pin::new(&mut *decoder).poll_next(cx)) { - Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))), - Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), - None => { - // poll inner connection until EOF after gzip stream is finished - poll_inner_should_be_empty( - decoder.get_mut().get_mut().get_mut().get_mut(), - cx, - ) - } - } - } - #[cfg(feature = "brotli")] - Inner::Brotli(ref mut decoder) => { - match ready!(Pin::new(&mut *decoder).poll_next(cx)) { - Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))), - Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), - None => { - // poll inner connection until EOF after brotli stream is finished - poll_inner_should_be_empty( - decoder.get_mut().get_mut().get_mut().get_mut(), - cx, - ) - } - } - } - #[cfg(feature = "zstd")] - Inner::Zstd(ref mut decoder) => { - match ready!(Pin::new(&mut *decoder).poll_next(cx)) { - Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))), - Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), - None => { - // poll inner connection until EOF after zstd stream is finished - poll_inner_should_be_empty( - decoder.get_mut().get_mut().get_mut().get_mut(), - cx, - ) - } - } - } - #[cfg(feature = "deflate")] - Inner::Deflate(ref mut decoder) => { - match ready!(Pin::new(&mut *decoder).poll_next(cx)) { - Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))), - Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), - None => { - // poll inner connection until EOF after deflate stream is finished - poll_inner_should_be_empty( - decoder.get_mut().get_mut().get_mut().get_mut(), - cx, - ) - } - } - } - } - } - - fn size_hint(&self) -> http_body::SizeHint { - match self.inner { - Inner::PlainText(ref body) => HttpBody::size_hint(body), - // the rest are "unknown", so default - #[cfg(any( - feature = "brotli", - feature = "zstd", - feature = "gzip", - feature = "deflate" - ))] - _ => http_body::SizeHint::default(), - } - } -} - -#[cfg(any( - feature = "gzip", - feature = "zstd", - feature = "brotli", - feature = "deflate" -))] -fn poll_inner_should_be_empty( - inner: &mut PeekableIoStream, - cx: &mut Context, -) -> Poll, crate::Error>>> { - // poll inner connection until EOF after deflate stream is finished - // loop in case of empty frames - let mut inner = Pin::new(inner); - loop { - match ready!(inner.as_mut().poll_next(cx)) { - // ignore any empty frames - Some(Ok(bytes)) if bytes.is_empty() => continue, - Some(Ok(_)) => { - return Poll::Ready(Some(Err(crate::error::decode( - "there are extra bytes after body has been decompressed", - )))) - } - Some(Err(err)) => return Poll::Ready(Some(Err(crate::error::decode_io(err)))), - None => return Poll::Ready(None), - } - } -} - -#[cfg(any( - feature = "gzip", - feature = "zstd", - feature = "brotli", - feature = "deflate", - feature = "blocking", -))] -fn empty() -> ResponseBody { - use http_body_util::{combinators::BoxBody, BodyExt, Empty}; - BoxBody::new(Empty::new().map_err(|never| match never {})) -} - -#[cfg(any( - feature = "gzip", - feature = "zstd", - feature = "brotli", - feature = "deflate" -))] -impl Future for Pending { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures_util::StreamExt; - - match ready!(Pin::new(&mut self.0).poll_peek(cx)) { - Some(Ok(_)) => { - // fallthrough - } - Some(Err(_e)) => { - // error was just a ref, so we need to really poll to move it - return Poll::Ready(Err(ready!(Pin::new(&mut self.0).poll_next(cx)) - .expect("just peeked Some") - .unwrap_err())); - } - None => return Poll::Ready(Ok(Inner::PlainText(empty()))), - }; - - let _body = std::mem::replace(&mut self.0, IoStream(empty()).peekable()); - - match self.1 { - #[cfg(feature = "brotli")] - DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(Box::pin( - FramedRead::new( - BrotliDecoder::new(StreamReader::new(_body)), - BytesCodec::new(), - ) - .fuse(), - )))), - #[cfg(feature = "zstd")] - DecoderType::Zstd => Poll::Ready(Ok(Inner::Zstd(Box::pin( - FramedRead::new( - { - let mut d = ZstdDecoder::new(StreamReader::new(_body)); - d.multiple_members(true); - d - }, - BytesCodec::new(), - ) - .fuse(), - )))), - #[cfg(feature = "gzip")] - DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(Box::pin( - FramedRead::new( - GzipDecoder::new(StreamReader::new(_body)), - BytesCodec::new(), - ) - .fuse(), - )))), - #[cfg(feature = "deflate")] - DecoderType::Deflate => Poll::Ready(Ok(Inner::Deflate(Box::pin( - FramedRead::new( - ZlibDecoder::new(StreamReader::new(_body)), - BytesCodec::new(), - ) - .fuse(), - )))), - } - } -} - -#[cfg(any( - feature = "gzip", - feature = "zstd", - feature = "brotli", - feature = "deflate", - feature = "blocking", -))] -impl Stream for IoStream -where - B: HttpBody + Unpin, - B::Error: Into>, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - loop { - return match ready!(Pin::new(&mut self.0).poll_frame(cx)) { - Some(Ok(frame)) => { - // skip non-data frames - if let Ok(buf) = frame.into_data() { - Poll::Ready(Some(Ok(buf))) - } else { - continue; - } - } - Some(Err(err)) => Poll::Ready(Some(Err(crate::error::into_io(err.into())))), - None => Poll::Ready(None), - }; - } - } -} - -// ===== impl Accepts ===== - -impl Accepts { - /* - pub(super) fn none() -> Self { - Accepts { - #[cfg(feature = "gzip")] - gzip: false, - #[cfg(feature = "brotli")] - brotli: false, - #[cfg(feature = "zstd")] - zstd: false, - #[cfg(feature = "deflate")] - deflate: false, - } - } - */ - - pub(super) const fn as_str(&self) -> Option<&'static str> { - match ( - self.is_gzip(), - self.is_brotli(), - self.is_zstd(), - self.is_deflate(), - ) { - (true, true, true, true) => Some("gzip, br, zstd, deflate"), - (true, true, false, true) => Some("gzip, br, deflate"), - (true, true, true, false) => Some("gzip, br, zstd"), - (true, true, false, false) => Some("gzip, br"), - (true, false, true, true) => Some("gzip, zstd, deflate"), - (true, false, false, true) => Some("gzip, deflate"), - (false, true, true, true) => Some("br, zstd, deflate"), - (false, true, false, true) => Some("br, deflate"), - (true, false, true, false) => Some("gzip, zstd"), - (true, false, false, false) => Some("gzip"), - (false, true, true, false) => Some("br, zstd"), - (false, true, false, false) => Some("br"), - (false, false, true, true) => Some("zstd, deflate"), - (false, false, true, false) => Some("zstd"), - (false, false, false, true) => Some("deflate"), - (false, false, false, false) => None, - } - } - - const fn is_gzip(&self) -> bool { - #[cfg(feature = "gzip")] - { - self.gzip - } - - #[cfg(not(feature = "gzip"))] - { - false - } - } - - const fn is_brotli(&self) -> bool { - #[cfg(feature = "brotli")] - { - self.brotli - } - - #[cfg(not(feature = "brotli"))] - { - false - } - } - - const fn is_zstd(&self) -> bool { - #[cfg(feature = "zstd")] - { - self.zstd - } - - #[cfg(not(feature = "zstd"))] - { - false - } - } - - const fn is_deflate(&self) -> bool { - #[cfg(feature = "deflate")] - { - self.deflate - } - - #[cfg(not(feature = "deflate"))] - { - false - } - } -} - -impl Default for Accepts { - fn default() -> Accepts { - Accepts { - #[cfg(feature = "gzip")] - gzip: true, - #[cfg(feature = "brotli")] - brotli: true, - #[cfg(feature = "zstd")] - zstd: true, - #[cfg(feature = "deflate")] - deflate: true, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn accepts_as_str() { - fn format_accept_encoding(accepts: &Accepts) -> String { - let mut encodings = vec![]; - if accepts.is_gzip() { - encodings.push("gzip"); - } - if accepts.is_brotli() { - encodings.push("br"); - } - if accepts.is_zstd() { - encodings.push("zstd"); - } - if accepts.is_deflate() { - encodings.push("deflate"); - } - encodings.join(", ") - } - - let state = [true, false]; - let mut permutations = Vec::new(); - - #[allow(unused_variables)] - for gzip in state { - for brotli in state { - for zstd in state { - for deflate in state { - permutations.push(Accepts { - #[cfg(feature = "gzip")] - gzip, - #[cfg(feature = "brotli")] - brotli, - #[cfg(feature = "zstd")] - zstd, - #[cfg(feature = "deflate")] - deflate, - }); - } - } - } - } - - for accepts in permutations { - let expected = format_accept_encoding(&accepts); - let got = accepts.as_str().unwrap_or(""); - assert_eq!(got, expected.as_str()); - } - } -} diff --git a/src/async_impl/mod.rs b/src/async_impl/mod.rs index 5d99ef027..d0de89de5 100644 --- a/src/async_impl/mod.rs +++ b/src/async_impl/mod.rs @@ -4,12 +4,8 @@ pub use self::request::{Request, RequestBuilder}; pub use self::response::Response; pub use self::upgrade::Upgraded; -#[cfg(feature = "blocking")] -pub(crate) use self::decoder::Decoder; - pub mod body; pub mod client; -pub mod decoder; pub mod h3_client; #[cfg(feature = "multipart")] pub mod multipart; diff --git a/src/async_impl/response.rs b/src/async_impl/response.rs index 4c0d52727..a3b05658e 100644 --- a/src/async_impl/response.rs +++ b/src/async_impl/response.rs @@ -15,7 +15,6 @@ use tokio::time::Sleep; use url::Url; use super::body::Body; -use super::decoder::{Accepts, Decoder}; use crate::async_impl::body::ResponseBody; #[cfg(feature = "cookies")] use crate::cookie; @@ -27,7 +26,7 @@ use mime::Mime; /// A Response to a submitted `Request`. pub struct Response { - pub(super) res: hyper::Response, + pub(super) res: hyper::Response, // Boxed to save space (11 words to 1 word), and it's not accessed // frequently internally. url: Box, @@ -37,17 +36,14 @@ impl Response { pub(super) fn new( res: hyper::Response, url: Url, - accepts: Accepts, total_timeout: Option>>, read_timeout: Option, ) -> Response { - let (mut parts, body) = res.into_parts(); - let decoder = Decoder::detect( - &mut parts.headers, + let (parts, body) = res.into_parts(); + let res = hyper::Response::from_parts( + parts, super::body::response(body, total_timeout, read_timeout), - accepts, ); - let res = hyper::Response::from_parts(parts, decoder); Response { res, @@ -297,6 +293,7 @@ impl Response { BodyExt::collect(self.res.into_body()) .await .map(|buf| buf.to_bytes()) + .map_err(crate::error::decode) } /// Stream a chunk of the response body. @@ -321,7 +318,7 @@ impl Response { // loop to ignore unrecognized frames loop { if let Some(res) = self.res.body_mut().frame().await { - let frame = res?; + let frame = res.map_err(crate::error::decode)?; if let Ok(buf) = frame.into_data() { return Ok(Some(buf)); } @@ -357,7 +354,7 @@ impl Response { #[cfg(feature = "stream")] #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] pub fn bytes_stream(self) -> impl futures_core::Stream> { - super::body::DataStream(self.res.into_body()) + super::body::DataStream(self.res.into_body().map_err(crate::error::decode)) } // util methods @@ -432,7 +429,7 @@ impl Response { // // This method is just used by the blocking API. #[cfg(feature = "blocking")] - pub(crate) fn body_mut(&mut self) -> &mut Decoder { + pub(crate) fn body_mut(&mut self) -> &mut ResponseBody { self.res.body_mut() } } @@ -462,17 +459,12 @@ impl> From> for Response { let (mut parts, body) = r.into_parts(); let body: crate::async_impl::body::Body = body.into(); - let decoder = Decoder::detect( - &mut parts.headers, - ResponseBody::new(body.map_err(Into::into)), - Accepts::none(), - ); let url = parts .extensions .remove::() .unwrap_or_else(|| ResponseUrl(Url::parse("http://no.url.provided.local").unwrap())); let url = url.0; - let res = hyper::Response::from_parts(parts, decoder); + let res = hyper::Response::from_parts(parts, ResponseBody::new(body.map_err(Into::into))); Response { res, url: Box::new(url), diff --git a/src/blocking/response.rs b/src/blocking/response.rs index 86c81772c..801bba190 100644 --- a/src/blocking/response.rs +++ b/src/blocking/response.rs @@ -6,6 +6,7 @@ use std::pin::Pin; use std::time::Duration; use bytes::Bytes; +use futures_util::TryStreamExt; use http; use hyper::header::HeaderMap; #[cfg(feature = "json")] @@ -413,13 +414,18 @@ impl Response { // private fn body_mut(&mut self) -> Pin<&mut dyn futures_util::io::AsyncRead> { - use futures_util::TryStreamExt; if self.body.is_none() { - let body = mem::replace(self.inner.body_mut(), async_impl::Decoder::empty()); + let body = mem::replace( + self.inner.body_mut(), + async_impl::body::boxed(http_body_util::Empty::new()), + ); - let body = body.into_stream().into_async_read(); - - self.body = Some(Box::pin(body)); + self.body = Some(Box::pin( + async_impl::body::Body::wrap(body) + .into_stream() + .map_err(crate::error::Error::into_io) + .into_async_read(), + )); } self.body.as_mut().expect("body was init").as_mut() } diff --git a/src/error.rs b/src/error.rs index 0a4d5630f..0264602a3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -359,17 +359,6 @@ pub(crate) fn upgrade>(e: E) -> Error { // io::Error helpers -#[cfg(any( - feature = "gzip", - feature = "zstd", - feature = "brotli", - feature = "deflate", - feature = "blocking", -))] -pub(crate) fn into_io(e: BoxError) -> io::Error { - io::Error::new(io::ErrorKind::Other, e) -} - #[allow(unused)] pub(crate) fn decode_io(e: io::Error) -> Error { if e.get_ref().map(|r| r.is::()).unwrap_or(false) { diff --git a/tests/brotli.rs b/tests/brotli.rs index ba116ed92..c6997894d 100644 --- a/tests/brotli.rs +++ b/tests/brotli.rs @@ -301,6 +301,7 @@ async fn test_chunked_fragmented_response_2() { assert!(start.elapsed() >= DELAY_BETWEEN_RESPONSE_PARTS - DELAY_MARGIN); } +// TODO: figure out how apply fix from https://github.com/seanmonstar/reqwest/pull/2484 #[tokio::test] async fn test_chunked_fragmented_response_with_extra_bytes() { const DELAY_BETWEEN_RESPONSE_PARTS: tokio::time::Duration = diff --git a/tests/deflate.rs b/tests/deflate.rs index 147c36307..fb72673d8 100644 --- a/tests/deflate.rs +++ b/tests/deflate.rs @@ -302,6 +302,7 @@ async fn test_chunked_fragmented_response_2() { assert!(start.elapsed() >= DELAY_BETWEEN_RESPONSE_PARTS - DELAY_MARGIN); } +// TODO: figure out how apply fix from https://github.com/seanmonstar/reqwest/pull/2484 #[tokio::test] async fn test_chunked_fragmented_response_with_extra_bytes() { const DELAY_BETWEEN_RESPONSE_PARTS: tokio::time::Duration = diff --git a/tests/gzip.rs b/tests/gzip.rs index 1028ebfad..809e1ef76 100644 --- a/tests/gzip.rs +++ b/tests/gzip.rs @@ -302,6 +302,7 @@ async fn test_chunked_fragmented_response_2() { assert!(start.elapsed() >= DELAY_BETWEEN_RESPONSE_PARTS - DELAY_MARGIN); } +// TODO: figure out how apply fix from https://github.com/seanmonstar/reqwest/pull/2484 #[tokio::test] async fn test_chunked_fragmented_response_with_extra_bytes() { const DELAY_BETWEEN_RESPONSE_PARTS: Duration = Duration::from_millis(1000); diff --git a/tests/zstd.rs b/tests/zstd.rs index be463094f..bb3c113ec 100644 --- a/tests/zstd.rs +++ b/tests/zstd.rs @@ -512,6 +512,7 @@ async fn test_chunked_fragmented_response_2() { assert!(start.elapsed() >= DELAY_BETWEEN_RESPONSE_PARTS - DELAY_MARGIN); } +// TODO: figure out how apply fix from https://github.com/seanmonstar/reqwest/pull/2484 #[tokio::test] async fn test_chunked_fragmented_response_with_extra_bytes() { const DELAY_BETWEEN_RESPONSE_PARTS: tokio::time::Duration =