Skip to content

refactor: reduce dependency on futures-util #167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions src/client/legacy/connect/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::pin::Pin;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{self, Poll};
use std::time::Duration;
Expand Down Expand Up @@ -968,14 +968,11 @@ impl ConnectingTcp<'_> {
match self.fallback {
None => self.preferred.connect(self.config).await,
Some(mut fallback) => {
let preferred_fut = self.preferred.connect(self.config);
futures_util::pin_mut!(preferred_fut);
let preferred_fut = pin!(self.preferred.connect(self.config));

let fallback_fut = fallback.remote.connect(self.config);
futures_util::pin_mut!(fallback_fut);
let fallback_fut = pin!(fallback.remote.connect(self.config));

let fallback_delay = fallback.delay;
futures_util::pin_mut!(fallback_delay);
let fallback_delay = pin!(fallback.delay);

let (result, future) =
match futures_util::future::select(preferred_fut, fallback_delay).await {
Expand Down
3 changes: 1 addition & 2 deletions src/client/legacy/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ use std::hash::Hash;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::{Arc, Mutex, Weak};
use std::task::{self, Poll};
use std::task::{self, ready, Poll};

use std::time::{Duration, Instant};

use futures_channel::oneshot;
use futures_core::ready;
use tracing::{debug, trace};

use hyper::rt::Sleep;
Expand Down
3 changes: 1 addition & 2 deletions src/server/conn/auto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use std::future::Future;
use std::marker::PhantomPinned;
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
use std::{error::Error as StdError, io, time::Duration};

use bytes::Bytes;
use futures_core::ready;
use http::{Request, Response};
use http_body::Body;
use hyper::{
Expand Down
3 changes: 1 addition & 2 deletions src/service/oneshot.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use futures_core::ready;
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
use tower_service::Service;

// Vendored from tower::util to reduce dependencies, the code is small enough.
Expand Down
20 changes: 7 additions & 13 deletions tests/legacy_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod test_utils;

use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener};
use std::pin::Pin;
use std::pin::{pin, Pin};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Poll;
Expand Down Expand Up @@ -143,8 +143,7 @@ async fn drop_client_closes_idle_connections() {
drop(client);

// and wait a few ticks for the connections to close
let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
futures_util::pin_mut!(t);
let t = pin!(tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")));
let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
future::select(t, close).await;
t1.await.unwrap();
Expand Down Expand Up @@ -193,8 +192,7 @@ async fn drop_response_future_closes_in_progress_connection() {
future::select(res, rx1).await;

// res now dropped
let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
futures_util::pin_mut!(t);
let t = pin!(tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")));
let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
future::select(t, close).await;
}
Expand Down Expand Up @@ -249,8 +247,7 @@ async fn drop_response_body_closes_in_progress_connection() {
res.unwrap();

// and wait a few ticks to see the connection drop
let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
futures_util::pin_mut!(t);
let t = pin!(tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")));
let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
future::select(t, close).await;
}
Expand Down Expand Up @@ -302,8 +299,7 @@ async fn no_keep_alive_closes_connection() {
let (res, _) = future::join(res, rx).await;
res.unwrap();

let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
futures_util::pin_mut!(t);
let t = pin!(tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")));
let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
future::select(close, t).await;
}
Expand Down Expand Up @@ -349,8 +345,7 @@ async fn socket_disconnect_closes_idle_conn() {
let (res, _) = future::join(res, rx).await;
res.unwrap();

let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
futures_util::pin_mut!(t);
let t = pin!(tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")));
let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
future::select(t, close).await;
}
Expand Down Expand Up @@ -572,8 +567,7 @@ async fn client_keep_alive_when_response_before_request_body_ends() {
assert_eq!(connects.load(Ordering::Relaxed), 1);

drop(client);
let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
futures_util::pin_mut!(t);
let t = pin!(tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")));
let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
future::select(t, close).await;
}
Expand Down
4 changes: 2 additions & 2 deletions tests/test_utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

use futures_channel::mpsc;
use futures_util::task::{Context, Poll};
use futures_util::Future;
use futures_util::TryFutureExt;
use hyper::Uri;
use tokio::io::{self, AsyncRead, AsyncWrite, ReadBuf};
Expand Down
Loading