From fef2acf0344f45d4d3212b2832ec034da20c4427 Mon Sep 17 00:00:00 2001 From: Restioson Date: Fri, 11 Sep 2020 17:09:10 +0200 Subject: [PATCH 1/5] Loom awareness --- Cargo.toml | 1 + src/bounded.rs | 6 +++--- src/lib.rs | 9 ++++++++- src/single.rs | 6 +++--- src/unbounded.rs | 6 +++--- 5 files changed, 18 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e9e246f..dd8666b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ categories = ["concurrency"] readme = "README.md" [dependencies] +loom = { version = "0.3", optional = true } cache-padded = "1.1.1" [dev-dependencies] diff --git a/src/bounded.rs b/src/bounded.rs index 7a7fb71..4e0faf3 100644 --- a/src/bounded.rs +++ b/src/bounded.rs @@ -1,7 +1,7 @@ -use std::cell::UnsafeCell; +use crate::facade::cell::UnsafeCell; use std::mem::MaybeUninit; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::thread; +use crate::facade::sync::atomic::{AtomicUsize, Ordering}; +use crate::facade::thread; use cache_padded::CachePadded; diff --git a/src/lib.rs b/src/lib.rs index 7feff36..eb3cc84 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,12 +33,19 @@ use std::error; use std::fmt; use std::panic::{RefUnwindSafe, UnwindSafe}; -use std::sync::atomic::{self, AtomicUsize, Ordering}; +use facade::sync::atomic::{self, AtomicUsize, Ordering}; use crate::bounded::Bounded; use crate::single::Single; use crate::unbounded::Unbounded; +mod facade { + #[cfg(feature = "loom")] + pub use loom::{sync, cell, thread}; + #[cfg(not(feature = "loom"))] + pub use std::{sync, cell, thread}; +} + mod bounded; mod single; mod unbounded; diff --git a/src/single.rs b/src/single.rs index c6c007d..2ae75fc 100644 --- a/src/single.rs +++ b/src/single.rs @@ -1,7 +1,7 @@ -use std::cell::UnsafeCell; +use crate::facade::cell::UnsafeCell; use std::mem::MaybeUninit; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::thread; +use crate::facade::sync::atomic::{AtomicUsize, Ordering}; +use crate::facade::thread; use crate::{PopError, PushError}; diff --git a/src/unbounded.rs b/src/unbounded.rs index 63270a1..b7cfeb7 100644 --- a/src/unbounded.rs +++ b/src/unbounded.rs @@ -1,8 +1,8 @@ -use std::cell::UnsafeCell; +use crate::facade::cell::UnsafeCell; use std::mem::MaybeUninit; use std::ptr; -use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; -use std::thread; +use crate::facade::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use crate::facade::thread; use cache_padded::CachePadded; From 76f637a882b1cc2f5c89291f8d957bff35430b3f Mon Sep 17 00:00:00 2001 From: Restioson Date: Fri, 11 Sep 2020 17:11:24 +0200 Subject: [PATCH 2/5] More IDE friendly facade --- src/lib.rs | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index eb3cc84..b0c6069 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,11 +39,36 @@ use crate::bounded::Bounded; use crate::single::Single; use crate::unbounded::Unbounded; +/// An internal facade abstracting over loom and std types mod facade { - #[cfg(feature = "loom")] - pub use loom::{sync, cell, thread}; - #[cfg(not(feature = "loom"))] - pub use std::{sync, cell, thread}; + pub mod sync { + #[cfg(feature = "loom")] + pub use loom::sync::*; + #[cfg(not(feature = "loom"))] + pub use std::sync::*; + + pub mod atomic { + #[cfg(feature = "loom")] + pub use loom::sync::atomic::*; + + #[cfg(not(feature = "loom"))] + pub use std::sync::atomic::*; + } + } + + pub mod cell { + #[cfg(feature = "loom")] + pub use loom::cell::*; + #[cfg(not(feature = "loom"))] + pub use std::cell::*; + } + + pub mod thread { + #[cfg(feature = "loom")] + pub use loom::thread::*; + #[cfg(not(feature = "loom"))] + pub use std::thread::*; + } } mod bounded; From aaf12374a1d17802d6550a7eae6e76db3d04d146 Mon Sep 17 00:00:00 2001 From: Restioson Date: Fri, 11 Sep 2020 18:01:04 +0200 Subject: [PATCH 3/5] Fix --- src/bounded.rs | 10 ++++------ src/lib.rs | 28 ++++++++++++++++++++++++++++ src/single.rs | 11 ++++++----- src/unbounded.rs | 34 +++++++++++++++++++++++++++++----- 4 files changed, 67 insertions(+), 16 deletions(-) diff --git a/src/bounded.rs b/src/bounded.rs index 4e0faf3..57cc385 100644 --- a/src/bounded.rs +++ b/src/bounded.rs @@ -1,7 +1,7 @@ use crate::facade::cell::UnsafeCell; use std::mem::MaybeUninit; use crate::facade::sync::atomic::{AtomicUsize, Ordering}; -use crate::facade::thread; +use crate::facade::{thread, cell}; use cache_padded::CachePadded; @@ -118,9 +118,7 @@ impl Bounded { ) { Ok(_) => { // Write the value into the slot and update the stamp. - unsafe { - slot.value.get().write(MaybeUninit::new(value)); - } + unsafe { cell::write(&slot.value, MaybeUninit::new(value)); } slot.stamp.store(tail + 1, Ordering::Release); return Ok(()); } @@ -181,7 +179,7 @@ impl Bounded { ) { Ok(_) => { // Read the value from the slot and update the stamp. - let value = unsafe { slot.value.get().read().assume_init() }; + let value = unsafe { cell::read(&slot.value).assume_init() }; slot.stamp .store(head.wrapping_add(self.one_lap), Ordering::Release); return Ok(value); @@ -298,7 +296,7 @@ impl Drop for Bounded { // Drop the value in the slot. let slot = &self.buffer[index]; unsafe { - let value = slot.value.get().read().assume_init(); + let value = cell::read(&slot.value).assume_init(); drop(value); } } diff --git a/src/lib.rs b/src/lib.rs index b0c6069..8b46019 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,6 +53,16 @@ mod facade { #[cfg(not(feature = "loom"))] pub use std::sync::atomic::*; + + pub fn load_unique(atomic: &mut AtomicUsize) -> usize { + #[cfg(feature = "loom")] + let v = unsafe { atomic.unsync_load() }; // SAFETY: we have &mut + + #[cfg(not(feature = "loom"))] + let v = *atomic.get_mut(); + + v + } } } @@ -61,6 +71,24 @@ mod facade { pub use loom::cell::*; #[cfg(not(feature = "loom"))] pub use std::cell::*; + + pub unsafe fn write(cell: &UnsafeCell, value: T) { + #[cfg(feature = "loom")] + cell.with_mut(|ptr| ptr.write(value)); + + #[cfg(not(feature = "loom"))] + cell.get().write(value); + } + + pub unsafe fn read(cell: &UnsafeCell) -> T { + #[cfg(feature = "loom")] + let v = cell.with(|ptr| ptr.read()); + + #[cfg(not(feature = "loom"))] + let v = cell.get().read(); + + v + } } pub mod thread { diff --git a/src/single.rs b/src/single.rs index 2ae75fc..c415ba5 100644 --- a/src/single.rs +++ b/src/single.rs @@ -1,9 +1,10 @@ use crate::facade::cell::UnsafeCell; use std::mem::MaybeUninit; use crate::facade::sync::atomic::{AtomicUsize, Ordering}; -use crate::facade::thread; +use crate::facade::{thread, cell}; use crate::{PopError, PushError}; +use crate::facade::sync::atomic; const LOCKED: usize = 1 << 0; const PUSHED: usize = 1 << 1; @@ -33,7 +34,7 @@ impl Single { if state == 0 { // Write the value and unlock. - unsafe { self.slot.get().write(MaybeUninit::new(value)) } + unsafe { cell::write(&self.slot, MaybeUninit::new(value)) } self.state.fetch_and(!LOCKED, Ordering::Release); Ok(()) } else if state & CLOSED != 0 { @@ -54,7 +55,7 @@ impl Single { if prev == state { // Read the value and unlock. - let value = unsafe { self.slot.get().read().assume_init() }; + let value = unsafe { cell::read(&self.slot).assume_init() }; self.state.fetch_and(!LOCKED, Ordering::Release); return Ok(value); } @@ -112,8 +113,8 @@ impl Single { impl Drop for Single { fn drop(&mut self) { // Drop the value in the slot. - if *self.state.get_mut() & PUSHED != 0 { - let value = unsafe { self.slot.get().read().assume_init() }; + if atomic::load_unique(&mut self.state) & PUSHED != 0 { + let value = unsafe { cell::read(&self.slot).assume_init() }; drop(value); } } diff --git a/src/unbounded.rs b/src/unbounded.rs index b7cfeb7..042e67c 100644 --- a/src/unbounded.rs +++ b/src/unbounded.rs @@ -2,7 +2,7 @@ use crate::facade::cell::UnsafeCell; use std::mem::MaybeUninit; use std::ptr; use crate::facade::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; -use crate::facade::thread; +use crate::facade::{thread, cell}; use cache_padded::CachePadded; @@ -37,6 +37,7 @@ struct Slot { } impl Slot { + #[cfg(not(feature = "loom"))] const UNINIT: Slot = Slot { value: UnsafeCell::new(MaybeUninit::uninit()), state: AtomicUsize::new(0), @@ -64,9 +65,32 @@ struct Block { impl Block { /// Creates an empty block. fn new() -> Block { + #[cfg(not(feature = "loom"))] + let slots = [Slot::UNINIT; BLOCK_CAP]; + + #[cfg(feature = "loom")] + let slots = { + use std::convert::TryFrom; + + let mut vec = Vec::with_capacity(BLOCK_CAP); + for _ in 0..BLOCK_CAP { + vec.push(Slot { + value: UnsafeCell::new(MaybeUninit::uninit()), + state: AtomicUsize::new(0), + }); + } + + let res = Box::<[Slot; BLOCK_CAP]>::try_from(vec.into_boxed_slice()); + + match res { + Ok(boxed_array) => *boxed_array, + Err(_) => unreachable!(), + } + }; + Block { next: AtomicPtr::new(ptr::null_mut()), - slots: [Slot::UNINIT; BLOCK_CAP], + slots, } } @@ -205,7 +229,7 @@ impl Unbounded { // Write the value into the slot. let slot = (*block).slots.get_unchecked(offset); - slot.value.get().write(MaybeUninit::new(value)); + cell::write(&slot.value, MaybeUninit::new(value)); slot.state.fetch_or(WRITE, Ordering::Release); return Ok(()); }, @@ -287,7 +311,7 @@ impl Unbounded { // Read the value. let slot = (*block).slots.get_unchecked(offset); slot.wait_write(); - let value = slot.value.get().read().assume_init(); + let value = cell::read(&slot.value).assume_init(); // Destroy the block if we've reached the end, or if another thread wanted to // destroy but couldn't because we were busy reading from the slot. @@ -387,7 +411,7 @@ impl Drop for Unbounded { if offset < BLOCK_CAP { // Drop the value in the slot. let slot = (*block).slots.get_unchecked(offset); - let value = slot.value.get().read().assume_init(); + let value = cell::read(&slot.value).assume_init(); drop(value); } else { // Deallocate the block and move to the next one. From 00b2ed7a52d7cd8ad470b973d945f0e7885d3cc8 Mon Sep 17 00:00:00 2001 From: Restioson Date: Fri, 11 Sep 2020 18:28:26 +0200 Subject: [PATCH 4/5] fix loom dep --- Cargo.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index dd8666b..e7ccb22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,9 +13,11 @@ categories = ["concurrency"] readme = "README.md" [dependencies] -loom = { version = "0.3", optional = true } cache-padded = "1.1.1" [dev-dependencies] easy-parallel = "3.1.0" fastrand = "1.3.3" + +[target.'cfg(loom)'.dependencies] +loom = { version = "0.3.5" } From 48f294b2766eb4f9df68bec518befb9ab60a271e Mon Sep 17 00:00:00 2001 From: Restioson Date: Sat, 12 Sep 2020 16:00:53 +0200 Subject: [PATCH 5/5] edit cfg --- src/lib.rs | 31 +++++++++++++++++-------------- src/unbounded.rs | 6 +++--- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 8b46019..27b075e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,23 +42,24 @@ use crate::unbounded::Unbounded; /// An internal facade abstracting over loom and std types mod facade { pub mod sync { - #[cfg(feature = "loom")] + #[cfg(loom)] pub use loom::sync::*; - #[cfg(not(feature = "loom"))] + #[cfg(not(loom))] pub use std::sync::*; pub mod atomic { - #[cfg(feature = "loom")] + #[cfg(loom)] pub use loom::sync::atomic::*; - #[cfg(not(feature = "loom"))] + #[cfg(not(loom))] pub use std::sync::atomic::*; + // TODO(loom): loom may add get_mut to &mut AtomicUsize in the future pub fn load_unique(atomic: &mut AtomicUsize) -> usize { - #[cfg(feature = "loom")] + #[cfg(loom)] let v = unsafe { atomic.unsync_load() }; // SAFETY: we have &mut - #[cfg(not(feature = "loom"))] + #[cfg(not(loom))] let v = *atomic.get_mut(); v @@ -67,24 +68,26 @@ mod facade { } pub mod cell { - #[cfg(feature = "loom")] + #[cfg(loom)] pub use loom::cell::*; - #[cfg(not(feature = "loom"))] + #[cfg(not(loom))] pub use std::cell::*; + // TODO(loom): loom may add get() to &mut AtomicUsize in the future pub unsafe fn write(cell: &UnsafeCell, value: T) { - #[cfg(feature = "loom")] + #[cfg(loom)] cell.with_mut(|ptr| ptr.write(value)); - #[cfg(not(feature = "loom"))] + #[cfg(not(loom))] cell.get().write(value); } + // TODO(loom): loom may add get() to &mut AtomicUsize in the future pub unsafe fn read(cell: &UnsafeCell) -> T { - #[cfg(feature = "loom")] + #[cfg(loom)] let v = cell.with(|ptr| ptr.read()); - #[cfg(not(feature = "loom"))] + #[cfg(not(loom))] let v = cell.get().read(); v @@ -92,9 +95,9 @@ mod facade { } pub mod thread { - #[cfg(feature = "loom")] + #[cfg(loom)] pub use loom::thread::*; - #[cfg(not(feature = "loom"))] + #[cfg(not(loom))] pub use std::thread::*; } } diff --git a/src/unbounded.rs b/src/unbounded.rs index 042e67c..34ff7bd 100644 --- a/src/unbounded.rs +++ b/src/unbounded.rs @@ -37,7 +37,7 @@ struct Slot { } impl Slot { - #[cfg(not(feature = "loom"))] + #[cfg(not(loom))] const UNINIT: Slot = Slot { value: UnsafeCell::new(MaybeUninit::uninit()), state: AtomicUsize::new(0), @@ -65,10 +65,10 @@ struct Block { impl Block { /// Creates an empty block. fn new() -> Block { - #[cfg(not(feature = "loom"))] + #[cfg(not(loom))] let slots = [Slot::UNINIT; BLOCK_CAP]; - #[cfg(feature = "loom")] + #[cfg(loom)] let slots = { use std::convert::TryFrom;