diff --git a/futures-util/benches/bilock.rs b/futures-util/benches/bilock.rs new file mode 100644 index 0000000000..02fe0a7cd5 --- /dev/null +++ b/futures-util/benches/bilock.rs @@ -0,0 +1,108 @@ +#![feature(test)] + +#[cfg(feature = "bilock")] +mod bench { + use futures::executor::block_on; + use futures::task::Poll; + use futures_test::task::noop_context; + use futures_util::lock::BiLock; + + use std::mem::drop; + extern crate test; + use test::Bencher; + + #[bench] + fn contended(b: &mut Bencher) { + let mut ctx = noop_context(); + + b.iter(|| { + let (mut x, mut y) = BiLock::new(1); + + for _ in 0..1000 { + let x_guard = match x.poll_lock(&mut ctx) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + // Try poll second lock while first lock still holds the lock + match y.poll_lock(&mut ctx) { + Poll::Pending => (), + _ => panic!(), + }; + + drop(x_guard); + + let y_guard = match y.poll_lock(&mut ctx) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + drop(y_guard); + } + (x, y) + }); + } + + #[bench] + fn lock_unlock(b: &mut Bencher) { + let mut ctx = noop_context(); + + b.iter(|| { + let (mut x, mut y) = BiLock::new(1); + + for _ in 0..1000 { + let x_guard = match x.poll_lock(&mut ctx) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + drop(x_guard); + + let y_guard = match y.poll_lock(&mut ctx) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + drop(y_guard); + } + (x, y) + }) + } + + #[bench] + fn concurrent(b: &mut Bencher) { + use std::thread; + + b.iter(|| { + let (mut x, mut y) = BiLock::new(false); + const ITERATION_COUNT: usize = 1000; + + let a = thread::spawn(move || { + let mut count = 0; + while count < ITERATION_COUNT { + let mut guard = block_on(x.lock()); + if *guard { + *guard = false; + count += 1; + } + x = guard.unlock(); + } + }); + + let b = thread::spawn(move || { + let mut count = 0; + while count < ITERATION_COUNT { + let mut guard = block_on(y.lock()); + if !*guard { + *guard = true; + count += 1; + } + y = guard.unlock(); + } + }); + + a.join().unwrap(); + b.join().unwrap(); + }) + } +} diff --git a/futures-util/benches_disabled/bilock.rs b/futures-util/benches_disabled/bilock.rs deleted file mode 100644 index 48afe3c551..0000000000 --- a/futures-util/benches_disabled/bilock.rs +++ /dev/null @@ -1,126 +0,0 @@ -#![feature(test)] - -#[cfg(feature = "bilock")] -mod bench { -use futures::task::{Context, Waker}; -use futures::executor::LocalPool; -use futures_util::lock::BiLock; -use futures_util::lock::BiLockAcquire; -use futures_util::lock::BiLockAcquired; -use futures_util::task::ArcWake; - -use std::sync::Arc; -use test::Bencher; - -fn notify_noop() -> Waker { - struct Noop; - - impl ArcWake for Noop { - fn wake(_: &Arc) {} - } - - ArcWake::into_waker(Arc::new(Noop)) -} - - -/// Pseudo-stream which simply calls `lock.poll()` on `poll` -struct LockStream { - lock: BiLockAcquire, -} - -impl LockStream { - fn new(lock: BiLock) -> Self { - Self { - lock: lock.lock() - } - } - - /// Release a lock after it was acquired in `poll`, - /// so `poll` could be called again. - fn release_lock(&mut self, guard: BiLockAcquired) { - self.lock = guard.unlock().lock() - } -} - -impl Stream for LockStream { - type Item = BiLockAcquired; - type Error = (); - - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll, Self::Error> { - self.lock.poll(cx).map(|a| a.map(Some)) - } -} - - -#[bench] -fn contended(b: &mut Bencher) { - let pool = LocalPool::new(); - let mut exec = pool.executor(); - let waker = notify_noop(); - let mut map = task::LocalMap::new(); - let mut waker = task::Context::new(&mut map, &waker, &mut exec); - - b.iter(|| { - let (x, y) = BiLock::new(1); - - let mut x = LockStream::new(x); - let mut y = LockStream::new(y); - - for _ in 0..1000 { - let x_guard = match x.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - // Try poll second lock while first lock still holds the lock - match y.poll_next(&mut waker) { - Ok(Poll::Pending) => (), - _ => panic!(), - }; - - x.release_lock(x_guard); - - let y_guard = match y.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - y.release_lock(y_guard); - } - (x, y) - }); -} - -#[bench] -fn lock_unlock(b: &mut Bencher) { - let pool = LocalPool::new(); - let mut exec = pool.executor(); - let waker = notify_noop(); - let mut map = task::LocalMap::new(); - let mut waker = task::Context::new(&mut map, &waker, &mut exec); - - b.iter(|| { - let (x, y) = BiLock::new(1); - - let mut x = LockStream::new(x); - let mut y = LockStream::new(y); - - for _ in 0..1000 { - let x_guard = match x.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - x.release_lock(x_guard); - - let y_guard = match y.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - y.release_lock(y_guard); - } - (x, y) - }) -} -} diff --git a/futures-util/src/io/split.rs b/futures-util/src/io/split.rs index 3f1b9af456..093e4406f6 100644 --- a/futures-util/src/io/split.rs +++ b/futures-util/src/io/split.rs @@ -18,7 +18,7 @@ pub struct WriteHalf { handle: BiLock, } -fn lock_and_then(lock: &BiLock, cx: &mut Context<'_>, f: F) -> Poll> +fn lock_and_then(lock: &mut BiLock, cx: &mut Context<'_>, f: F) -> Poll> where F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll>, { @@ -53,45 +53,45 @@ impl WriteHalf { impl AsyncRead for ReadHalf { fn poll_read( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - lock_and_then(&self.handle, cx, |l, cx| l.poll_read(cx, buf)) + lock_and_then(&mut self.handle, cx, |l, cx| l.poll_read(cx, buf)) } fn poll_read_vectored( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll> { - lock_and_then(&self.handle, cx, |l, cx| l.poll_read_vectored(cx, bufs)) + lock_and_then(&mut self.handle, cx, |l, cx| l.poll_read_vectored(cx, bufs)) } } impl AsyncWrite for WriteHalf { fn poll_write( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - lock_and_then(&self.handle, cx, |l, cx| l.poll_write(cx, buf)) + lock_and_then(&mut self.handle, cx, |l, cx| l.poll_write(cx, buf)) } fn poll_write_vectored( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { - lock_and_then(&self.handle, cx, |l, cx| l.poll_write_vectored(cx, bufs)) + lock_and_then(&mut self.handle, cx, |l, cx| l.poll_write_vectored(cx, bufs)) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - lock_and_then(&self.handle, cx, |l, cx| l.poll_flush(cx)) + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + lock_and_then(&mut self.handle, cx, |l, cx| l.poll_flush(cx)) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - lock_and_then(&self.handle, cx, |l, cx| l.poll_close(cx)) + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + lock_and_then(&mut self.handle, cx, |l, cx| l.poll_close(cx)) } } diff --git a/futures-util/src/lock/bilock.rs b/futures-util/src/lock/bilock.rs index 600e16e421..3f98f19a98 100644 --- a/futures-util/src/lock/bilock.rs +++ b/futures-util/src/lock/bilock.rs @@ -1,16 +1,19 @@ //! Futures-powered synchronization primitives. -#[cfg(feature = "bilock")] -use futures_core::future::Future; -use futures_core::task::{Context, Poll, Waker}; +use alloc::sync::Arc; use core::cell::UnsafeCell; use core::fmt; +#[cfg(feature = "bilock")] +use core::mem; use core::ops::{Deref, DerefMut}; use core::pin::Pin; -use core::sync::atomic::AtomicUsize; -use core::sync::atomic::Ordering::SeqCst; -use alloc::boxed::Box; -use alloc::sync::Arc; +#[cfg(feature = "bilock")] +use core::ptr; +use core::sync::atomic::AtomicU8; +use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed}; +#[cfg(feature = "bilock")] +use futures_core::future::Future; +use futures_core::task::{Context, Poll, Waker}; /// A type of futures-powered synchronization primitive which is a mutex between /// two possible owners. @@ -33,18 +36,68 @@ use alloc::sync::Arc; /// /// This type is only available when the `bilock` feature of this /// library is activated. -#[derive(Debug)] #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] pub struct BiLock { arc: Arc>, + token: u8, + left: bool, +} + +impl fmt::Debug for BiLock { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BiLock").field("arc", &self.arc).field("token", &self.token).finish() + } } -#[derive(Debug)] struct Inner { - state: AtomicUsize, + token: AtomicU8, + waker: UnsafeCell>, value: Option>, } +impl fmt::Debug for Inner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Inner").field("token", &self.token).field("waker", &self.waker).finish() + } +} + +// Lock/Unlock implementation +// +// There are 3 tokens, +const TOKEN_LOCK: u8 = 2; +const TOKEN_WAKE: u8 = 1; +const TOKEN_NULL: u8 = 0; +// TOKEN_LOCK starts in the inner lock's token field, with the other two tokens starting in the two +// bilock halves +// +// Possession of TOKEN_LOCK gives exclusive access to the inner value +// Possession of TOKEN_WAKE gives exclusive access to the waker field +// +// To poll the lock: +// We atomically swap our held token with the token in inner +// if we receive TOKEN_LOCK, the lock was successful, return Poll::Ready +// if we receive TOKEN_NULL, we swap our token again +// if we receive TOKEN_WAKE, we store our waker in the waker field, then swap our token again, +// On the second swap: +// if we receive TOKEN_LOCK, the lock was successful, return Poll::Ready +// If we receive TOKEN_NULL, we must have stored our token after the first swap, return Poll::Pending +// If we receive TOKEN_WAKE, we store our waker in the waker field, then swap our token again, +// On the third swap: +// If we receive TOKEN_LOCK, the lock was successful, return Poll::Ready +// If we receive TOKEN_NULL, we stored our token after the second swap, return Poll::Pending +// +// To unlock the lock: +// We swap our token (which must be TOKEN_LOCK) with the inner token: +// if we receive TOKEN_WAKE: wake the waker (if present) +// if we receive TOKEN_NULL: there is no contention, do nothing +// +// In addition to the protocol described above we implement the following optimizations: +// On lock: +// Store an identifier representing our half of the bilock alongside the waker +// If the existing waker will_wake our waker, do not replace it +// On unlock: +// If the stored waker belongs to our half, do not wake it + unsafe impl Send for Inner {} unsafe impl Sync for Inner {} @@ -62,11 +115,15 @@ impl BiLock { /// possible when `T` is `Unpin`. pub fn new(t: T) -> (Self, Self) { let arc = Arc::new(Inner { - state: AtomicUsize::new(0), + token: AtomicU8::new(TOKEN_LOCK), value: Some(UnsafeCell::new(t)), + waker: UnsafeCell::new(None), }); - (Self { arc: arc.clone() }, Self { arc }) + ( + Self { arc: arc.clone(), token: TOKEN_WAKE, left: true }, + Self { arc, token: TOKEN_NULL, left: false }, + ) } /// Attempt to acquire this lock, returning `Pending` if it can't be @@ -87,49 +144,55 @@ impl BiLock { /// /// This function will panic if called outside the context of a future's /// task. - pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll> { - let mut waker = None; - loop { - match self.arc.state.swap(1, SeqCst) { - // Woohoo, we grabbed the lock! - 0 => return Poll::Ready(BiLockGuard { bilock: self }), - - // Oops, someone else has locked the lock - 1 => {} - - // A task was previously blocked on this lock, likely our task, - // so we need to update that task. - n => unsafe { - let mut prev = Box::from_raw(n as *mut Waker); - *prev = cx.waker().clone(); - waker = Some(prev); + pub fn poll_lock(&mut self, cx: &mut Context<'_>) -> Poll> { + assert_ne!(self.token, TOKEN_LOCK); + if self.token == TOKEN_WAKE { + if let Ok(token) = + self.arc.token.compare_exchange_weak(TOKEN_LOCK, self.token, AcqRel, Relaxed) + { + self.token = token; + return Poll::Ready(BiLockGuard { bilock: self }); + } + } else { + self.token = self.arc.token.swap(self.token, Acquire); + if self.token == TOKEN_LOCK { + return Poll::Ready(BiLockGuard { bilock: self }); + } + } + // token must be TOKEN_WAKE here + assert_eq!(self.token, TOKEN_WAKE); + { + let our_waker = cx.waker(); + // SAFETY: we own the wake token, so we have exclusive access to this field. + // The mutable reference we create goes out of scope before we give up the + // token + match unsafe { &mut *self.arc.waker.get() } { + waker @ None => *waker = Some((self.left, our_waker.clone())), + Some((left, waker)) => { + if !our_waker.will_wake(waker) { + *left = self.left; + *waker = our_waker.clone() + } } } + } + // Maybe we spin here a few times? - // type ascription for safety's sake! - let me: Box = waker.take().unwrap_or_else(||Box::new(cx.waker().clone())); - let me = Box::into_raw(me) as usize; - - match self.arc.state.compare_exchange(1, me, SeqCst, SeqCst) { - // The lock is still locked, but we've now parked ourselves, so - // just report that we're scheduled to receive a notification. - Ok(_) => return Poll::Pending, - - // Oops, looks like the lock was unlocked after our swap above - // and before the compare_exchange. Deallocate what we just - // allocated and go through the loop again. - Err(0) => unsafe { - waker = Some(Box::from_raw(me as *mut Waker)); - }, - - // The top of this loop set the previous state to 1, so if we - // failed the CAS above then it's because the previous value was - // *not* zero or one. This indicates that a task was blocked, - // but we're trying to acquire the lock and there's only one - // other reference of the lock, so it should be impossible for - // that task to ever block itself. - Err(n) => panic!("invalid state: {}", n), + for _ in 0..5 { + match self.arc.token.compare_exchange_weak(TOKEN_LOCK, self.token, AcqRel, Relaxed) { + Ok(token) => { + self.token = token; + return Poll::Ready(BiLockGuard { bilock: self }); + } + Err(_) => {} } + std::hint::spin_loop(); + } + self.token = self.arc.token.swap(self.token, AcqRel); + match self.token { + TOKEN_LOCK => Poll::Ready(BiLockGuard { bilock: self }), + TOKEN_NULL => Poll::Pending, + _ => unreachable!(), } } @@ -144,10 +207,8 @@ impl BiLock { /// Note that the returned future will never resolve to an error. #[cfg(feature = "bilock")] #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] - pub fn lock(&self) -> BiLockAcquire<'_, T> { - BiLockAcquire { - bilock: self, - } + pub fn lock(self) -> BiLockAcquire { + BiLockAcquire { bilock: Some(self) } } /// Attempts to put the two "halves" of a `BiLock` back together and @@ -160,7 +221,6 @@ impl BiLock { if Arc::ptr_eq(&self.arc, &other.arc) { drop(other); let inner = Arc::try_unwrap(self.arc) - .ok() .expect("futures: try_unwrap failed in BiLock::reunite"); Ok(unsafe { inner.into_value() }) } else { @@ -168,19 +228,22 @@ impl BiLock { } } - fn unlock(&self) { - match self.arc.state.swap(0, SeqCst) { - // we've locked the lock, shouldn't be possible for us to see an - // unlocked lock. - 0 => panic!("invalid unlocked state"), - - // Ok, no one else tried to get the lock, we're done. - 1 => {} - - // Another task has parked themselves on this lock, let's wake them - // up as its now their turn. - n => unsafe { - Box::from_raw(n as *mut Waker).wake(); + fn unlock(&mut self) { + assert_eq!(self.token, TOKEN_LOCK); + self.token = self.arc.token.swap(self.token, AcqRel); + match self.token { + TOKEN_NULL => {} // lock uncontended + TOKEN_WAKE => { + // SAFETY: we own the wake token, so we have exclusive access to this field. + if let Some((left, wake)) = unsafe { &mut *self.arc.waker.get() } { + if self.left != *left { + // don't wake our own waker + wake.wake_by_ref() + } + } + } + _ => { + unreachable!() } } } @@ -194,7 +257,7 @@ impl Inner { impl Drop for Inner { fn drop(&mut self) { - assert_eq!(self.state.load(SeqCst), 0); + assert_eq!(*self.token.get_mut(), TOKEN_LOCK); } } @@ -205,9 +268,7 @@ pub struct ReuniteError(pub BiLock, pub BiLock); impl fmt::Debug for ReuniteError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("ReuniteError") - .field(&"...") - .finish() + f.debug_tuple("ReuniteError").field(&"...").finish() } } @@ -228,7 +289,7 @@ impl std::error::Error for ReuniteError {} #[derive(Debug)] #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] pub struct BiLockGuard<'a, T> { - bilock: &'a BiLock, + bilock: &'a mut BiLock, } impl Deref for BiLockGuard<'_, T> { @@ -259,25 +320,83 @@ impl Drop for BiLockGuard<'_, T> { } } +/// Resolved value of the `BiLockAcquire` future. +/// +/// This value, like `BiLockGuard`, is a sentinel to the value `T` through +/// implementations of `Deref` and `DerefMut`. When dropped will unlock the +/// lock, and the original unlocked `BiLock` can be recovered through the +/// `unlock` method. +#[derive(Debug)] +#[cfg(feature = "bilock")] +pub struct BiLockAcquired { + bilock: BiLock, +} + +#[cfg(feature = "bilock")] +impl Deref for BiLockAcquired { + type Target = T; + fn deref(&self) -> &T { + unsafe { &*self.bilock.arc.value.as_ref().unwrap().get() } + } +} + +#[cfg(feature = "bilock")] +impl DerefMut for BiLockAcquired { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.bilock.arc.value.as_ref().unwrap().get() } + } +} + +#[cfg(feature = "bilock")] +impl BiLockAcquired { + /// Get a mutable pinned reference to the locked value. + pub fn as_pin_mut(&mut self) -> Pin<&mut T> { + // Safety: we never allow moving a !Unpin value out of a bilock, nor + // allow mutable access to it + unsafe { Pin::new_unchecked(&mut *self.bilock.arc.value.as_ref().unwrap().get()) } + } + /// Recovers the original `BiLock`, unlocking this lock. + pub fn unlock(self) -> BiLock { + let mut bilock = unsafe { ptr::read(&self.bilock) }; // get the lock out without running our destructor + mem::forget(self); + mem::drop(BiLockGuard { bilock: &mut bilock }); // unlock the lock + bilock + } +} + +#[cfg(feature = "bilock")] +impl Drop for BiLockAcquired { + fn drop(&mut self) { + self.bilock.unlock(); + } +} + /// Future returned by `BiLock::lock` which will resolve when the lock is /// acquired. #[cfg(feature = "bilock")] #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] -pub struct BiLockAcquire<'a, T> { - bilock: &'a BiLock, +pub struct BiLockAcquire { + bilock: Option>, } // Pinning is never projected to fields #[cfg(feature = "bilock")] -impl Unpin for BiLockAcquire<'_, T> {} +impl Unpin for BiLockAcquire {} #[cfg(feature = "bilock")] -impl<'a, T> Future for BiLockAcquire<'a, T> { - type Output = BiLockGuard<'a, T>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.bilock.poll_lock(cx) +impl Future for BiLockAcquire { + type Output = BiLockAcquired; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let bilock = self.bilock.as_mut().expect("Cannot poll after Ready"); + match bilock.poll_lock(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(guard) => { + mem::forget(guard); // don't run the destructor, so the lock stays locked by us + } + } + Poll::Ready(BiLockAcquired { bilock: self.bilock.take().unwrap() }) } } diff --git a/futures-util/src/lock/mod.rs b/futures-util/src/lock/mod.rs index 071eef6f62..88302e9e24 100644 --- a/futures-util/src/lock/mod.rs +++ b/futures-util/src/lock/mod.rs @@ -15,7 +15,7 @@ cfg_target_has_atomic! { mod bilock; #[cfg(feature = "bilock")] #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] - pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; + pub use self::bilock::{BiLock, BiLockAcquire, BiLockAcquired, BiLockGuard, ReuniteError}; #[cfg(any(feature = "sink", feature = "io"))] #[cfg(not(feature = "bilock"))] pub(crate) use self::bilock::BiLock; diff --git a/futures-util/src/stream/stream/split.rs b/futures-util/src/stream/stream/split.rs index 997b9747a2..45fe350e49 100644 --- a/futures-util/src/stream/stream/split.rs +++ b/futures-util/src/stream/stream/split.rs @@ -29,7 +29,7 @@ impl SplitStream { impl Stream for SplitStream { type Item = S::Item; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(self.0.poll_lock(cx)).as_pin_mut().poll_next(cx) } } diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 935cf6a50f..f26e7e8fd6 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -887,15 +887,15 @@ pub mod lock { assert_impl!(BiLock: Unpin); #[cfg(feature = "bilock")] - assert_impl!(BiLockAcquire<'_, ()>: Send); + assert_impl!(BiLockAcquire<()>: Send); #[cfg(feature = "bilock")] - assert_not_impl!(BiLockAcquire<'_, *const ()>: Send); + assert_not_impl!(BiLockAcquire<*const ()>: Send); #[cfg(feature = "bilock")] - assert_impl!(BiLockAcquire<'_, ()>: Sync); + assert_impl!(BiLockAcquire<()>: Sync); #[cfg(feature = "bilock")] - assert_not_impl!(BiLockAcquire<'_, *const ()>: Sync); + assert_not_impl!(BiLockAcquire<*const ()>: Sync); #[cfg(feature = "bilock")] - assert_impl!(BiLockAcquire<'_, PhantomPinned>: Unpin); + assert_impl!(BiLockAcquire: Unpin); #[cfg(feature = "bilock")] assert_impl!(BiLockGuard<'_, ()>: Send); diff --git a/futures/tests/bilock.rs b/futures/tests/bilock.rs new file mode 100644 index 0000000000..040815e228 --- /dev/null +++ b/futures/tests/bilock.rs @@ -0,0 +1,145 @@ +#![cfg(feature = "bilock")] +use futures::executor::block_on; +use futures::future::{self, Future, FutureExt}; +use futures::lock::BiLock; +use futures::stream::{self, StreamExt}; +use futures::task::{Context, Poll}; +use futures_test::task::panic_context; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::thread; + +#[test] +fn smoke() { + let future = future::lazy(|ctx| { + let (mut a, mut b) = BiLock::new(1); + + { + let mut lock = match a.poll_lock(ctx) { + Poll::Ready(l) => l, + Poll::Pending => panic!("poll not ready"), + }; + assert_eq!(*lock, 1); + *lock = 2; + + assert!(b.poll_lock(ctx).is_pending()); + //assert!(a.poll_lock(ctx).is_pending()); + } + + assert!(b.poll_lock(ctx).is_ready()); + assert!(a.poll_lock(ctx).is_ready()); + + { + let lock = match b.poll_lock(ctx) { + Poll::Ready(l) => l, + Poll::Pending => panic!("poll not ready"), + }; + assert_eq!(*lock, 2); + } + + assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2); + + Ok::<(), ()>(()) + }); + + block_on(future).expect("failure in poll") +} + +#[test] +fn concurrent() { + const N: usize = 10000; + let (a, b) = BiLock::new(0); + + let a = Increment { a: Some(a), remaining: N }; + let b = stream::iter(0..N).fold(b, |b, _n| { + b.lock().map(|mut b| { + *b += 1; + b.unlock() + }) + }); + + let mut ctx = panic_context(); + + let t1 = thread::spawn(move || block_on(a)); + let mut b = block_on(b); + let mut a = t1.join().expect("a error"); + + match a.poll_lock(&mut ctx) { + Poll::Ready(l) => assert_eq!(*l, 2 * N), + Poll::Pending => panic!("poll not ready"), + } + match b.poll_lock(&mut ctx) { + Poll::Ready(l) => assert_eq!(*l, 2 * N), + Poll::Pending => panic!("poll not ready"), + } + + assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N); + + struct Increment { + remaining: usize, + a: Option>, + } + + impl Future for Increment { + type Output = BiLock; + + fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + loop { + if self.remaining == 0 { + return Poll::Ready(self.a.take().unwrap()); + } + + { + let a = self.a.as_mut().unwrap(); + let mut a = match a.poll_lock(ctx) { + Poll::Ready(l) => l, + Poll::Pending => return Poll::Pending, + }; + *a += 1; + } + self.remaining -= 1; + } + } + } +} + +#[test] +#[ignore = "long runtime"] +fn exclusion() { + const N: usize = 1000000; + let (mut a, mut b) = BiLock::new(AtomicUsize::new(0)); + let t1 = thread::spawn(move || { + for _ in 0..N { + let guard = block_on(a.lock()); + let start = guard.load(Ordering::SeqCst); + let mut inc = 0; + let mut end; + for _ in 0..100 { + end = guard.fetch_add(1, Ordering::SeqCst) + 1; + inc += 1; + assert_eq!(start + inc, end); + } + a = guard.unlock(); + } + a + }); + let t2 = thread::spawn(move || { + for _ in 0..N { + let guard = block_on(b.lock()); + let start = guard.load(Ordering::SeqCst); + let mut inc = 0; + let mut end; + for _ in 0..100 { + end = guard.fetch_add(1, Ordering::SeqCst) + 1; + inc += 1; + assert_eq!(start + inc, end); + } + b = guard.unlock(); + } + b + }); + let a = t1.join().unwrap(); + let b = t2.join().unwrap(); + let inner = a.reunite(b).unwrap().into_inner(); + assert_eq!(inner, 2 * N * 100); +} diff --git a/futures/tests_disabled/bilock.rs b/futures/tests_disabled/bilock.rs deleted file mode 100644 index c1bc33f507..0000000000 --- a/futures/tests_disabled/bilock.rs +++ /dev/null @@ -1,105 +0,0 @@ -use futures::task; -use futures::stream; -use futures::future; -use futures_util::lock::BiLock; -use std::thread; - -mod support; -use support::*; - -#[test] -fn smoke() { - let future = future::lazy(|_| { - let (a, b) = BiLock::new(1); - - { - let mut lock = match a.poll_lock() { - Poll::Ready(l) => l, - Poll::Pending => panic!("poll not ready"), - }; - assert_eq!(*lock, 1); - *lock = 2; - - assert!(b.poll_lock().is_pending()); - assert!(a.poll_lock().is_pending()); - } - - assert!(b.poll_lock().is_ready()); - assert!(a.poll_lock().is_ready()); - - { - let lock = match b.poll_lock() { - Poll::Ready(l) => l, - Poll::Pending => panic!("poll not ready"), - }; - assert_eq!(*lock, 2); - } - - assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2); - - Ok::<(), ()>(()) - }); - - assert!(task::spawn(future) - .poll_future_notify(¬ify_noop(), 0) - .expect("failure in poll") - .is_ready()); -} - -#[test] -fn concurrent() { - const N: usize = 10000; - let (a, b) = BiLock::new(0); - - let a = Increment { - a: Some(a), - remaining: N, - }; - let b = stream::iter_ok(0..N).fold(b, |b, _n| { - b.lock().map(|mut b| { - *b += 1; - b.unlock() - }) - }); - - let t1 = thread::spawn(move || a.wait()); - let b = b.wait().expect("b error"); - let a = t1.join().unwrap().expect("a error"); - - match a.poll_lock() { - Poll::Ready(l) => assert_eq!(*l, 2 * N), - Poll::Pending => panic!("poll not ready"), - } - match b.poll_lock() { - Poll::Ready(l) => assert_eq!(*l, 2 * N), - Poll::Pending => panic!("poll not ready"), - } - - assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N); - - struct Increment { - remaining: usize, - a: Option>, - } - - impl Future for Increment { - type Item = BiLock; - type Error = (); - - fn poll(&mut self) -> Poll, ()> { - loop { - if self.remaining == 0 { - return Ok(self.a.take().unwrap().into()) - } - - let a = self.a.as_ref().unwrap(); - let mut a = match a.poll_lock() { - Poll::Ready(l) => l, - Poll::Pending => return Ok(Poll::Pending), - }; - self.remaining -= 1; - *a += 1; - } - } - } -}