Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions src/client/legacy/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ struct IdlePopper<'a, T, K> {
}

impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> {
fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
fn pop(self, expiration: &Expiration, now: Instant) -> Option<Idle<T>> {
while let Some(entry) = self.list.pop() {
// If the connection has been closed, or is older than our idle
// timeout, simply drop it and keep looking...
Expand All @@ -312,7 +312,7 @@ impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> {
//
// In that case, we could just break out of the loop and drop the
// whole list...
if expiration.expires(entry.idle_at) {
if expiration.expires(entry.idle_at, now) {
trace!("removing expired connection for {:?}", self.key);
continue;
}
Expand All @@ -321,7 +321,7 @@ impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> {
#[cfg(feature = "http2")]
Reservation::Shared(to_reinsert, to_checkout) => {
self.list.push(Idle {
idle_at: Instant::now(),
idle_at: now,
value: to_reinsert,
});
to_checkout
Expand All @@ -340,6 +340,12 @@ impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> {
}

impl<T: Poolable, K: Key> PoolInner<T, K> {
fn now(&self) -> Instant {
self.timer
.as_ref()
.map_or_else(|| Instant::now(), |t| t.now())
}

fn put(&mut self, key: K, value: T, __pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
if value.can_share() && self.idle.contains_key(&key) {
trace!("put; existing idle HTTP/2 connection for {:?}", key);
Expand Down Expand Up @@ -386,6 +392,7 @@ impl<T: Poolable, K: Key> PoolInner<T, K> {
Some(value) => {
// borrow-check scope...
{
let now = self.now();
let idle_list = self.idle.entry(key.clone()).or_default();
if self.max_idle_per_host <= idle_list.len() {
trace!("max idle per host for {:?}, dropping connection", key);
Expand All @@ -395,7 +402,7 @@ impl<T: Poolable, K: Key> PoolInner<T, K> {
debug!("pooling idle connection for {:?}", key);
idle_list.push(Idle {
value,
idle_at: Instant::now(),
idle_at: now,
});
}

Expand Down Expand Up @@ -477,7 +484,7 @@ impl<T: Poolable, K: Key> PoolInner<T, K> {
fn clear_expired(&mut self) {
let dur = self.timeout.expect("interval assumes timeout");

let now = Instant::now();
let now = self.now();
//self.last_idle_check_at = now;

self.idle.retain(|key, values| {
Expand Down Expand Up @@ -649,6 +656,7 @@ impl<T: Poolable, K: Key> Checkout<T, K> {
let entry = {
let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
let expiration = Expiration::new(inner.timeout);
let now = inner.now();
let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
// A block to end the mutable borrow on list,
Expand All @@ -658,7 +666,7 @@ impl<T: Poolable, K: Key> Checkout<T, K> {
key: &self.key,
list,
};
popper.pop(&expiration)
popper.pop(&expiration, now)
}
.map(|e| (e, list.is_empty()))
});
Expand Down Expand Up @@ -762,10 +770,10 @@ impl Expiration {
Expiration(dur)
}

fn expires(&self, instant: Instant) -> bool {
fn expires(&self, instant: Instant, now: Instant) -> bool {
match self.0 {
// Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470.
Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout,
Some(timeout) => now.saturating_duration_since(instant) > timeout,
None => false,
}
}
Expand All @@ -785,7 +793,7 @@ impl<T: Poolable + 'static, K: Key> IdleTask<T, K> {
async fn run(self) {
use futures_util::future;

let mut sleep = self.timer.sleep_until(Instant::now() + self.duration);
let mut sleep = self.timer.sleep_until(self.timer.now() + self.duration);
let mut on_pool_drop = self.pool_drop_notifier;
loop {
match future::select(&mut on_pool_drop, &mut sleep).await {
Expand All @@ -801,7 +809,7 @@ impl<T: Poolable + 'static, K: Key> IdleTask<T, K> {
}
}

let deadline = Instant::now() + self.duration;
let deadline = self.timer.now() + self.duration;
self.timer.reset(&mut sleep, deadline);
}
}
Expand Down Expand Up @@ -976,7 +984,16 @@ mod tests {
}

#[tokio::test]
async fn test_pool_timer_removes_expired() {
async fn test_pool_timer_removes_expired_realtime() {
test_pool_timer_removes_expired_inner().await
}

#[tokio::test(start_paused = true)]
async fn test_pool_timer_removes_expired_faketime() {
test_pool_timer_removes_expired_inner().await
}

async fn test_pool_timer_removes_expired_inner() {
let pool = Pool::new(
super::Config {
idle_timeout: Some(Duration::from_millis(10)),
Expand Down
4 changes: 4 additions & 0 deletions src/common/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ impl hyper::rt::Timer for Timer {
fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
self.0.sleep_until(deadline)
}

fn now(&self) -> Instant {
self.0.now()
}
}
4 changes: 4 additions & 0 deletions src/rt/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,10 @@ impl Timer for TokioTimer {
sleep.reset(new_deadline)
}
}

fn now(&self) -> Instant {
tokio::time::Instant::now().into()
}
}

impl TokioTimer {
Expand Down