Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wasm32 support #62

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions eyeball-im-util/src/vector/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,55 @@ use super::{
EmptyLimitStream, Filter, FilterMap, Limit, Sort, SortBy, SortByKey,
};

/// Alias for `Send` on non-wasm, empty trait (implemented by everything) on
/// wasm.
#[cfg(not(target_arch = "wasm32"))]
pub trait SendOutsideWasm: Send {}
#[cfg(not(target_arch = "wasm32"))]
impl<T: Send> SendOutsideWasm for T {}

/// Alias for `Send` on non-wasm, empty trait (implemented by everything) on
/// wasm.
#[cfg(target_arch = "wasm32")]
pub trait SendOutsideWasm {}
#[cfg(target_arch = "wasm32")]
impl<T> SendOutsideWasm for T {}

/// Alias for `Sync` on non-wasm, empty trait (implemented by everything) on
/// wasm.
#[cfg(not(target_arch = "wasm32"))]
pub trait SyncOutsideWasm: Sync {}
#[cfg(not(target_arch = "wasm32"))]
impl<T: Sync> SyncOutsideWasm for T {}

/// Alias for `Sync` on non-wasm, empty trait (implemented by everything) on
/// wasm.
#[cfg(target_arch = "wasm32")]
pub trait SyncOutsideWasm {}
#[cfg(target_arch = "wasm32")]
impl<T> SyncOutsideWasm for T {}

/// Abstraction over stream items that the adapters in this module can deal
/// with.
pub trait VectorDiffContainer:
VectorDiffContainerOps<Self::Element, Family = <Self as VectorDiffContainer>::Family>
{
/// The element type of the [`Vector`][imbl::Vector] that diffs are being
/// handled for.
type Element: Clone + Send + Sync + 'static;
type Element: Clone + SendOutsideWasm + SyncOutsideWasm + 'static;

#[doc(hidden)]
type Family: VectorDiffContainerFamily<Member<Self::Element> = Self>;
}

impl<T: Clone + Send + Sync + 'static> VectorDiffContainer for VectorDiff<T> {
impl<T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static> VectorDiffContainer for VectorDiff<T> {
type Element = T;
type Family = VectorDiffFamily;
}

impl<T: Clone + Send + Sync + 'static> VectorDiffContainer for Vec<VectorDiff<T>> {
impl<T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static> VectorDiffContainer
for Vec<VectorDiff<T>>
{
type Element = T;
type Family = VecVectorDiffFamily;
}
Expand Down Expand Up @@ -69,15 +99,19 @@ pub trait VectorObserver<T>: Sized {
fn into_parts(self) -> (Vector<T>, Self::Stream);
}

impl<T: Clone + Send + Sync + 'static> VectorObserver<T> for VectorSubscriber<T> {
impl<T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static> VectorObserver<T>
for VectorSubscriber<T>
{
type Stream = VectorSubscriberStream<T>;

fn into_parts(self) -> (Vector<T>, Self::Stream) {
self.into_values_and_stream()
}
}

impl<T: Clone + Send + Sync + 'static> VectorObserver<T> for BatchedVectorSubscriber<T> {
impl<T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static> VectorObserver<T>
for BatchedVectorSubscriber<T>
{
type Stream = VectorSubscriberBatchedStream<T>;

fn into_parts(self) -> (Vector<T>, Self::Stream) {
Expand All @@ -102,7 +136,7 @@ where
/// See that trait for which types implement this.
pub trait VectorObserverExt<T>: VectorObserver<T>
where
T: Clone + Send + Sync + 'static,
T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static,
<Self::Stream as Stream>::Item: VectorDiffContainer<Element = T>,
{
/// Filter the vector's values with the given function.
Expand Down Expand Up @@ -197,7 +231,7 @@ where

impl<T, O> VectorObserverExt<T> for O
where
T: Clone + Send + Sync + 'static,
T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static,
O: VectorObserver<T>,
<Self::Stream as Stream>::Item: VectorDiffContainer<Element = T>,
{
Expand Down
10 changes: 7 additions & 3 deletions eyeball-im/src/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use std::{fmt, ops};

use imbl::Vector;
use tokio::sync::broadcast::{self, Sender};
use traits::{SendOutsideWasm, SyncOutsideWasm};

mod entry;
mod subscriber;
mod traits;
mod transaction;

pub use self::{
Expand All @@ -22,7 +24,7 @@ pub struct ObservableVector<T> {
sender: Sender<BroadcastMessage<T>>,
}

impl<T: Clone + Send + Sync + 'static> ObservableVector<T> {
impl<T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static> ObservableVector<T> {
/// Create a new `ObservableVector`.
///
/// As of the time of writing, this is equivalent to
Expand Down Expand Up @@ -290,7 +292,7 @@ impl<T: Clone + Send + Sync + 'static> ObservableVector<T> {
}
}

impl<T: Clone + Send + Sync + 'static> Default for ObservableVector<T> {
impl<T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static> Default for ObservableVector<T> {
fn default() -> Self {
Self::new()
}
Expand All @@ -315,7 +317,9 @@ impl<T> ops::Deref for ObservableVector<T> {
}
}

impl<T: Clone + Send + Sync + 'static> From<Vector<T>> for ObservableVector<T> {
impl<T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static> From<Vector<T>>
for ObservableVector<T>
{
fn from(values: Vector<T>) -> Self {
let mut this = Self::new();
this.append(values);
Expand Down
9 changes: 6 additions & 3 deletions eyeball-im/src/vector/entry.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::{fmt, ops::Deref};

use super::ObservableVector;
use super::{
traits::{SendOutsideWasm, SyncOutsideWasm},
ObservableVector,
};

/// A handle to a single value in an [`ObservableVector`].
pub struct ObservableVectorEntry<'a, T> {
Expand All @@ -10,7 +13,7 @@ pub struct ObservableVectorEntry<'a, T> {

impl<'a, T> ObservableVectorEntry<'a, T>
where
T: Clone + Send + Sync + 'static,
T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static,
{
pub(super) fn new(inner: &'a mut ObservableVector<T>, index: usize) -> Self {
Self { inner, index: EntryIndex::Owned(index) }
Expand Down Expand Up @@ -115,7 +118,7 @@ pub struct ObservableVectorEntries<'a, T> {

impl<'a, T> ObservableVectorEntries<'a, T>
where
T: Clone + Send + Sync + 'static,
T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static,
{
pub(super) fn new(inner: &'a mut ObservableVector<T>) -> Self {
Self { inner, index: 0 }
Expand Down
17 changes: 11 additions & 6 deletions eyeball-im/src/vector/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ use tokio::sync::broadcast::{
error::{RecvError, TryRecvError},
Receiver,
};
use tokio_util::sync::ReusableBoxFuture;

#[cfg(feature = "tracing")]
use tracing::info;

use super::{BroadcastMessage, OneOrManyDiffs, VectorDiff};
use super::{
traits::{ReusableBoxFuture, SendOutsideWasm, SyncOutsideWasm},
BroadcastMessage, OneOrManyDiffs, VectorDiff,
};

/// A subscriber for updates of a [`Vector`].
#[derive(Debug)]
Expand All @@ -25,7 +28,7 @@ pub struct VectorSubscriber<T> {
rx: Receiver<BroadcastMessage<T>>,
}

impl<T: Clone + Send + Sync + 'static> VectorSubscriber<T> {
impl<T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static> VectorSubscriber<T> {
pub(super) fn new(items: Vector<T>, rx: Receiver<BroadcastMessage<T>>) -> Self {
Self { values: items, rx }
}
Expand Down Expand Up @@ -100,7 +103,7 @@ enum VectorSubscriberStreamState<T> {
// Not clear why this explicit impl is needed, but it's not unsafe so it is fine
impl<T> Unpin for VectorSubscriberStreamState<T> {}

impl<T: Clone + Send + Sync + 'static> Stream for VectorSubscriberStream<T> {
impl<T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static> Stream for VectorSubscriberStream<T> {
type Item = VectorDiff<T>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down Expand Up @@ -171,7 +174,9 @@ impl<T> VectorSubscriberBatchedStream<T> {
}
}

impl<T: Clone + Send + Sync + 'static> Stream for VectorSubscriberBatchedStream<T> {
impl<T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static> Stream
for VectorSubscriberBatchedStream<T>
{
type Item = Vec<VectorDiff<T>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down Expand Up @@ -213,7 +218,7 @@ impl<T: Clone + Send + Sync + 'static> Stream for VectorSubscriberBatchedStream<
}
}

fn handle_lag<T: Clone + Send + Sync + 'static>(
fn handle_lag<T: Clone + SendOutsideWasm + SyncOutsideWasm + 'static>(
rx: &mut Receiver<BroadcastMessage<T>>,
) -> Option<Vector<T>> {
let mut msg = None;
Expand Down
173 changes: 173 additions & 0 deletions eyeball-im/src/vector/traits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
use std::alloc::Layout;
use std::future::{self, Future};
use std::mem::{self, ManuallyDrop};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, ptr};

#[cfg(not(target_arch = "wasm32"))]
pub use tokio_util::sync::ReusableBoxFuture;
struct CallOnDrop<O, F: FnOnce() -> O> {
f: ManuallyDrop<F>,
}

impl<O, F: FnOnce() -> O> CallOnDrop<O, F> {
fn new(f: F) -> Self {
let f = ManuallyDrop::new(f);
Self { f }
}
fn call(self) -> O {
let mut this = ManuallyDrop::new(self);
let f = unsafe { ManuallyDrop::take(&mut this.f) };
f()
}
}

impl<O, F: FnOnce() -> O> Drop for CallOnDrop<O, F> {
fn drop(&mut self) {
let f = unsafe { ManuallyDrop::take(&mut self.f) };
f();
}
}

fn reuse_pin_box<T: ?Sized, U, O, F>(boxed: Pin<Box<T>>, new_value: U, callback: F) -> Result<O, U>
where
F: FnOnce(Box<U>) -> O,
{
let layout = Layout::for_value::<T>(&*boxed);
if layout != Layout::new::<U>() {
return Err(new_value);
}

// SAFETY: We don't ever construct a non-pinned reference to the old `T` from now on, and we
// always drop the `T`.
let raw: *mut T = Box::into_raw(unsafe { Pin::into_inner_unchecked(boxed) });

// When dropping the old value panics, we still want to call `callback` — so move the rest of
// the code into a guard type.
let guard = CallOnDrop::new(|| {
let raw: *mut U = raw.cast::<U>();
unsafe { raw.write(new_value) };

// SAFETY:
// - `T` and `U` have the same layout.
// - `raw` comes from a `Box` that uses the same allocator as this one.
// - `raw` points to a valid instance of `U` (we just wrote it in).
let boxed = unsafe { Box::from_raw(raw) };

callback(boxed)
});

// Drop the old value.
unsafe { ptr::drop_in_place(raw) };

// Run the rest of the code.
Ok(guard.call())
}

/// A reusable `Pin<Box<dyn Future<Output = T> + Send + 'a>>`.
///
/// This type lets you replace the future stored in the box without
/// reallocating when the size and alignment permits this.
#[cfg(target_arch = "wasm32")]
pub(crate) struct ReusableBoxFuture<'a, T> {
boxed: Pin<Box<dyn Future<Output = T> + 'a>>,
}

#[cfg(target_arch = "wasm32")]
impl<'a, T> fmt::Debug for ReusableBoxFuture<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReusableBoxFuture").field("boxed", &"Future").finish()
}
}

#[cfg(target_arch = "wasm32")]
impl<'a, T> ReusableBoxFuture<'a, T> {
/// Create a new `ReusableBoxFuture<T>` containing the provided future.
pub(crate) fn new<F>(future: F) -> Self
where
F: Future<Output = T> + 'a,
{
Self { boxed: Box::pin(future) }
}

/// Replace the future currently stored in this box.
///
/// This reallocates if and only if the layout of the provided future is
/// different from the layout of the currently stored future.
pub(crate) fn set<F>(&mut self, future: F)
where
F: Future<Output = T> + SendOutsideWasm + 'a,
{
if let Err(future) = self.try_set(future) {
*self = Self::new(future);
}
}

/// Replace the future currently stored in this box.
///
/// This function never reallocates, but returns an error if the provided
/// future has a different size or alignment from the currently stored
/// future.
pub(crate) fn try_set<F>(&mut self, future: F) -> Result<(), F>
where
F: Future<Output = T> + SendOutsideWasm + 'a,
{
// If we try to inline the contents of this function, the type checker complains because
// the bound `T: 'a` is not satisfied in the call to `pending()`. But by putting it in an
// inner function that doesn't have `T` as a generic parameter, we implicitly get the bound
// `F::Output: 'a` transitively through `F: 'a`, allowing us to call `pending()`.
#[inline(always)]
fn real_try_set<'a, F>(
this: &mut ReusableBoxFuture<'a, F::Output>,
future: F,
) -> Result<(), F>
where
F: Future + SendOutsideWasm + 'a,
{
// future::Pending<T> is a ZST so this never allocates.
let boxed = mem::replace(&mut this.boxed, Box::pin(future::pending()));
reuse_pin_box(boxed, future, |boxed| this.boxed = Pin::from(boxed))
}

real_try_set(self, future)
}

/// Get a pinned reference to the underlying future.
pub(crate) fn get_pin(&mut self) -> Pin<&mut (dyn Future<Output = T>)> {
self.boxed.as_mut()
}

/// Poll the future stored inside this box.
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<T> {
self.get_pin().poll(cx)
}
}

/// Alias for `Send` on non-wasm, empty trait (implemented by everything) on
/// wasm.
#[cfg(not(target_arch = "wasm32"))]
pub trait SendOutsideWasm: Send {}
#[cfg(not(target_arch = "wasm32"))]
impl<T: Send> SendOutsideWasm for T {}

/// Alias for `Send` on non-wasm, empty trait (implemented by everything) on
/// wasm.
#[cfg(target_arch = "wasm32")]
pub trait SendOutsideWasm {}
#[cfg(target_arch = "wasm32")]
impl<T> SendOutsideWasm for T {}

/// Alias for `Sync` on non-wasm, empty trait (implemented by everything) on
/// wasm.
#[cfg(not(target_arch = "wasm32"))]
pub trait SyncOutsideWasm: Sync {}
#[cfg(not(target_arch = "wasm32"))]
impl<T: Sync> SyncOutsideWasm for T {}

/// Alias for `Sync` on non-wasm, empty trait (implemented by everything) on
/// wasm.
#[cfg(target_arch = "wasm32")]
pub trait SyncOutsideWasm {}
#[cfg(target_arch = "wasm32")]
impl<T> SyncOutsideWasm for T {}
Loading
Loading