diff --git a/core/common/src/collections.rs b/core/common/src/collections.rs index c3335d64..2ca2c749 100644 --- a/core/common/src/collections.rs +++ b/core/common/src/collections.rs @@ -16,6 +16,7 @@ pub use quadtree::*; pub use ringbuffer::*; pub use smallvec::{smallvec, SmallVec}; pub use spatialhash::*; +pub use swapvec::*; mod anymap; mod arena; @@ -27,6 +28,7 @@ mod priorityqueue; mod quadtree; mod ringbuffer; mod spatialhash; +mod swapvec; /// A faster hasher that is not resilient to DoS attacks. type FastHasher = BuildHasherDefault; diff --git a/core/common/src/collections/swapvec.rs b/core/common/src/collections/swapvec.rs new file mode 100644 index 00000000..fb18c2c5 --- /dev/null +++ b/core/common/src/collections/swapvec.rs @@ -0,0 +1,202 @@ +use std::ops::RangeFull; + +/// A red/green swap [`Vec`]. +/// +/// This is a specialized [`Vec`] that allows for efficient red/green swaps, +/// such that one [`Vec`] is active while the other is being written to. This is +/// useful for scheduling tasks or for active/free lists. +pub struct SwapVec { + red: Vec, + green: Vec, + status: Status, +} + +/// The status of a [`SwapVec`]. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +enum Status { + Red, + Green, +} + +impl Default for SwapVec { + fn default() -> Self { + Self { + red: Vec::new(), + green: Vec::new(), + status: Status::Red, + } + } +} + +impl SwapVec { + /// Creates a new [`SwapVec`]. + pub fn new() -> Self { + Self::default() + } + + /// Creates a new [`SwapVec`] with the given capacity. + pub fn with_capacity(capacity: usize) -> Self { + Self { + red: Vec::with_capacity(capacity), + green: Vec::with_capacity(capacity), + status: Status::Red, + } + } + + /// Returns the length of the active [`Vec`]. + pub fn len(&self) -> usize { + match self.status { + Status::Red => self.red.len(), + Status::Green => self.green.len(), + } + } + + /// Returns the capacity of the active [`Vec`]. + pub fn capacity(&self) -> usize { + match self.status { + Status::Red => self.red.capacity(), + Status::Green => self.green.capacity(), + } + } + + /// Returns whether the active [`Vec`] is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Pushes a value into the active [`Vec`]. + pub fn push(&mut self, value: T) { + match self.status { + Status::Red => self.red.push(value), + Status::Green => self.green.push(value), + } + } + + /// Pops a value from the active [`Vec`]. + pub fn pop(&mut self) -> Option { + match self.status { + Status::Red => self.red.pop(), + Status::Green => self.green.pop(), + } + } + + /// Drains the active [`Vec`]. + pub fn drain(&mut self, range: RangeFull) -> std::vec::Drain { + match self.status { + Status::Red => self.red.drain(range), + Status::Green => self.green.drain(range), + } + } + + /// Swaps the active [`Vec` with the inactive one. + pub fn swap(&mut self) { + self.status = match self.status { + Status::Red => Status::Green, + Status::Green => Status::Red, + }; + } + + /// Clears the active [`Vec`]. + pub fn clear(&mut self) { + match self.status { + Status::Red => self.red.clear(), + Status::Green => self.green.clear(), + } + } + + /// Clears both [`Vec`]s. + pub fn clear_all(&mut self) { + self.red.clear(); + self.green.clear(); + } + + /// Iterates over the active [`Vec`]. + pub fn iter(&self) -> std::slice::Iter { + match self.status { + Status::Red => self.red.iter(), + Status::Green => self.green.iter(), + } + } + + /// Mutably iterates over the active [`Vec`]. + pub fn iter_mut(&mut self) -> std::slice::IterMut { + match self.status { + Status::Red => self.red.iter_mut(), + Status::Green => self.green.iter_mut(), + } + } + + /// Returns a slice of the active [`Vec`]. + pub fn as_slice(&self) -> &[T] { + match self.status { + Status::Red => self.red.as_slice(), + Status::Green => self.green.as_slice(), + } + } + + /// Returns a mutable slice of the active [`Vec`]. + pub fn as_mut_slice(&mut self) -> &mut [T] { + match self.status { + Status::Red => self.red.as_mut_slice(), + Status::Green => self.green.as_mut_slice(), + } + } +} + +impl AsRef<[T]> for SwapVec { + #[inline] + fn as_ref(&self) -> &[T] { + self.as_slice() + } +} + +impl AsMut<[T]> for SwapVec { + #[inline] + fn as_mut(&mut self) -> &mut [T] { + self.as_mut_slice() + } +} + +impl<'a, T> IntoIterator for &'a SwapVec { + type Item = &'a T; + type IntoIter = std::slice::Iter<'a, T>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + +impl<'a, T> IntoIterator for &'a mut SwapVec { + type Item = &'a mut T; + type IntoIter = std::slice::IterMut<'a, T>; + + fn into_iter(self) -> Self::IntoIter { + self.iter_mut() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_basic_swap_vec_operation() { + let mut swap = SwapVec::new(); + + swap.push(1); + swap.push(2); + swap.push(3); + + assert_eq!(swap.len(), 3); + + swap.swap(); + + assert_eq!(swap.len(), 0); + + swap.push(4); + swap.push(5); + swap.push(6); + + assert_eq!(swap.len(), 3); + } +} diff --git a/core/common/src/concurrency.rs b/core/common/src/concurrency.rs index 66e8a3eb..716a550c 100644 --- a/core/common/src/concurrency.rs +++ b/core/common/src/concurrency.rs @@ -1,5 +1,9 @@ //! Building blocks for working with concurrent code. +pub use fibers::*; pub use futures::*; +pub use tasks::*; +mod fibers; mod futures; +mod tasks; diff --git a/core/common/src/concurrency/fibers.rs b/core/common/src/concurrency/fibers.rs new file mode 100644 index 00000000..4f865c61 --- /dev/null +++ b/core/common/src/concurrency/fibers.rs @@ -0,0 +1,166 @@ +//! A very lightweight fiber runtime for Rust. +//! +//! Fibers are a form of lightweight cooperative multitasking that can be used +//! to write asynchronous code in a synchronous style. Fibers are similar to +//! threads, but they are scheduled cooperatively rather than preemptively. +//! +//! To spawn a fiber, you can use the [`FiberTask::spawn`] function with a +//! future that returns a value. The fiber will run concurrently with other +//! fibers and will need to be manually stepped by calling +//! [`FiberTask::resume`] until it is completed. + +use std::{future::Future, pin::Pin, task::Poll}; + +use super::TryPoll; + +/// Starts a new fiber task. +#[inline(always)] +pub fn fiber(future: F) -> FiberTask { + FiberTask::spawn(future) +} + +/// Yields execution until the next frame. +pub fn next_frame() -> impl Future { + /// A no-op instruction for a fiber that yield's it's value. + struct FiberYield { + done: bool, + } + + impl Future for FiberYield { + type Output = (); + + #[inline(always)] + fn poll(mut self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll { + if self.done { + Poll::Ready(()) + } else { + self.done = true; + Poll::Pending + } + } + } + + FiberYield { done: false } +} + +/// A fiber task that can be executed cooperatively. +pub struct FiberTask { + status: FiberStatus, + future: Pin>>, +} + +/// The internal status of a [`FiberTask`]. +enum FiberStatus { + Pending, + Completed(T), + Finalized, +} + +impl FiberStatus { + /// Attempts to take the value from the status. + fn take(&mut self) -> Option { + match std::mem::replace(self, FiberStatus::Finalized) { + FiberStatus::Pending => None, + FiberStatus::Completed(value) => Some(value), + FiberStatus::Finalized => None, + } + } +} + +impl FiberTask { + /// Spawns a new fiber task from a future. + pub fn spawn + 'static>(future: F) -> Self { + Self { + status: FiberStatus::Pending, + future: Box::pin(future), + } + } + + /// Resumes control to the fiber. If completed, returns the value. + pub fn resume(&mut self) -> Option { + match self.status { + FiberStatus::Pending => match self.future.as_mut().try_poll() { + Poll::Ready(value) => { + self.status = FiberStatus::Completed(value); + self.status.take() + } + Poll::Pending => None, + }, + FiberStatus::Completed(_) => self.status.take(), + FiberStatus::Finalized => None, + } + } + + /// Resumes the fiber to completion. + pub fn complete(&mut self) -> T { + if matches!(self.status, FiberStatus::Finalized) { + panic!("Fiber has already been finalized"); + } + + loop { + if let Some(value) = self.resume() { + return value; + } + } + } +} + +/// Allows a [`FiberTask`] to be polled. +impl Future for FiberTask { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll { + if let Some(value) = self.resume() { + Poll::Ready(value) + } else { + Poll::Pending + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::BlockableFuture; + + #[test] + fn test_basic_fiber_usage() { + let mut fiber = fiber(async { 42 }); + let result = fiber.complete(); + + assert_eq!(result, 42); + } + + #[test] + #[should_panic] + fn test_fiber_panic_after_completion() { + let mut fiber = fiber(async { 42 }); + let _ = fiber.complete(); + + fiber.complete(); + } + + #[test] + fn test_fiber_future() { + let fiber = fiber(async { 42 }); + let result = fiber.block(); + + assert_eq!(result, 42); + } + + #[test] + fn test_fiber_yield() { + let mut fiber = fiber(async { + next_frame().await; + next_frame().await; + next_frame().await; + + 42 + }); + + assert_eq!(fiber.resume(), None); + assert_eq!(fiber.resume(), None); + assert_eq!(fiber.resume(), None); + assert_eq!(fiber.resume(), Some(42)); + } +} diff --git a/core/common/src/concurrency/futures.rs b/core/common/src/concurrency/futures.rs index 711065cd..1ae4f734 100644 --- a/core/common/src/concurrency/futures.rs +++ b/core/common/src/concurrency/futures.rs @@ -1,4 +1,4 @@ -use std::future::Future; +use std::{future::Future, pin::Pin, task::Poll}; /// Blocks the current thread until the given future completes. pub fn block(body: impl FnOnce() -> F) -> F::Output { @@ -6,27 +6,47 @@ pub fn block(body: impl FnOnce() -> F) -> F::Output { } /// Allows a [`Future`] to be blocked on. -pub trait BlockableFuture: Future { +pub trait BlockableFuture { + type Output; + /// Blocks the current thread until the future completes. fn block(self) -> Self::Output; } impl BlockableFuture for F { - fn block(self) -> Self::Output { - let mut boxed = Box::pin(self); - let waker = std::task::Waker::noop(); - let mut context = std::task::Context::from_waker(waker); + type Output = F::Output; + fn block(self) -> Self::Output { + let mut future = Box::pin(self); loop { - if let std::task::Poll::Ready(value) = boxed.as_mut().poll(&mut context) { - return value; + match future.as_mut().try_poll() { + Poll::Ready(value) => return value, + Poll::Pending => std::thread::yield_now(), } - - std::thread::yield_now(); } } } +/// Allows polling for a future without scheduling a wakeup. +pub trait TryPoll { + type Output; + + /// Attempts to resolve the future to a final value. + fn try_poll(self: Pin<&mut Self>) -> Poll; +} + +/// Allows a [`Future`] to attempted to be polled. +impl TryPoll for F { + type Output = F::Output; + + fn try_poll(mut self: Pin<&mut Self>) -> Poll { + let waker = std::task::Waker::noop(); + let mut context = std::task::Context::from_waker(&waker); + + self.as_mut().poll(&mut context) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/common/src/concurrency/tasks.rs b/core/common/src/concurrency/tasks.rs new file mode 100644 index 00000000..a02505b8 --- /dev/null +++ b/core/common/src/concurrency/tasks.rs @@ -0,0 +1,70 @@ +//! A lightweight task runtime for Rust. + +use std::{ + future::Future, + pin::Pin, + sync::Mutex, + task::{Context, Poll}, +}; + +/// A continuation that can be executed. +type Continuation = dyn FnOnce() -> (); + +/// A scheduler for tasks. +#[derive(Default)] +pub struct TaskScheduler { + continuations: Mutex>>, +} + +/// A task that can be executed concurrently. +pub struct Task { + status: TaskStatus, +} + +/// The status of a task. +enum TaskStatus { + Pending, + Completed(T), +} + +impl Task { + /// Creates a new task from a result. + pub fn from_result(result: T) -> Self { + Self { + status: TaskStatus::Completed(result), + } + } +} + +impl TaskScheduler { + /// Returns the current task scheduler. + pub fn current() -> &'static Self { + todo!() + } + + /// Schedules a [`Continuation`] to be executed. + pub fn schedule(continuation: Box) { + let scheduler = Self::current(); + let mut continuations = scheduler.continuations.lock().unwrap(); + + continuations.push(continuation); + } + + /// Processes all [`Continuation`]s in the task scheduler. + pub fn process() { + let scheduler = Self::current(); + let mut continuations = scheduler.continuations.lock().unwrap(); + + for continuation in continuations.drain(..) { + continuation(); + } + } +} + +impl Future for Task { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, _context: &mut Context<'_>) -> Poll { + todo!() + } +} diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index 907aae36..fbe018d3 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -16,6 +16,7 @@ #![feature(coerce_unsized)] #![feature(unsize)] #![feature(downcast_unchecked)] +#![feature(async_closure)] pub use abstractions::*; pub use collections::*;