diff --git a/CHANGELOG.md b/CHANGELOG.md index 4eea759..dfc96ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,8 @@ # Changelog +## [4.6.1] - 2022-9-27 +- Make Flower uncloneable to avoid any kind of data races, added FlowerState as alternative. +- Internal only: Replace `Option` with `TypeOpt` managing value of the sync (mtx) state. + ## [4.6.0] - 2022-9-26 - Added 'set_result` and `try_result` fn for more simpler error handling. - Added `IOError` type alias diff --git a/Cargo.toml b/Cargo.toml index e06b913..b783e2f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "flowync" -version = "4.6.0" +version = "4.6.1" authors = ["Ar37-rs "] edition = "2021" description = "A simple utility for multithreading a/synchronization" diff --git a/src/lib.rs b/src/lib.rs index 9ed31cd..2b6a376 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,26 +13,79 @@ use std::{ }; use std::{sync::Arc, thread}; -struct State +enum TypeOpt where - S: Send, - R: Send, + S: Send + Clone, + R: Send + Clone, +{ + Channel(S), + Success(R), + Error(String), + None, +} + +impl Clone for TypeOpt +where + S: Send + Clone, + R: Send + Clone, +{ + fn clone(&self) -> Self { + match self { + Self::Channel(s) => Self::Channel(Clone::clone(s)), + Self::Success(r) => Self::Success(Clone::clone(r)), + Self::Error(e) => Self::Error(Clone::clone(e)), + Self::None => Self::None, + } + } +} + +impl Debug for TypeOpt +where + S: Send + Clone + Debug, + R: Send + Clone + Debug, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + Self::Channel(s) => f.debug_tuple("Channel").field(s).finish(), + Self::Success(r) => f.debug_tuple("Success").field(r).finish(), + Self::Error(e) => f.debug_tuple("Error").field(e).finish(), + Self::None => write!(f, "None"), + } + } +} + +impl TypeOpt +where + S: Send + Clone, + R: Send + Clone, +{ + fn take(&mut self) -> Self { + let _take = self.clone(); + *self = TypeOpt::None; + _take + } +} + +struct InnerState +where + S: Send + Clone, + R: Send + Clone, { activated: AtomicBool, result_ready: AtomicBool, channel_present: AtomicBool, - mtx: Mutex<(Option, Option, Option)>, + mtx: Mutex>, cvar: Condvar, canceled: AtomicBool, } -impl Debug for State +impl Debug for InnerState where - S: Debug + Send, - R: Debug + Send, + S: Debug + Send + Clone, + R: Debug + Send + Clone, { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("State") + f.debug_struct("InnerState") .field("result_ready", &self.result_ready) .field("channel_present", &self.channel_present) .field("mtx", &self.mtx) @@ -43,14 +96,257 @@ where } } -impl Drop for State +impl Drop for InnerState where - S: Send, - R: Send, + S: Send + Clone, + R: Send + Clone, { fn drop(&mut self) {} } +/// State of the flower +pub struct FlowerState +where + S: Send + Clone, + R: Send + Clone, +{ + state: Arc>, + async_suspender: Arc<(Mutex>, AtomicBool)>, + id: usize, +} + +impl Debug for FlowerState +where + S: Send + Clone + Debug, + R: Send + Clone + Debug, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("FlowerState") + .field("state", &self.state) + .field("async_suspender", &self.async_suspender) + .field("id", &self.id) + .finish() + } +} + +impl FlowerState +where + S: Send + Clone, + R: Send + Clone, +{ + /// Get ID of the flower. + pub fn id(&self) -> usize { + self.id + } + + /// Cancel flower. + /// + /// will do nothing if not explicitly configured on the `Handle`. + pub fn cancel(&self) { + self.state.canceled.store(true, Ordering::Relaxed); + } + + /// Check if the flower is canceled + pub fn is_canceled(&self) -> bool { + self.state.canceled.load(Ordering::Relaxed) + } + + /// Check if the current flower is active + pub fn is_active(&self) -> bool { + self.state.activated.load(Ordering::Relaxed) + } +} + +impl Clone for FlowerState +where + S: Send + Clone, + R: Send + Clone, +{ + fn clone(&self) -> Self { + Self { + state: Clone::clone(&self.state), + async_suspender: Clone::clone(&self.async_suspender), + id: self.id, + } + } +} + +impl Drop for FlowerState +where + S: Send + Clone, + R: Send + Clone, +{ + fn drop(&mut self) {} +} + +struct AsyncSuspender { + inner: Arc<(Mutex>, AtomicBool)>, +} + +impl Future for AsyncSuspender { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut mtx = self.inner.0.lock().unwrap(); + if !self.inner.1.load(Ordering::Relaxed) { + Poll::Ready(()) + } else { + *mtx = Some(cx.waker().clone()); + Poll::Pending + } + } +} + +/// A handle for the Flower +pub struct Handle +where + S: Send + Clone, + R: Send + Clone, +{ + state: Arc>, + async_suspender: Arc<(Mutex>, AtomicBool)>, + id: usize, +} + +impl Handle +where + S: Send + Clone, + R: Send + Clone, +{ + /// Get ID of the flower. + pub fn id(&self) -> usize { + self.id + } + + /// Activate current flower + pub fn activate(&self) { + self.state.activated.store(true, Ordering::Relaxed); + } + + /// Check if the current flower is active + pub fn is_active(&self) -> bool { + self.state.activated.load(Ordering::Relaxed) + } + + /// Check if the current flower should be canceled + pub fn should_cancel(&self) -> bool { + self.state.canceled.load(Ordering::Relaxed) + } + + /// Send current progress value + pub fn send(&self, s: S) { + let mut mtx = self.state.mtx.lock().unwrap(); + { + *mtx = TypeOpt::Channel(s); + self.state.channel_present.store(true, Ordering::Relaxed); + self.async_suspender.1.store(false, Ordering::Relaxed); + } + let _ = self.state.cvar.wait(mtx); + } + + /// Send current progress value asynchronously. + pub async fn send_async(&self, s: S) { + { + *self.state.mtx.lock().unwrap() = TypeOpt::Channel(s); + self.async_suspender.1.store(true, Ordering::Relaxed); + self.state.channel_present.store(true, Ordering::Relaxed); + } + AsyncSuspender { + inner: self.async_suspender.clone(), + } + .await + } + + /// Set result value + pub fn set_result(&self, r: Result>) { + match r { + Ok(val) => self.success(val), + Err(e) => self.error(e), + } + } + + /// Set the Ok value of the result. + pub fn success(&self, r: R) { + *self.state.mtx.lock().unwrap() = TypeOpt::Success(r); + self.state.result_ready.store(true, Ordering::Relaxed); + } + + /// Set the Err value of the result. + pub fn error(&self, e: impl ToString) { + *self.state.mtx.lock().unwrap() = TypeOpt::Error(e.to_string()); + self.state.result_ready.store(true, Ordering::Relaxed); + } +} + +impl Clone for Handle +where + S: Send + Clone, + R: Send + Clone, +{ + fn clone(&self) -> Self { + Self { + state: Clone::clone(&self.state), + async_suspender: Clone::clone(&self.async_suspender), + id: self.id, + } + } +} + +impl Drop for Handle +where + S: Send + Clone, + R: Send + Clone, +{ + fn drop(&mut self) { + if thread::panicking() && !self.state.result_ready.load(Ordering::Relaxed) { + self.error(format!( + "the flower handle with id: {} error, the thread panicked maybe?", + self.id + )); + } + } +} + +impl Debug for Handle +where + S: Debug + Send + Clone, + R: Debug + Send + Clone, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("Handle") + .field("state", &self.state) + .field("awaiting", &self.async_suspender) + .field("id", &self.id) + .finish() + } +} + +pub struct Extracted<'a, S: Send + Clone, R: Send + Clone>(&'a Flower); + +impl Extracted<'_, S, R> +where + S: Send + Clone, + R: Send + Clone, +{ + /// Try finalize result of the flower. + pub fn finalize(self, f: impl FnOnce(Result)) { + let flower = self.0; + if flower.state.result_ready.load(Ordering::Relaxed) { + let result = move || { + let result = flower.state.mtx.lock().unwrap().take(); + flower.state.result_ready.store(false, Ordering::Relaxed); + flower.state.activated.store(false, Ordering::Relaxed); + result + }; + let result = result(); + if let TypeOpt::Success(value) = result { + f(Ok(value)) + } else if let TypeOpt::Error(value) = result { + f(Err(value)) + } + } + } +} + /// Flow loosely and gracefully. /// /// Where: @@ -126,81 +422,64 @@ where /// ``` pub struct Flower where - S: Send, - R: Send, + S: Send + Clone, + R: Send + Clone, { - state: Arc>, - awaiting: Arc<(Mutex>, AtomicBool)>, + state: Arc>, + async_suspender: Arc<(Mutex>, AtomicBool)>, id: usize, } -pub struct Extracted<'a, S: Send, R: Send>(&'a Flower); - -impl Extracted<'_, S, R> -where - S: Send, - R: Send, -{ - /// Try finalize result of the flower. - pub fn finalize(self, f: impl FnOnce(Result)) { - let _self = self.0; - if _self.state.result_ready.load(Ordering::Relaxed) { - let catch = move || { - let mut result_value = _self.state.mtx.lock().unwrap(); - let (_, ok, error) = &mut *result_value; - _self.state.result_ready.store(false, Ordering::Relaxed); - _self.state.activated.store(false, Ordering::Relaxed); - (ok.take(), error.take()) - }; - - let (ok, err) = catch(); - if let Some(value) = ok { - f(Ok(value)); - } else if let Some(value) = err { - f(Err(value)); - } - } - } -} - impl Flower where - S: Send, - R: Send, + S: Send + Clone, + R: Send + Clone, { pub fn new(id: usize) -> Self { Self { - state: Arc::new(State { + state: Arc::new(InnerState { activated: AtomicBool::new(false), result_ready: AtomicBool::new(false), channel_present: AtomicBool::new(false), - mtx: Mutex::new((None, None, None)), + mtx: Mutex::new(TypeOpt::None), cvar: Condvar::new(), canceled: AtomicBool::new(false), }), - awaiting: Arc::new((Mutex::new(None), AtomicBool::new(false))), + async_suspender: Arc::new((Mutex::new(None), AtomicBool::new(false))), id, } } - /// Get ID of the flower. + /// Get the ID. pub fn id(&self) -> usize { self.id } - /// Get handle of the flower. + /// Get the handle. pub fn handle(&self) -> Handle { self.state.canceled.store(false, Ordering::Relaxed); Handle { state: Clone::clone(&self.state), - awaiting: Clone::clone(&self.awaiting), + async_suspender: Clone::clone(&self.async_suspender), id: self.id, } } - /// Cancel current flower handle. + /// Get the state /// - /// will do nothing if not explicitly configured. + /// Since `Flower` itself is uncloneable to avoid data races, this is an alternative `fn` for `self.clone()` + pub fn state(&self) -> FlowerState { + self.state.canceled.store(false, Ordering::Relaxed); + FlowerState { + state: Clone::clone(&self.state), + async_suspender: Clone::clone(&self.async_suspender), + id: self.id, + } + } + + /// Cancel flower. + /// + /// will do nothing if not explicitly configured on the `Handle`. pub fn cancel(&self) { self.state.canceled.store(true, Ordering::Relaxed); } @@ -235,24 +514,21 @@ where /// **Warning!** don't use this fn if channel value is important, use `extract fn` and then use `finalize fn` instead. pub fn try_result(&self, f: impl FnOnce(Result)) { if self.state.channel_present.load(Ordering::Relaxed) { - let _ = self.state.mtx.lock().unwrap().0.take(); self.state.cvar.notify_all(); } if self.state.result_ready.load(Ordering::Relaxed) { let _self = self; - let catch = move || { - let mut result_value = _self.state.mtx.lock().unwrap(); - let (_, ok, error) = &mut *result_value; + let result = move || { + let result = _self.state.mtx.lock().unwrap().take(); _self.state.result_ready.store(false, Ordering::Relaxed); _self.state.activated.store(false, Ordering::Relaxed); - (ok.take(), error.take()) + result }; - - let (ok, err) = catch(); - if let Some(value) = ok { - f(Ok(value)); - } else if let Some(value) = err { - f(Err(value)); + let result = result(); + if let TypeOpt::Success(value) = result { + f(Ok(value)) + } else if let TypeOpt::Error(value) = result { + f(Err(value)) } } } @@ -260,22 +536,26 @@ where /// Try extract channel value of the flower if available, and then `finalize` (must_use) pub fn extract(&self, f: impl FnOnce(Option)) -> Extracted<'_, S, R> { if self.state.channel_present.load(Ordering::Relaxed) { - let catch = move || { - let value = self.state.mtx.lock().unwrap().0.take(); + let channel = move || { + let channel = self.state.mtx.lock().unwrap().take(); self.state.channel_present.store(false, Ordering::Relaxed); - if self.awaiting.1.load(Ordering::Relaxed) { - let mut mg_opt_waker = self.awaiting.0.lock().unwrap(); - self.awaiting.1.store(false, Ordering::Relaxed); + if self.async_suspender.1.load(Ordering::Relaxed) { + let mut mg_opt_waker = self.async_suspender.0.lock().unwrap(); + self.async_suspender.1.store(false, Ordering::Relaxed); if let Some(waker) = mg_opt_waker.take() { waker.wake(); } } else { self.state.cvar.notify_all(); } - value + if let TypeOpt::Channel(value) = channel { + Some(value) + } else { + None + } }; - let value = catch(); - f(value) + let channel = channel(); + f(channel) } else { f(None) } @@ -286,36 +566,22 @@ where impl Debug for Flower where - S: Debug + Send, - R: Debug + Send, + S: Debug + Send + Clone, + R: Debug + Send + Clone, { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("Flower") .field("state", &self.state) - .field("awaiting", &self.awaiting) + .field("async_suspender", &self.async_suspender) .field("id", &self.id) .finish() } } -impl Clone for Flower -where - S: Send, - R: Send, -{ - fn clone(&self) -> Self { - Self { - state: Clone::clone(&self.state), - awaiting: Clone::clone(&self.awaiting), - id: self.id, - } - } -} - impl Drop for Flower where - S: Send, - R: Send, + S: Send + Clone, + R: Send + Clone, { fn drop(&mut self) { if thread::panicking() { @@ -324,160 +590,9 @@ where } } -/// A handle for the Flower -pub struct Handle -where - S: Send, - R: Send, -{ - state: Arc>, - awaiting: Arc<(Mutex>, AtomicBool)>, - id: usize, -} - -impl Handle -where - S: Send, - R: Send, -{ - /// Get ID of the flower. - pub fn id(&self) -> usize { - self.id - } - - /// Activate current flower - pub fn activate(&self) { - self.state.activated.store(true, Ordering::Relaxed); - } - - /// Check if the current flower is active - pub fn is_active(&self) -> bool { - self.state.activated.load(Ordering::Relaxed) - } - - /// Check if the current flower should be canceled - pub fn should_cancel(&self) -> bool { - self.state.canceled.load(Ordering::Relaxed) - } - - /// Send current progress value - pub fn send(&self, s: S) { - let mut mtx = self.state.mtx.lock().unwrap(); - { - mtx.0 = Some(s); - self.state.channel_present.store(true, Ordering::Relaxed); - self.awaiting.1.store(false, Ordering::Relaxed); - } - let _ = self.state.cvar.wait(mtx); - } - - /// Send current progress value asynchronously. - pub async fn send_async(&self, s: S) { - { - self.state.mtx.lock().unwrap().0 = Some(s); - self.awaiting.1.store(true, Ordering::Relaxed); - self.state.channel_present.store(true, Ordering::Relaxed); - } - AsyncSuspender { - awaiting: self.awaiting.clone(), - } - .await - } - - /// Set result value - pub fn set_result(&self, r: Result>) { - match r { - Ok(val) => self.success(val), - Err(e) => self.error(e), - } - } - - /// Set the Ok value of the result. - pub fn success(&self, r: R) { - { - let mut result = self.state.mtx.lock().unwrap(); - let (_, ok, error) = &mut *result; - *ok = Some(r); - *error = None; - } - self.state.result_ready.store(true, Ordering::Relaxed); - } - - /// Set the Err value of the result. - pub fn error(&self, e: impl ToString) { - { - let mut result = self.state.mtx.lock().unwrap(); - let (_, ok, error) = &mut *result; - *error = Some(e.to_string()); - *ok = None; - } - self.state.result_ready.store(true, Ordering::Relaxed); - } -} - -struct AsyncSuspender { - awaiting: Arc<(Mutex>, AtomicBool)>, -} - -impl Future for AsyncSuspender { - type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut mtx = self.awaiting.0.lock().unwrap(); - if !self.awaiting.1.load(Ordering::Relaxed) { - Poll::Ready(()) - } else { - *mtx = Some(cx.waker().clone()); - Poll::Pending - } - } -} - -impl Clone for Handle -where - S: Send, - R: Send, -{ - fn clone(&self) -> Self { - Self { - state: Clone::clone(&self.state), - awaiting: Clone::clone(&self.awaiting), - id: self.id, - } - } -} - -impl Drop for Handle -where - S: Send, - R: Send, -{ - fn drop(&mut self) { - if thread::panicking() && !self.state.result_ready.load(Ordering::Relaxed) { - self.error(format!( - "the flower handle with id: {} error, the thread panicked maybe?", - self.id - )); - } - } -} - -impl Debug for Handle -where - S: Debug + Send, - R: Debug + Send, -{ - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("Handle") - .field("state", &self.state) - .field("awaiting", &self.awaiting) - .field("id", &self.id) - .finish() - } -} - -pub type OIError = Box; -// Alternative alias to avoid conflict with other crate type -pub type IOError = OIError; +pub type IOError = Box; +/// Alternative alias to avoid conflict with other crate type +pub type OIError = IOError; /// A converter to convert `Option` into `Result` using `catch` fn. pub trait IntoResult {