Skip to content

Commit

Permalink
Add lightweight fiber runtime.
Browse files Browse the repository at this point in the history
  • Loading branch information
mattkleiny committed Aug 6, 2024
1 parent 042c70d commit 50d75fa
Show file tree
Hide file tree
Showing 7 changed files with 475 additions and 10 deletions.
2 changes: 2 additions & 0 deletions core/common/src/collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub use quadtree::*;
pub use ringbuffer::*;
pub use smallvec::{smallvec, SmallVec};
pub use spatialhash::*;
pub use swapvec::*;

mod anymap;
mod arena;
Expand All @@ -27,6 +28,7 @@ mod priorityqueue;
mod quadtree;
mod ringbuffer;
mod spatialhash;
mod swapvec;

/// A faster hasher that is not resilient to DoS attacks.
type FastHasher = BuildHasherDefault<rustc_hash::FxHasher>;
Expand Down
202 changes: 202 additions & 0 deletions core/common/src/collections/swapvec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
use std::ops::RangeFull;

/// A red/green swap [`Vec`].
///
/// This is a specialized [`Vec`] that allows for efficient red/green swaps,
/// such that one [`Vec`] is active while the other is being written to. This is
/// useful for scheduling tasks or for active/free lists.
pub struct SwapVec<T> {
red: Vec<T>,
green: Vec<T>,
status: Status,
}

/// The status of a [`SwapVec`].
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum Status {
Red,
Green,
}

impl<T> Default for SwapVec<T> {
fn default() -> Self {
Self {
red: Vec::new(),
green: Vec::new(),
status: Status::Red,
}
}
}

impl<T> SwapVec<T> {
/// Creates a new [`SwapVec`].
pub fn new() -> Self {
Self::default()
}

/// Creates a new [`SwapVec`] with the given capacity.
pub fn with_capacity(capacity: usize) -> Self {
Self {
red: Vec::with_capacity(capacity),
green: Vec::with_capacity(capacity),
status: Status::Red,
}
}

/// Returns the length of the active [`Vec`].
pub fn len(&self) -> usize {
match self.status {
Status::Red => self.red.len(),
Status::Green => self.green.len(),
}
}

/// Returns the capacity of the active [`Vec`].
pub fn capacity(&self) -> usize {
match self.status {
Status::Red => self.red.capacity(),
Status::Green => self.green.capacity(),
}
}

/// Returns whether the active [`Vec`] is empty.
pub fn is_empty(&self) -> bool {
self.len() == 0
}

/// Pushes a value into the active [`Vec`].
pub fn push(&mut self, value: T) {
match self.status {
Status::Red => self.red.push(value),
Status::Green => self.green.push(value),
}
}

/// Pops a value from the active [`Vec`].
pub fn pop(&mut self) -> Option<T> {
match self.status {
Status::Red => self.red.pop(),
Status::Green => self.green.pop(),
}
}

/// Drains the active [`Vec`].
pub fn drain(&mut self, range: RangeFull) -> std::vec::Drain<T> {
match self.status {
Status::Red => self.red.drain(range),
Status::Green => self.green.drain(range),
}
}

/// Swaps the active [`Vec` with the inactive one.
pub fn swap(&mut self) {
self.status = match self.status {
Status::Red => Status::Green,
Status::Green => Status::Red,
};
}

/// Clears the active [`Vec`].
pub fn clear(&mut self) {
match self.status {
Status::Red => self.red.clear(),
Status::Green => self.green.clear(),
}
}

/// Clears both [`Vec`]s.
pub fn clear_all(&mut self) {
self.red.clear();
self.green.clear();
}

/// Iterates over the active [`Vec`].
pub fn iter(&self) -> std::slice::Iter<T> {
match self.status {
Status::Red => self.red.iter(),
Status::Green => self.green.iter(),
}
}

/// Mutably iterates over the active [`Vec`].
pub fn iter_mut(&mut self) -> std::slice::IterMut<T> {
match self.status {
Status::Red => self.red.iter_mut(),
Status::Green => self.green.iter_mut(),
}
}

/// Returns a slice of the active [`Vec`].
pub fn as_slice(&self) -> &[T] {
match self.status {
Status::Red => self.red.as_slice(),
Status::Green => self.green.as_slice(),
}
}

/// Returns a mutable slice of the active [`Vec`].
pub fn as_mut_slice(&mut self) -> &mut [T] {
match self.status {
Status::Red => self.red.as_mut_slice(),
Status::Green => self.green.as_mut_slice(),
}
}
}

impl<T> AsRef<[T]> for SwapVec<T> {
#[inline]
fn as_ref(&self) -> &[T] {
self.as_slice()
}
}

impl<T> AsMut<[T]> for SwapVec<T> {
#[inline]
fn as_mut(&mut self) -> &mut [T] {
self.as_mut_slice()
}
}

impl<'a, T> IntoIterator for &'a SwapVec<T> {
type Item = &'a T;
type IntoIter = std::slice::Iter<'a, T>;

fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}

impl<'a, T> IntoIterator for &'a mut SwapVec<T> {
type Item = &'a mut T;
type IntoIter = std::slice::IterMut<'a, T>;

fn into_iter(self) -> Self::IntoIter {
self.iter_mut()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_basic_swap_vec_operation() {
let mut swap = SwapVec::new();

swap.push(1);
swap.push(2);
swap.push(3);

assert_eq!(swap.len(), 3);

swap.swap();

assert_eq!(swap.len(), 0);

swap.push(4);
swap.push(5);
swap.push(6);

assert_eq!(swap.len(), 3);
}
}
4 changes: 4 additions & 0 deletions core/common/src/concurrency.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
//! Building blocks for working with concurrent code.
pub use fibers::*;
pub use futures::*;
pub use tasks::*;

mod fibers;
mod futures;
mod tasks;
166 changes: 166 additions & 0 deletions core/common/src/concurrency/fibers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
//! A very lightweight fiber runtime for Rust.
//!
//! Fibers are a form of lightweight cooperative multitasking that can be used
//! to write asynchronous code in a synchronous style. Fibers are similar to
//! threads, but they are scheduled cooperatively rather than preemptively.
//!
//! To spawn a fiber, you can use the [`FiberTask::spawn`] function with a
//! future that returns a value. The fiber will run concurrently with other
//! fibers and will need to be manually stepped by calling
//! [`FiberTask::resume`] until it is completed.
use std::{future::Future, pin::Pin, task::Poll};

use super::TryPoll;

/// Starts a new fiber task.
#[inline(always)]
pub fn fiber<F: Future + 'static>(future: F) -> FiberTask<F::Output> {
FiberTask::spawn(future)
}

/// Yields execution until the next frame.
pub fn next_frame() -> impl Future<Output = ()> {
/// A no-op instruction for a fiber that yield's it's value.
struct FiberYield {
done: bool,
}

impl Future for FiberYield {
type Output = ();

#[inline(always)]
fn poll(mut self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
if self.done {
Poll::Ready(())
} else {
self.done = true;
Poll::Pending
}
}
}

FiberYield { done: false }
}

/// A fiber task that can be executed cooperatively.
pub struct FiberTask<T> {
status: FiberStatus<T>,
future: Pin<Box<dyn Future<Output = T>>>,
}

/// The internal status of a [`FiberTask`].
enum FiberStatus<T> {
Pending,
Completed(T),
Finalized,
}

impl<T> FiberStatus<T> {
/// Attempts to take the value from the status.
fn take(&mut self) -> Option<T> {
match std::mem::replace(self, FiberStatus::Finalized) {
FiberStatus::Pending => None,
FiberStatus::Completed(value) => Some(value),
FiberStatus::Finalized => None,
}
}
}

impl<T> FiberTask<T> {
/// Spawns a new fiber task from a future.
pub fn spawn<F: Future<Output = T> + 'static>(future: F) -> Self {
Self {
status: FiberStatus::Pending,
future: Box::pin(future),
}
}

/// Resumes control to the fiber. If completed, returns the value.
pub fn resume(&mut self) -> Option<T> {
match self.status {
FiberStatus::Pending => match self.future.as_mut().try_poll() {
Poll::Ready(value) => {
self.status = FiberStatus::Completed(value);
self.status.take()
}
Poll::Pending => None,
},
FiberStatus::Completed(_) => self.status.take(),
FiberStatus::Finalized => None,
}
}

/// Resumes the fiber to completion.
pub fn complete(&mut self) -> T {
if matches!(self.status, FiberStatus::Finalized) {
panic!("Fiber has already been finalized");
}

loop {
if let Some(value) = self.resume() {
return value;
}
}
}
}

/// Allows a [`FiberTask`] to be polled.
impl<T: Unpin> Future for FiberTask<T> {
type Output = T;

fn poll(mut self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
if let Some(value) = self.resume() {
Poll::Ready(value)
} else {
Poll::Pending
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::BlockableFuture;

#[test]
fn test_basic_fiber_usage() {
let mut fiber = fiber(async { 42 });
let result = fiber.complete();

assert_eq!(result, 42);
}

#[test]
#[should_panic]
fn test_fiber_panic_after_completion() {
let mut fiber = fiber(async { 42 });
let _ = fiber.complete();

fiber.complete();
}

#[test]
fn test_fiber_future() {
let fiber = fiber(async { 42 });
let result = fiber.block();

assert_eq!(result, 42);
}

#[test]
fn test_fiber_yield() {
let mut fiber = fiber(async {
next_frame().await;
next_frame().await;
next_frame().await;

42
});

assert_eq!(fiber.resume(), None);
assert_eq!(fiber.resume(), None);
assert_eq!(fiber.resume(), None);
assert_eq!(fiber.resume(), Some(42));
}
}
Loading

0 comments on commit 50d75fa

Please sign in to comment.