From 83ec4321529a59bbb1ed36dfcfc3ccff93d80ec6 Mon Sep 17 00:00:00 2001 From: Daniel Salinas Date: Sun, 13 Oct 2024 15:09:08 -0400 Subject: [PATCH 1/2] Make library usable in wasm32 by conditionally remove Send + Sync --- eyeball-im-util/src/vector/traits.rs | 48 ++++++-- eyeball-im/src/vector.rs | 10 +- eyeball-im/src/vector/entry.rs | 9 +- eyeball-im/src/vector/subscriber.rs | 17 ++- eyeball-im/src/vector/traits.rs | 173 +++++++++++++++++++++++++++ eyeball-im/src/vector/transaction.rs | 13 +- eyeball/src/subscriber/async_lock.rs | 8 +- 7 files changed, 251 insertions(+), 27 deletions(-) create mode 100644 eyeball-im/src/vector/traits.rs diff --git a/eyeball-im-util/src/vector/traits.rs b/eyeball-im-util/src/vector/traits.rs index e24c705..30e3b6d 100644 --- a/eyeball-im-util/src/vector/traits.rs +++ b/eyeball-im-util/src/vector/traits.rs @@ -15,6 +15,34 @@ use super::{ EmptyLimitStream, Filter, FilterMap, Limit, Sort, SortBy, SortByKey, }; +/// Alias for `Send` on non-wasm, empty trait (implemented by everything) on +/// wasm. +#[cfg(not(target_arch = "wasm32"))] +pub trait SendOutsideWasm: Send {} +#[cfg(not(target_arch = "wasm32"))] +impl SendOutsideWasm for T {} + +/// Alias for `Send` on non-wasm, empty trait (implemented by everything) on +/// wasm. +#[cfg(target_arch = "wasm32")] +pub trait SendOutsideWasm {} +#[cfg(target_arch = "wasm32")] +impl SendOutsideWasm for T {} + +/// Alias for `Sync` on non-wasm, empty trait (implemented by everything) on +/// wasm. +#[cfg(not(target_arch = "wasm32"))] +pub trait SyncOutsideWasm: Sync {} +#[cfg(not(target_arch = "wasm32"))] +impl SyncOutsideWasm for T {} + +/// Alias for `Sync` on non-wasm, empty trait (implemented by everything) on +/// wasm. +#[cfg(target_arch = "wasm32")] +pub trait SyncOutsideWasm {} +#[cfg(target_arch = "wasm32")] +impl SyncOutsideWasm for T {} + /// Abstraction over stream items that the adapters in this module can deal /// with. pub trait VectorDiffContainer: @@ -22,18 +50,20 @@ pub trait VectorDiffContainer: { /// The element type of the [`Vector`][imbl::Vector] that diffs are being /// handled for. - type Element: Clone + Send + Sync + 'static; + type Element: Clone + SendOutsideWasm + SyncOutsideWasm + 'static; #[doc(hidden)] type Family: VectorDiffContainerFamily = Self>; } -impl VectorDiffContainer for VectorDiff { +impl VectorDiffContainer for VectorDiff { type Element = T; type Family = VectorDiffFamily; } -impl VectorDiffContainer for Vec> { +impl VectorDiffContainer + for Vec> +{ type Element = T; type Family = VecVectorDiffFamily; } @@ -69,7 +99,9 @@ pub trait VectorObserver: Sized { fn into_parts(self) -> (Vector, Self::Stream); } -impl VectorObserver for VectorSubscriber { +impl VectorObserver + for VectorSubscriber +{ type Stream = VectorSubscriberStream; fn into_parts(self) -> (Vector, Self::Stream) { @@ -77,7 +109,9 @@ impl VectorObserver for VectorSubscriber } } -impl VectorObserver for BatchedVectorSubscriber { +impl VectorObserver + for BatchedVectorSubscriber +{ type Stream = VectorSubscriberBatchedStream; fn into_parts(self) -> (Vector, Self::Stream) { @@ -102,7 +136,7 @@ where /// See that trait for which types implement this. pub trait VectorObserverExt: VectorObserver where - T: Clone + Send + Sync + 'static, + T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static, ::Item: VectorDiffContainer, { /// Filter the vector's values with the given function. @@ -197,7 +231,7 @@ where impl VectorObserverExt for O where - T: Clone + Send + Sync + 'static, + T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static, O: VectorObserver, ::Item: VectorDiffContainer, { diff --git a/eyeball-im/src/vector.rs b/eyeball-im/src/vector.rs index 27f3598..ae4ea62 100644 --- a/eyeball-im/src/vector.rs +++ b/eyeball-im/src/vector.rs @@ -2,9 +2,11 @@ use std::{fmt, ops}; use imbl::Vector; use tokio::sync::broadcast::{self, Sender}; +use traits::{SendOutsideWasm, SyncOutsideWasm}; mod entry; mod subscriber; +mod traits; mod transaction; pub use self::{ @@ -22,7 +24,7 @@ pub struct ObservableVector { sender: Sender>, } -impl ObservableVector { +impl ObservableVector { /// Create a new `ObservableVector`. /// /// As of the time of writing, this is equivalent to @@ -290,7 +292,7 @@ impl ObservableVector { } } -impl Default for ObservableVector { +impl Default for ObservableVector { fn default() -> Self { Self::new() } @@ -315,7 +317,9 @@ impl ops::Deref for ObservableVector { } } -impl From> for ObservableVector { +impl From> + for ObservableVector +{ fn from(values: Vector) -> Self { let mut this = Self::new(); this.append(values); diff --git a/eyeball-im/src/vector/entry.rs b/eyeball-im/src/vector/entry.rs index cf6b117..cb52c56 100644 --- a/eyeball-im/src/vector/entry.rs +++ b/eyeball-im/src/vector/entry.rs @@ -1,6 +1,9 @@ use std::{fmt, ops::Deref}; -use super::ObservableVector; +use super::{ + traits::{SendOutsideWasm, SyncOutsideWasm}, + ObservableVector, +}; /// A handle to a single value in an [`ObservableVector`]. pub struct ObservableVectorEntry<'a, T> { @@ -10,7 +13,7 @@ pub struct ObservableVectorEntry<'a, T> { impl<'a, T> ObservableVectorEntry<'a, T> where - T: Clone + Send + Sync + 'static, + T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static, { pub(super) fn new(inner: &'a mut ObservableVector, index: usize) -> Self { Self { inner, index: EntryIndex::Owned(index) } @@ -115,7 +118,7 @@ pub struct ObservableVectorEntries<'a, T> { impl<'a, T> ObservableVectorEntries<'a, T> where - T: Clone + Send + Sync + 'static, + T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static, { pub(super) fn new(inner: &'a mut ObservableVector) -> Self { Self { inner, index: 0 } diff --git a/eyeball-im/src/vector/subscriber.rs b/eyeball-im/src/vector/subscriber.rs index a2d44e2..7d27481 100644 --- a/eyeball-im/src/vector/subscriber.rs +++ b/eyeball-im/src/vector/subscriber.rs @@ -12,11 +12,14 @@ use tokio::sync::broadcast::{ error::{RecvError, TryRecvError}, Receiver, }; -use tokio_util::sync::ReusableBoxFuture; + #[cfg(feature = "tracing")] use tracing::info; -use super::{BroadcastMessage, OneOrManyDiffs, VectorDiff}; +use super::{ + traits::{ReusableBoxFuture, SendOutsideWasm, SyncOutsideWasm}, + BroadcastMessage, OneOrManyDiffs, VectorDiff, +}; /// A subscriber for updates of a [`Vector`]. #[derive(Debug)] @@ -25,7 +28,7 @@ pub struct VectorSubscriber { rx: Receiver>, } -impl VectorSubscriber { +impl VectorSubscriber { pub(super) fn new(items: Vector, rx: Receiver>) -> Self { Self { values: items, rx } } @@ -100,7 +103,7 @@ enum VectorSubscriberStreamState { // Not clear why this explicit impl is needed, but it's not unsafe so it is fine impl Unpin for VectorSubscriberStreamState {} -impl Stream for VectorSubscriberStream { +impl Stream for VectorSubscriberStream { type Item = VectorDiff; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -171,7 +174,9 @@ impl VectorSubscriberBatchedStream { } } -impl Stream for VectorSubscriberBatchedStream { +impl Stream + for VectorSubscriberBatchedStream +{ type Item = Vec>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -213,7 +218,7 @@ impl Stream for VectorSubscriberBatchedStream< } } -fn handle_lag( +fn handle_lag( rx: &mut Receiver>, ) -> Option> { let mut msg = None; diff --git a/eyeball-im/src/vector/traits.rs b/eyeball-im/src/vector/traits.rs new file mode 100644 index 0000000..0b83ef2 --- /dev/null +++ b/eyeball-im/src/vector/traits.rs @@ -0,0 +1,173 @@ +use std::alloc::Layout; +use std::future::{self, Future}; +use std::mem::{self, ManuallyDrop}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{fmt, ptr}; + +#[cfg(not(target_arch = "wasm32"))] +pub use tokio_util::sync::ReusableBoxFuture; +struct CallOnDrop O> { + f: ManuallyDrop, +} + +impl O> CallOnDrop { + fn new(f: F) -> Self { + let f = ManuallyDrop::new(f); + Self { f } + } + fn call(self) -> O { + let mut this = ManuallyDrop::new(self); + let f = unsafe { ManuallyDrop::take(&mut this.f) }; + f() + } +} + +impl O> Drop for CallOnDrop { + fn drop(&mut self) { + let f = unsafe { ManuallyDrop::take(&mut self.f) }; + f(); + } +} + +fn reuse_pin_box(boxed: Pin>, new_value: U, callback: F) -> Result +where + F: FnOnce(Box) -> O, +{ + let layout = Layout::for_value::(&*boxed); + if layout != Layout::new::() { + return Err(new_value); + } + + // SAFETY: We don't ever construct a non-pinned reference to the old `T` from now on, and we + // always drop the `T`. + let raw: *mut T = Box::into_raw(unsafe { Pin::into_inner_unchecked(boxed) }); + + // When dropping the old value panics, we still want to call `callback` — so move the rest of + // the code into a guard type. + let guard = CallOnDrop::new(|| { + let raw: *mut U = raw.cast::(); + unsafe { raw.write(new_value) }; + + // SAFETY: + // - `T` and `U` have the same layout. + // - `raw` comes from a `Box` that uses the same allocator as this one. + // - `raw` points to a valid instance of `U` (we just wrote it in). + let boxed = unsafe { Box::from_raw(raw) }; + + callback(boxed) + }); + + // Drop the old value. + unsafe { ptr::drop_in_place(raw) }; + + // Run the rest of the code. + Ok(guard.call()) +} + +/// A reusable `Pin + Send + 'a>>`. +/// +/// This type lets you replace the future stored in the box without +/// reallocating when the size and alignment permits this. +#[cfg(target_arch = "wasm32")] +pub struct ReusableBoxFuture<'a, T> { + boxed: Pin + 'a>>, +} + +#[cfg(target_arch = "wasm32")] +impl<'a, T> fmt::Debug for ReusableBoxFuture<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ReusableBoxFuture").field("boxed", &"Future").finish() + } +} + +#[cfg(target_arch = "wasm32")] +impl<'a, T> ReusableBoxFuture<'a, T> { + /// Create a new `ReusableBoxFuture` containing the provided future. + pub fn new(future: F) -> Self + where + F: Future + 'a, + { + Self { boxed: Box::pin(future) } + } + + /// Replace the future currently stored in this box. + /// + /// This reallocates if and only if the layout of the provided future is + /// different from the layout of the currently stored future. + pub fn set(&mut self, future: F) + where + F: Future + SendOutsideWasm + 'a, + { + if let Err(future) = self.try_set(future) { + *self = Self::new(future); + } + } + + /// Replace the future currently stored in this box. + /// + /// This function never reallocates, but returns an error if the provided + /// future has a different size or alignment from the currently stored + /// future. + pub fn try_set(&mut self, future: F) -> Result<(), F> + where + F: Future + SendOutsideWasm + 'a, + { + // If we try to inline the contents of this function, the type checker complains because + // the bound `T: 'a` is not satisfied in the call to `pending()`. But by putting it in an + // inner function that doesn't have `T` as a generic parameter, we implicitly get the bound + // `F::Output: 'a` transitively through `F: 'a`, allowing us to call `pending()`. + #[inline(always)] + fn real_try_set<'a, F>( + this: &mut ReusableBoxFuture<'a, F::Output>, + future: F, + ) -> Result<(), F> + where + F: Future + SendOutsideWasm + 'a, + { + // future::Pending is a ZST so this never allocates. + let boxed = mem::replace(&mut this.boxed, Box::pin(future::pending())); + reuse_pin_box(boxed, future, |boxed| this.boxed = Pin::from(boxed)) + } + + real_try_set(self, future) + } + + /// Get a pinned reference to the underlying future. + pub fn get_pin(&mut self) -> Pin<&mut (dyn Future)> { + self.boxed.as_mut() + } + + /// Poll the future stored inside this box. + pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + self.get_pin().poll(cx) + } +} + +/// Alias for `Send` on non-wasm, empty trait (implemented by everything) on +/// wasm. +#[cfg(not(target_arch = "wasm32"))] +pub trait SendOutsideWasm: Send {} +#[cfg(not(target_arch = "wasm32"))] +impl SendOutsideWasm for T {} + +/// Alias for `Send` on non-wasm, empty trait (implemented by everything) on +/// wasm. +#[cfg(target_arch = "wasm32")] +pub trait SendOutsideWasm {} +#[cfg(target_arch = "wasm32")] +impl SendOutsideWasm for T {} + +/// Alias for `Sync` on non-wasm, empty trait (implemented by everything) on +/// wasm. +#[cfg(not(target_arch = "wasm32"))] +pub trait SyncOutsideWasm: Sync {} +#[cfg(not(target_arch = "wasm32"))] +impl SyncOutsideWasm for T {} + +/// Alias for `Sync` on non-wasm, empty trait (implemented by everything) on +/// wasm. +#[cfg(target_arch = "wasm32")] +pub trait SyncOutsideWasm {} +#[cfg(target_arch = "wasm32")] +impl SyncOutsideWasm for T {} diff --git a/eyeball-im/src/vector/transaction.rs b/eyeball-im/src/vector/transaction.rs index adc8e9d..68b70ac 100644 --- a/eyeball-im/src/vector/transaction.rs +++ b/eyeball-im/src/vector/transaction.rs @@ -2,7 +2,10 @@ use std::{fmt, mem, ops}; use imbl::Vector; -use crate::vector::OneOrManyDiffs; +use crate::vector::{ + traits::{SendOutsideWasm, SyncOutsideWasm}, + OneOrManyDiffs, +}; use super::{entry::EntryIndex, BroadcastMessage, ObservableVector, VectorDiff}; @@ -21,7 +24,9 @@ pub struct ObservableVectorTransaction<'o, T: Clone> { batch: Vec>, } -impl<'o, T: Clone + Send + Sync + 'static> ObservableVectorTransaction<'o, T> { +impl<'o, T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static> + ObservableVectorTransaction<'o, T> +{ pub(super) fn new(inner: &'o mut ObservableVector) -> Self { let values = inner.values.clone(); Self { inner, values, batch: Vec::new() } @@ -316,7 +321,7 @@ pub struct ObservableVectorTransactionEntry<'a, 'o, T: Clone> { impl<'a, 'o, T> ObservableVectorTransactionEntry<'a, 'o, T> where - T: Clone + Send + Sync + 'static, + T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static, { pub(super) fn new(inner: &'a mut ObservableVectorTransaction<'o, T>, index: usize) -> Self { Self { inner, index: EntryIndex::Owned(index) } @@ -397,7 +402,7 @@ pub struct ObservableVectorTransactionEntries<'a, 'o, T: Clone> { impl<'a, 'o, T> ObservableVectorTransactionEntries<'a, 'o, T> where - T: Clone + Send + Sync + 'static, + T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static, { pub(super) fn new(inner: &'a mut ObservableVectorTransaction<'o, T>) -> Self { Self { inner, index: 0 } diff --git a/eyeball/src/subscriber/async_lock.rs b/eyeball/src/subscriber/async_lock.rs index 1380646..31e8ecb 100644 --- a/eyeball/src/subscriber/async_lock.rs +++ b/eyeball/src/subscriber/async_lock.rs @@ -17,7 +17,7 @@ pub struct AsyncSubscriberState { get_lock: ReusableBoxFuture<'static, OwnedSharedReadGuard>>, } -impl Clone for AsyncSubscriberState { +impl Clone for AsyncSubscriberState { fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -32,7 +32,7 @@ impl fmt::Debug for AsyncSubscriberState { } } -impl Subscriber { +impl Subscriber { pub(crate) fn new_async(inner: SharedReadLock>, version: u64) -> Self { let get_lock = ReusableBoxFuture::new(inner.clone().lock_owned()); Self { state: AsyncSubscriberState { inner, get_lock }, observed_version: version } @@ -147,7 +147,7 @@ impl Subscriber { } } -impl Stream for Subscriber { +impl Stream for Subscriber { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -155,7 +155,7 @@ impl Stream for Subscriber { } } -impl Future for Next<'_, T, AsyncLock> { +impl Future for Next<'_, T, AsyncLock> { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { From 1fe5df1d884db97dda030837caf2e79001f0ac93 Mon Sep 17 00:00:00 2001 From: Daniel Salinas Date: Sun, 13 Oct 2024 15:18:37 -0400 Subject: [PATCH 2/2] Revise permissions on ReusableBoxFuture polyfill --- eyeball-im/src/vector/traits.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/eyeball-im/src/vector/traits.rs b/eyeball-im/src/vector/traits.rs index 0b83ef2..f99d3d9 100644 --- a/eyeball-im/src/vector/traits.rs +++ b/eyeball-im/src/vector/traits.rs @@ -70,7 +70,7 @@ where /// This type lets you replace the future stored in the box without /// reallocating when the size and alignment permits this. #[cfg(target_arch = "wasm32")] -pub struct ReusableBoxFuture<'a, T> { +pub(crate) struct ReusableBoxFuture<'a, T> { boxed: Pin + 'a>>, } @@ -84,7 +84,7 @@ impl<'a, T> fmt::Debug for ReusableBoxFuture<'a, T> { #[cfg(target_arch = "wasm32")] impl<'a, T> ReusableBoxFuture<'a, T> { /// Create a new `ReusableBoxFuture` containing the provided future. - pub fn new(future: F) -> Self + pub(crate) fn new(future: F) -> Self where F: Future + 'a, { @@ -95,7 +95,7 @@ impl<'a, T> ReusableBoxFuture<'a, T> { /// /// This reallocates if and only if the layout of the provided future is /// different from the layout of the currently stored future. - pub fn set(&mut self, future: F) + pub(crate) fn set(&mut self, future: F) where F: Future + SendOutsideWasm + 'a, { @@ -109,7 +109,7 @@ impl<'a, T> ReusableBoxFuture<'a, T> { /// This function never reallocates, but returns an error if the provided /// future has a different size or alignment from the currently stored /// future. - pub fn try_set(&mut self, future: F) -> Result<(), F> + pub(crate) fn try_set(&mut self, future: F) -> Result<(), F> where F: Future + SendOutsideWasm + 'a, { @@ -134,12 +134,12 @@ impl<'a, T> ReusableBoxFuture<'a, T> { } /// Get a pinned reference to the underlying future. - pub fn get_pin(&mut self) -> Pin<&mut (dyn Future)> { + pub(crate) fn get_pin(&mut self) -> Pin<&mut (dyn Future)> { self.boxed.as_mut() } /// Poll the future stored inside this box. - pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { self.get_pin().poll(cx) } }