Skip to content

Commit 2f95afa

Browse files
committed
refactor: use decompression from http-tower
1 parent b126ca4 commit 2f95afa

File tree

12 files changed

+124
-811
lines changed

12 files changed

+124
-811
lines changed

Cargo.toml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,13 @@ charset = ["dep:encoding_rs", "dep:mime"]
6464

6565
cookies = ["dep:cookie_crate", "dep:cookie_store"]
6666

67-
gzip = ["dep:async-compression", "async-compression?/gzip", "dep:futures-util", "dep:tokio-util"]
67+
gzip = ["tower-http/decompression-gzip"]
6868

69-
brotli = ["dep:async-compression", "async-compression?/brotli", "dep:futures-util", "dep:tokio-util"]
69+
brotli = ["tower-http/decompression-br"]
7070

71-
zstd = ["dep:async-compression", "async-compression?/zstd", "dep:futures-util", "dep:tokio-util"]
71+
zstd = ["tower-http/decompression-zstd"]
7272

73-
deflate = ["dep:async-compression", "async-compression?/zlib", "dep:futures-util", "dep:tokio-util"]
73+
deflate = ["tower-http/decompression-deflate"]
7474

7575
json = ["dep:serde_json"]
7676

@@ -159,7 +159,6 @@ cookie_crate = { version = "0.18.0", package = "cookie", optional = true }
159159
cookie_store = { version = "0.21.0", optional = true }
160160

161161
## compression
162-
async-compression = { version = "0.4.0", default-features = false, features = ["tokio"], optional = true }
163162
tokio-util = { version = "0.7.9", default-features = false, features = ["codec", "io"], optional = true }
164163

165164
## hickory-dns

src/async_impl/body.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ pin_project! {
4747
}
4848

4949
/// Converts any `impl Body` into a `impl Stream` of just its DATA frames.
50-
#[cfg(any(feature = "stream", feature = "multipart",))]
50+
#[cfg(any(feature = "stream", feature = "multipart", feature = "blocking"))]
5151
pub(crate) struct DataStream<B>(pub(crate) B);
5252

5353
impl Body {
@@ -161,7 +161,7 @@ impl Body {
161161
}
162162
}
163163

164-
#[cfg(feature = "multipart")]
164+
#[cfg(any(feature = "multipart", feature = "blocking"))]
165165
pub(crate) fn into_stream(self) -> DataStream<Body> {
166166
DataStream(self)
167167
}
@@ -423,7 +423,7 @@ where
423423

424424
// ===== impl DataStream =====
425425

426-
#[cfg(any(feature = "stream", feature = "multipart",))]
426+
#[cfg(any(feature = "stream", feature = "multipart", feature = "blocking",))]
427427
impl<B> futures_core::Stream for DataStream<B>
428428
where
429429
B: HttpBody<Data = Bytes> + Unpin,

src/async_impl/client.rs

Lines changed: 93 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use std::time::Duration;
99
use std::{collections::HashMap, convert::TryInto, net::SocketAddr};
1010
use std::{fmt, str};
1111

12-
use super::decoder::Accepts;
1312
use super::request::{Request, RequestBuilder};
1413
use super::response::Response;
1514
use super::Body;
@@ -45,9 +44,7 @@ use crate::Certificate;
4544
use crate::Identity;
4645
use crate::{IntoUrl, Method, Proxy, Url};
4746

48-
use http::header::{
49-
Entry, HeaderMap, HeaderValue, ACCEPT, ACCEPT_ENCODING, PROXY_AUTHORIZATION, RANGE, USER_AGENT,
50-
};
47+
use http::header::{Entry, HeaderMap, HeaderValue, ACCEPT, PROXY_AUTHORIZATION, USER_AGENT};
5148
use http::uri::Scheme;
5249
use http::Uri;
5350
use hyper_util::client::legacy::connect::HttpConnector;
@@ -61,6 +58,13 @@ use quinn::VarInt;
6158
use tokio::time::Sleep;
6259
use tower::util::BoxCloneSyncServiceLayer;
6360
use tower::{Layer, Service};
61+
#[cfg(any(
62+
feature = "gzip",
63+
feature = "brotli",
64+
feature = "zstd",
65+
feature = "deflate"
66+
))]
67+
use tower_http::decompression::Decompression;
6468
use tower_http::follow_redirect::FollowRedirect;
6569

6670
/// An asynchronous `Client` to make Requests with.
@@ -96,6 +100,33 @@ enum HttpVersionPref {
96100
All,
97101
}
98102

103+
#[derive(Clone, Copy, Debug)]
104+
struct Accepts {
105+
#[cfg(feature = "gzip")]
106+
gzip: bool,
107+
#[cfg(feature = "brotli")]
108+
brotli: bool,
109+
#[cfg(feature = "zstd")]
110+
zstd: bool,
111+
#[cfg(feature = "deflate")]
112+
deflate: bool,
113+
}
114+
115+
impl Default for Accepts {
116+
fn default() -> Accepts {
117+
Accepts {
118+
#[cfg(feature = "gzip")]
119+
gzip: true,
120+
#[cfg(feature = "brotli")]
121+
brotli: true,
122+
#[cfg(feature = "zstd")]
123+
zstd: true,
124+
#[cfg(feature = "deflate")]
125+
deflate: true,
126+
}
127+
}
128+
}
129+
99130
#[derive(Clone)]
100131
struct HyperService {
101132
hyper: HyperClient,
@@ -978,6 +1009,21 @@ impl ClientBuilder {
9781009
#[cfg(feature = "cookies")]
9791010
let svc = CookieService::new(svc, config.cookie_store.clone());
9801011
let hyper = FollowRedirect::with_policy(svc, redirect_policy.clone());
1012+
#[cfg(any(
1013+
feature = "gzip",
1014+
feature = "brotli",
1015+
feature = "zstd",
1016+
feature = "deflate"
1017+
))]
1018+
let hyper = Decompression::new(hyper);
1019+
#[cfg(feature = "gzip")]
1020+
let hyper = hyper.gzip(config.accepts.gzip);
1021+
#[cfg(feature = "brotli")]
1022+
let hyper = hyper.br(config.accepts.brotli);
1023+
#[cfg(feature = "zstd")]
1024+
let hyper = hyper.zstd(config.accepts.zstd);
1025+
#[cfg(feature = "deflate")]
1026+
let hyper = hyper.deflate(config.accepts.deflate);
9811027

9821028
Ok(Client {
9831029
inner: Arc::new(ClientRef {
@@ -993,7 +1039,23 @@ impl ClientBuilder {
9931039
let svc = tower::retry::Retry::new(retry_policy, h3_service);
9941040
#[cfg(feature = "cookies")]
9951041
let svc = CookieService::new(svc, config.cookie_store);
996-
Some(FollowRedirect::with_policy(svc, redirect_policy))
1042+
let svc = FollowRedirect::with_policy(svc, redirect_policy);
1043+
#[cfg(any(
1044+
feature = "gzip",
1045+
feature = "brotli",
1046+
feature = "zstd",
1047+
feature = "deflate"
1048+
))]
1049+
let svc = Decompression::new(svc);
1050+
#[cfg(feature = "gzip")]
1051+
let svc = svc.gzip(config.accepts.gzip);
1052+
#[cfg(feature = "brotli")]
1053+
let svc = svc.br(config.accepts.brotli);
1054+
#[cfg(feature = "zstd")]
1055+
let svc = svc.zstd(config.accepts.zstd);
1056+
#[cfg(feature = "deflate")]
1057+
let svc = svc.deflate(config.accepts.deflate);
1058+
Some(svc)
9971059
}
9981060
None => None,
9991061
},
@@ -2484,14 +2546,6 @@ impl Client {
24842546
}
24852547
}
24862548

2487-
let accept_encoding = self.inner.accepts.as_str();
2488-
2489-
if let Some(accept_encoding) = accept_encoding {
2490-
if !headers.contains_key(ACCEPT_ENCODING) && !headers.contains_key(RANGE) {
2491-
headers.insert(ACCEPT_ENCODING, HeaderValue::from_static(accept_encoding));
2492-
}
2493-
}
2494-
24952549
let uri = match try_uri(&url) {
24962550
Ok(uri) => uri,
24972551
_ => return Pending::new_err(error::url_invalid_uri(url)),
@@ -2776,12 +2830,32 @@ impl Config {
27762830
}
27772831

27782832
#[cfg(not(feature = "cookies"))]
2779-
type LayeredService<T> =
2780-
FollowRedirect<tower::retry::Retry<crate::retry::Policy, T>, TowerRedirectPolicy>;
2833+
type MaybeCookieService<T> = T;
2834+
27812835
#[cfg(feature = "cookies")]
2782-
type LayeredService<T> = FollowRedirect<
2783-
CookieService<tower::retry::Retry<crate::retry::Policy, T>>,
2784-
TowerRedirectPolicy,
2836+
type MaybeCookieService<T> = CookieService<T>;
2837+
2838+
#[cfg(not(any(
2839+
feature = "gzip",
2840+
feature = "brotli",
2841+
feature = "zstd",
2842+
feature = "deflate"
2843+
)))]
2844+
type MaybeDecompression<T> = T;
2845+
2846+
#[cfg(any(
2847+
feature = "gzip",
2848+
feature = "brotli",
2849+
feature = "zstd",
2850+
feature = "deflate"
2851+
))]
2852+
type MaybeDecompression<T> = Decompression<T>;
2853+
2854+
type LayeredService<T> = MaybeDecompression<
2855+
FollowRedirect<
2856+
MaybeCookieService<tower::retry::Retry<crate::retry::Policy, T>>,
2857+
TowerRedirectPolicy,
2858+
>,
27852859
>;
27862860
type LayeredFuture<T> = <LayeredService<T> as Service<http::Request<Body>>>::Future;
27872861

@@ -2947,7 +3021,7 @@ impl Future for PendingRequest {
29473021
Err(e) => {
29483022
return Poll::Ready(Err(crate::error::request(e).with_url(self.url.clone())));
29493023
}
2950-
Ok(res) => res,
3024+
Ok(res) => res.map(super::body::boxed),
29513025
},
29523026
};
29533027

@@ -2964,7 +3038,6 @@ impl Future for PendingRequest {
29643038
let res = Response::new(
29653039
res,
29663040
self.url.clone(),
2967-
self.client.accepts,
29683041
self.total_timeout.take(),
29693042
self.read_timeout,
29703043
);

0 commit comments

Comments
 (0)